eth: check blob transaction validity on the peer goroutine when received (#31219)

This ensures that if we receive a blob transaction announcement where we cannot
link the tx to the sidecar commitments, we will drop the sending peer. This check
is added in the protocol handler for the PooledTransactions message.

Tests for this have also been added in the cross-client "eth" protocol test suite.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
jwasinger 2025-03-01 14:10:38 +01:00 committed by GitHub
parent ebc3232b49
commit d2bbde2f2d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 234 additions and 12 deletions

View file

@ -130,11 +130,16 @@ func (c *Conn) Write(proto Proto, code uint64, msg any) error {
return err
}
var errDisc error = fmt.Errorf("disconnect")
// ReadEth reads an Eth sub-protocol wire message.
func (c *Conn) ReadEth() (any, error) {
c.SetReadDeadline(time.Now().Add(timeout))
for {
code, data, _, err := c.Conn.Read()
if code == discMsg {
return nil, errDisc
}
if err != nil {
return nil, err
}

View file

@ -17,8 +17,12 @@
package ethtest
import (
"context"
"crypto/rand"
"fmt"
"reflect"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
@ -79,6 +83,8 @@ func (s *Suite) EthTests() []utesting.Test {
{Name: "InvalidTxs", Fn: s.TestInvalidTxs},
{Name: "NewPooledTxs", Fn: s.TestNewPooledTxs},
{Name: "BlobViolations", Fn: s.TestBlobViolations},
{Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar},
{Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar},
}
}
@ -825,3 +831,194 @@ func (s *Suite) TestBlobViolations(t *utesting.T) {
conn.Close()
}
}
// mangleSidecar returns a copy of the given blob transaction where the sidecar
// data has been modified to produce a different commitment hash.
func mangleSidecar(tx *types.Transaction) *types.Transaction {
sidecar := tx.BlobTxSidecar()
copy := types.BlobTxSidecar{
Blobs: append([]kzg4844.Blob{}, sidecar.Blobs...),
Commitments: append([]kzg4844.Commitment{}, sidecar.Commitments...),
Proofs: append([]kzg4844.Proof{}, sidecar.Proofs...),
}
// zero the first commitment to alter the sidecar hash
copy.Commitments[0] = kzg4844.Commitment{}
return tx.WithBlobTxSidecar(&copy)
}
func (s *Suite) TestBlobTxWithoutSidecar(t *utesting.T) {
t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)
tx := s.makeBlobTxs(1, 2, 42)[0]
badTx := tx.WithoutBlobTxSidecar()
s.testBadBlobTx(t, tx, badTx)
}
func (s *Suite) TestBlobTxWithMismatchedSidecar(t *utesting.T) {
t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)
tx := s.makeBlobTxs(1, 2, 43)[0]
badTx := mangleSidecar(tx)
s.testBadBlobTx(t, tx, badTx)
}
// readUntil reads eth protocol messages until a message of the target type is
// received. It returns an error if there is a disconnect, or if the context
// is cancelled before a message of the desired type can be read.
func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) {
for {
select {
case <-ctx.Done():
return nil, context.Canceled
default:
}
received, err := conn.ReadEth()
if err != nil {
if err == errDisc {
return nil, errDisc
}
continue
}
switch res := received.(type) {
case *T:
return res, nil
}
}
}
// readUntilDisconnect reads eth protocol messages until the peer disconnects.
// It returns whether the peer disconnects in the next 100ms.
func readUntilDisconnect(conn *Conn) (disconnected bool) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := readUntil[struct{}](ctx, conn)
return err == errDisc
}
func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types.Transaction) {
stage1, stage2, stage3 := new(sync.WaitGroup), new(sync.WaitGroup), new(sync.WaitGroup)
stage1.Add(1)
stage2.Add(1)
stage3.Add(1)
errc := make(chan error)
badPeer := func() {
// announce the correct hash from the bad peer.
// when the transaction is first requested before transmitting it from the bad peer,
// trigger step 2: connection and announcement by good peers
conn, err := s.dial()
if err != nil {
errc <- fmt.Errorf("dial fail: %v", err)
return
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
errc <- fmt.Errorf("bad peer: peering failed: %v", err)
return
}
ann := eth.NewPooledTransactionHashesPacket{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(badTx.Size())},
Hashes: []common.Hash{badTx.Hash()},
}
if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
errc <- fmt.Errorf("sending announcement failed: %v", err)
return
}
req, err := readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn)
if err != nil {
errc <- fmt.Errorf("failed to read GetPooledTransactions message: %v", err)
return
}
stage1.Done()
stage2.Wait()
// the good peer is connected, and has announced the tx.
// proceed to send the incorrect one from the bad peer.
resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{badTx})}
if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
errc <- fmt.Errorf("writing pooled tx response failed: %v", err)
return
}
if !readUntilDisconnect(conn) {
errc <- fmt.Errorf("expected bad peer to be disconnected")
return
}
stage3.Done()
}
goodPeer := func() {
stage1.Wait()
conn, err := s.dial()
if err != nil {
errc <- fmt.Errorf("dial fail: %v", err)
return
}
defer conn.Close()
if err := conn.peer(s.chain, nil); err != nil {
errc <- fmt.Errorf("peering failed: %v", err)
return
}
ann := eth.NewPooledTransactionHashesPacket{
Types: []byte{types.BlobTxType},
Sizes: []uint32{uint32(tx.Size())},
Hashes: []common.Hash{tx.Hash()},
}
if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
errc <- fmt.Errorf("sending announcement failed: %v", err)
return
}
// wait until the bad peer has transmitted the incorrect transaction
stage2.Done()
stage3.Wait()
// the bad peer has transmitted the bad tx, and been disconnected.
// transmit the same tx but with correct sidecar from the good peer.
var req *eth.GetPooledTransactionsPacket
req, err = readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn)
if err != nil {
errc <- fmt.Errorf("reading pooled tx request failed: %v", err)
return
}
if req.GetPooledTransactionsRequest[0] != tx.Hash() {
errc <- fmt.Errorf("requested unknown tx hash")
return
}
resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{tx})}
if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
errc <- fmt.Errorf("writing pooled tx response failed: %v", err)
return
}
if readUntilDisconnect(conn) {
errc <- fmt.Errorf("unexpected disconnect")
return
}
close(errc)
}
if err := s.engine.sendForkchoiceUpdated(); err != nil {
t.Fatalf("send fcu failed: %v", err)
}
go goodPeer()
go badPeer()
err := <-errc
if err != nil {
t.Fatalf("%v", err)
}
}

View file

@ -17,7 +17,6 @@
package txpool
import (
"crypto/sha256"
"errors"
"fmt"
"math/big"
@ -170,20 +169,11 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobTxSidecar) err
if len(sidecar.Blobs) != len(hashes) {
return fmt.Errorf("invalid number of %d blobs compared to %d blob hashes", len(sidecar.Blobs), len(hashes))
}
if len(sidecar.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sidecar.Commitments), len(hashes))
}
if len(sidecar.Proofs) != len(hashes) {
return fmt.Errorf("invalid number of %d blob proofs compared to %d blob hashes", len(sidecar.Proofs), len(hashes))
}
// Blob quantities match up, validate that the provers match with the
// transaction hash before getting to the cryptography
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sidecar.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
if err := sidecar.ValidateBlobCommitmentHashes(hashes); err != nil {
return err
}
// Blob commitments match with the hashes in the transaction, verify the
// blobs themselves via KZG

View file

@ -19,6 +19,7 @@ package types
import (
"bytes"
"crypto/sha256"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -85,6 +86,22 @@ func (sc *BlobTxSidecar) encodedSize() uint64 {
return rlp.ListSize(blobs) + rlp.ListSize(commitments) + rlp.ListSize(proofs)
}
// ValidateBlobCommitmentHashes checks whether the given hashes correspond to the
// commitments in the sidecar
func (sc *BlobTxSidecar) ValidateBlobCommitmentHashes(hashes []common.Hash) error {
if len(sc.Commitments) != len(hashes) {
return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sc.Commitments), len(hashes))
}
hasher := sha256.New()
for i, vhash := range hashes {
computed := kzg4844.CalcBlobHashV1(hasher, &sc.Commitments[i])
if vhash != computed {
return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash)
}
}
return nil
}
// blobTxWithBlobs is used for encoding of transactions when blobs are present.
type blobTxWithBlobs struct {
BlobTx *BlobTx

View file

@ -69,6 +69,19 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
case *eth.PooledTransactionsResponse:
// If we receive any blob transactions missing sidecars, or with
// sidecars that don't correspond to the versioned hashes reported
// in the header, disconnect from the sending peer.
for _, tx := range *packet {
if tx.Type() == types.BlobTxType {
if tx.BlobTxSidecar() == nil {
return errors.New("received sidecar-less blob transaction")
}
if err := tx.BlobTxSidecar().ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil {
return err
}
}
}
return h.txFetcher.Enqueue(peer.ID(), *packet, true)
default: