From d2bbde2f2d6b02152c2d102e0639eae8ce89f319 Mon Sep 17 00:00:00 2001 From: jwasinger Date: Sat, 1 Mar 2025 14:10:38 +0100 Subject: [PATCH] eth: check blob transaction validity on the peer goroutine when received (#31219) This ensures that if we receive a blob transaction announcement where we cannot link the tx to the sidecar commitments, we will drop the sending peer. This check is added in the protocol handler for the PooledTransactions message. Tests for this have also been added in the cross-client "eth" protocol test suite. --------- Co-authored-by: Felix Lange --- cmd/devp2p/internal/ethtest/conn.go | 5 + cmd/devp2p/internal/ethtest/suite.go | 197 +++++++++++++++++++++++++++ core/txpool/validation.go | 14 +- core/types/tx_blob.go | 17 +++ eth/handler_eth.go | 13 ++ 5 files changed, 234 insertions(+), 12 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index b555b14784..a7bc70cbf5 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -130,11 +130,16 @@ func (c *Conn) Write(proto Proto, code uint64, msg any) error { return err } +var errDisc error = fmt.Errorf("disconnect") + // ReadEth reads an Eth sub-protocol wire message. func (c *Conn) ReadEth() (any, error) { c.SetReadDeadline(time.Now().Add(timeout)) for { code, data, _, err := c.Conn.Read() + if code == discMsg { + return nil, errDisc + } if err != nil { return nil, err } diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 2f1731b60c..8ebbe2a05d 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -17,8 +17,12 @@ package ethtest import ( + "context" "crypto/rand" + "fmt" "reflect" + "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/misc/eip4844" @@ -79,6 +83,8 @@ func (s *Suite) EthTests() []utesting.Test { {Name: "InvalidTxs", Fn: s.TestInvalidTxs}, {Name: "NewPooledTxs", Fn: s.TestNewPooledTxs}, {Name: "BlobViolations", Fn: s.TestBlobViolations}, + {Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar}, + {Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar}, } } @@ -825,3 +831,194 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { conn.Close() } } + +// mangleSidecar returns a copy of the given blob transaction where the sidecar +// data has been modified to produce a different commitment hash. +func mangleSidecar(tx *types.Transaction) *types.Transaction { + sidecar := tx.BlobTxSidecar() + copy := types.BlobTxSidecar{ + Blobs: append([]kzg4844.Blob{}, sidecar.Blobs...), + Commitments: append([]kzg4844.Commitment{}, sidecar.Commitments...), + Proofs: append([]kzg4844.Proof{}, sidecar.Proofs...), + } + // zero the first commitment to alter the sidecar hash + copy.Commitments[0] = kzg4844.Commitment{} + return tx.WithBlobTxSidecar(©) +} + +func (s *Suite) TestBlobTxWithoutSidecar(t *utesting.T) { + t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`) + tx := s.makeBlobTxs(1, 2, 42)[0] + badTx := tx.WithoutBlobTxSidecar() + s.testBadBlobTx(t, tx, badTx) +} + +func (s *Suite) TestBlobTxWithMismatchedSidecar(t *utesting.T) { + t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`) + tx := s.makeBlobTxs(1, 2, 43)[0] + badTx := mangleSidecar(tx) + s.testBadBlobTx(t, tx, badTx) +} + +// readUntil reads eth protocol messages until a message of the target type is +// received. It returns an error if there is a disconnect, or if the context +// is cancelled before a message of the desired type can be read. +func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) { + for { + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + } + received, err := conn.ReadEth() + if err != nil { + if err == errDisc { + return nil, errDisc + } + continue + } + + switch res := received.(type) { + case *T: + return res, nil + } + } +} + +// readUntilDisconnect reads eth protocol messages until the peer disconnects. +// It returns whether the peer disconnects in the next 100ms. +func readUntilDisconnect(conn *Conn) (disconnected bool) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + _, err := readUntil[struct{}](ctx, conn) + return err == errDisc +} + +func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types.Transaction) { + stage1, stage2, stage3 := new(sync.WaitGroup), new(sync.WaitGroup), new(sync.WaitGroup) + stage1.Add(1) + stage2.Add(1) + stage3.Add(1) + + errc := make(chan error) + + badPeer := func() { + // announce the correct hash from the bad peer. + // when the transaction is first requested before transmitting it from the bad peer, + // trigger step 2: connection and announcement by good peers + + conn, err := s.dial() + if err != nil { + errc <- fmt.Errorf("dial fail: %v", err) + return + } + defer conn.Close() + + if err := conn.peer(s.chain, nil); err != nil { + errc <- fmt.Errorf("bad peer: peering failed: %v", err) + return + } + + ann := eth.NewPooledTransactionHashesPacket{ + Types: []byte{types.BlobTxType}, + Sizes: []uint32{uint32(badTx.Size())}, + Hashes: []common.Hash{badTx.Hash()}, + } + + if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + errc <- fmt.Errorf("sending announcement failed: %v", err) + return + } + + req, err := readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn) + if err != nil { + errc <- fmt.Errorf("failed to read GetPooledTransactions message: %v", err) + return + } + + stage1.Done() + stage2.Wait() + + // the good peer is connected, and has announced the tx. + // proceed to send the incorrect one from the bad peer. + + resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{badTx})} + if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil { + errc <- fmt.Errorf("writing pooled tx response failed: %v", err) + return + } + if !readUntilDisconnect(conn) { + errc <- fmt.Errorf("expected bad peer to be disconnected") + return + } + stage3.Done() + } + + goodPeer := func() { + stage1.Wait() + + conn, err := s.dial() + if err != nil { + errc <- fmt.Errorf("dial fail: %v", err) + return + } + defer conn.Close() + + if err := conn.peer(s.chain, nil); err != nil { + errc <- fmt.Errorf("peering failed: %v", err) + return + } + + ann := eth.NewPooledTransactionHashesPacket{ + Types: []byte{types.BlobTxType}, + Sizes: []uint32{uint32(tx.Size())}, + Hashes: []common.Hash{tx.Hash()}, + } + + if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + errc <- fmt.Errorf("sending announcement failed: %v", err) + return + } + + // wait until the bad peer has transmitted the incorrect transaction + stage2.Done() + stage3.Wait() + + // the bad peer has transmitted the bad tx, and been disconnected. + // transmit the same tx but with correct sidecar from the good peer. + + var req *eth.GetPooledTransactionsPacket + req, err = readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn) + if err != nil { + errc <- fmt.Errorf("reading pooled tx request failed: %v", err) + return + } + + if req.GetPooledTransactionsRequest[0] != tx.Hash() { + errc <- fmt.Errorf("requested unknown tx hash") + return + } + + resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{tx})} + if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil { + errc <- fmt.Errorf("writing pooled tx response failed: %v", err) + return + } + if readUntilDisconnect(conn) { + errc <- fmt.Errorf("unexpected disconnect") + return + } + close(errc) + } + + if err := s.engine.sendForkchoiceUpdated(); err != nil { + t.Fatalf("send fcu failed: %v", err) + } + + go goodPeer() + go badPeer() + err := <-errc + if err != nil { + t.Fatalf("%v", err) + } +} diff --git a/core/txpool/validation.go b/core/txpool/validation.go index 9565c77def..8747724247 100644 --- a/core/txpool/validation.go +++ b/core/txpool/validation.go @@ -17,7 +17,6 @@ package txpool import ( - "crypto/sha256" "errors" "fmt" "math/big" @@ -170,20 +169,11 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobTxSidecar) err if len(sidecar.Blobs) != len(hashes) { return fmt.Errorf("invalid number of %d blobs compared to %d blob hashes", len(sidecar.Blobs), len(hashes)) } - if len(sidecar.Commitments) != len(hashes) { - return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sidecar.Commitments), len(hashes)) - } if len(sidecar.Proofs) != len(hashes) { return fmt.Errorf("invalid number of %d blob proofs compared to %d blob hashes", len(sidecar.Proofs), len(hashes)) } - // Blob quantities match up, validate that the provers match with the - // transaction hash before getting to the cryptography - hasher := sha256.New() - for i, vhash := range hashes { - computed := kzg4844.CalcBlobHashV1(hasher, &sidecar.Commitments[i]) - if vhash != computed { - return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash) - } + if err := sidecar.ValidateBlobCommitmentHashes(hashes); err != nil { + return err } // Blob commitments match with the hashes in the transaction, verify the // blobs themselves via KZG diff --git a/core/types/tx_blob.go b/core/types/tx_blob.go index 88251ab957..32401db101 100644 --- a/core/types/tx_blob.go +++ b/core/types/tx_blob.go @@ -19,6 +19,7 @@ package types import ( "bytes" "crypto/sha256" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -85,6 +86,22 @@ func (sc *BlobTxSidecar) encodedSize() uint64 { return rlp.ListSize(blobs) + rlp.ListSize(commitments) + rlp.ListSize(proofs) } +// ValidateBlobCommitmentHashes checks whether the given hashes correspond to the +// commitments in the sidecar +func (sc *BlobTxSidecar) ValidateBlobCommitmentHashes(hashes []common.Hash) error { + if len(sc.Commitments) != len(hashes) { + return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sc.Commitments), len(hashes)) + } + hasher := sha256.New() + for i, vhash := range hashes { + computed := kzg4844.CalcBlobHashV1(hasher, &sc.Commitments[i]) + if vhash != computed { + return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash) + } + } + return nil +} + // blobTxWithBlobs is used for encoding of transactions when blobs are present. type blobTxWithBlobs struct { BlobTx *BlobTx diff --git a/eth/handler_eth.go b/eth/handler_eth.go index b2cd52a221..11742b14ad 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -69,6 +69,19 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.txFetcher.Enqueue(peer.ID(), *packet, false) case *eth.PooledTransactionsResponse: + // If we receive any blob transactions missing sidecars, or with + // sidecars that don't correspond to the versioned hashes reported + // in the header, disconnect from the sending peer. + for _, tx := range *packet { + if tx.Type() == types.BlobTxType { + if tx.BlobTxSidecar() == nil { + return errors.New("received sidecar-less blob transaction") + } + if err := tx.BlobTxSidecar().ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil { + return err + } + } + } return h.txFetcher.Enqueue(peer.ID(), *packet, true) default: