diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index b555b14784..a7bc70cbf5 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -130,11 +130,16 @@ func (c *Conn) Write(proto Proto, code uint64, msg any) error { return err } +var errDisc error = fmt.Errorf("disconnect") + // ReadEth reads an Eth sub-protocol wire message. func (c *Conn) ReadEth() (any, error) { c.SetReadDeadline(time.Now().Add(timeout)) for { code, data, _, err := c.Conn.Read() + if code == discMsg { + return nil, errDisc + } if err != nil { return nil, err } diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 2f1731b60c..8ebbe2a05d 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -17,8 +17,12 @@ package ethtest import ( + "context" "crypto/rand" + "fmt" "reflect" + "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/misc/eip4844" @@ -79,6 +83,8 @@ func (s *Suite) EthTests() []utesting.Test { {Name: "InvalidTxs", Fn: s.TestInvalidTxs}, {Name: "NewPooledTxs", Fn: s.TestNewPooledTxs}, {Name: "BlobViolations", Fn: s.TestBlobViolations}, + {Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar}, + {Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar}, } } @@ -825,3 +831,194 @@ func (s *Suite) TestBlobViolations(t *utesting.T) { conn.Close() } } + +// mangleSidecar returns a copy of the given blob transaction where the sidecar +// data has been modified to produce a different commitment hash. +func mangleSidecar(tx *types.Transaction) *types.Transaction { + sidecar := tx.BlobTxSidecar() + copy := types.BlobTxSidecar{ + Blobs: append([]kzg4844.Blob{}, sidecar.Blobs...), + Commitments: append([]kzg4844.Commitment{}, sidecar.Commitments...), + Proofs: append([]kzg4844.Proof{}, sidecar.Proofs...), + } + // zero the first commitment to alter the sidecar hash + copy.Commitments[0] = kzg4844.Commitment{} + return tx.WithBlobTxSidecar(©) +} + +func (s *Suite) TestBlobTxWithoutSidecar(t *utesting.T) { + t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`) + tx := s.makeBlobTxs(1, 2, 42)[0] + badTx := tx.WithoutBlobTxSidecar() + s.testBadBlobTx(t, tx, badTx) +} + +func (s *Suite) TestBlobTxWithMismatchedSidecar(t *utesting.T) { + t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`) + tx := s.makeBlobTxs(1, 2, 43)[0] + badTx := mangleSidecar(tx) + s.testBadBlobTx(t, tx, badTx) +} + +// readUntil reads eth protocol messages until a message of the target type is +// received. It returns an error if there is a disconnect, or if the context +// is cancelled before a message of the desired type can be read. +func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) { + for { + select { + case <-ctx.Done(): + return nil, context.Canceled + default: + } + received, err := conn.ReadEth() + if err != nil { + if err == errDisc { + return nil, errDisc + } + continue + } + + switch res := received.(type) { + case *T: + return res, nil + } + } +} + +// readUntilDisconnect reads eth protocol messages until the peer disconnects. +// It returns whether the peer disconnects in the next 100ms. +func readUntilDisconnect(conn *Conn) (disconnected bool) { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + _, err := readUntil[struct{}](ctx, conn) + return err == errDisc +} + +func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types.Transaction) { + stage1, stage2, stage3 := new(sync.WaitGroup), new(sync.WaitGroup), new(sync.WaitGroup) + stage1.Add(1) + stage2.Add(1) + stage3.Add(1) + + errc := make(chan error) + + badPeer := func() { + // announce the correct hash from the bad peer. + // when the transaction is first requested before transmitting it from the bad peer, + // trigger step 2: connection and announcement by good peers + + conn, err := s.dial() + if err != nil { + errc <- fmt.Errorf("dial fail: %v", err) + return + } + defer conn.Close() + + if err := conn.peer(s.chain, nil); err != nil { + errc <- fmt.Errorf("bad peer: peering failed: %v", err) + return + } + + ann := eth.NewPooledTransactionHashesPacket{ + Types: []byte{types.BlobTxType}, + Sizes: []uint32{uint32(badTx.Size())}, + Hashes: []common.Hash{badTx.Hash()}, + } + + if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + errc <- fmt.Errorf("sending announcement failed: %v", err) + return + } + + req, err := readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn) + if err != nil { + errc <- fmt.Errorf("failed to read GetPooledTransactions message: %v", err) + return + } + + stage1.Done() + stage2.Wait() + + // the good peer is connected, and has announced the tx. + // proceed to send the incorrect one from the bad peer. + + resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{badTx})} + if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil { + errc <- fmt.Errorf("writing pooled tx response failed: %v", err) + return + } + if !readUntilDisconnect(conn) { + errc <- fmt.Errorf("expected bad peer to be disconnected") + return + } + stage3.Done() + } + + goodPeer := func() { + stage1.Wait() + + conn, err := s.dial() + if err != nil { + errc <- fmt.Errorf("dial fail: %v", err) + return + } + defer conn.Close() + + if err := conn.peer(s.chain, nil); err != nil { + errc <- fmt.Errorf("peering failed: %v", err) + return + } + + ann := eth.NewPooledTransactionHashesPacket{ + Types: []byte{types.BlobTxType}, + Sizes: []uint32{uint32(tx.Size())}, + Hashes: []common.Hash{tx.Hash()}, + } + + if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + errc <- fmt.Errorf("sending announcement failed: %v", err) + return + } + + // wait until the bad peer has transmitted the incorrect transaction + stage2.Done() + stage3.Wait() + + // the bad peer has transmitted the bad tx, and been disconnected. + // transmit the same tx but with correct sidecar from the good peer. + + var req *eth.GetPooledTransactionsPacket + req, err = readUntil[eth.GetPooledTransactionsPacket](context.Background(), conn) + if err != nil { + errc <- fmt.Errorf("reading pooled tx request failed: %v", err) + return + } + + if req.GetPooledTransactionsRequest[0] != tx.Hash() { + errc <- fmt.Errorf("requested unknown tx hash") + return + } + + resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, PooledTransactionsResponse: eth.PooledTransactionsResponse(types.Transactions{tx})} + if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil { + errc <- fmt.Errorf("writing pooled tx response failed: %v", err) + return + } + if readUntilDisconnect(conn) { + errc <- fmt.Errorf("unexpected disconnect") + return + } + close(errc) + } + + if err := s.engine.sendForkchoiceUpdated(); err != nil { + t.Fatalf("send fcu failed: %v", err) + } + + go goodPeer() + go badPeer() + err := <-errc + if err != nil { + t.Fatalf("%v", err) + } +} diff --git a/core/txpool/validation.go b/core/txpool/validation.go index 9565c77def..8747724247 100644 --- a/core/txpool/validation.go +++ b/core/txpool/validation.go @@ -17,7 +17,6 @@ package txpool import ( - "crypto/sha256" "errors" "fmt" "math/big" @@ -170,20 +169,11 @@ func validateBlobSidecar(hashes []common.Hash, sidecar *types.BlobTxSidecar) err if len(sidecar.Blobs) != len(hashes) { return fmt.Errorf("invalid number of %d blobs compared to %d blob hashes", len(sidecar.Blobs), len(hashes)) } - if len(sidecar.Commitments) != len(hashes) { - return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sidecar.Commitments), len(hashes)) - } if len(sidecar.Proofs) != len(hashes) { return fmt.Errorf("invalid number of %d blob proofs compared to %d blob hashes", len(sidecar.Proofs), len(hashes)) } - // Blob quantities match up, validate that the provers match with the - // transaction hash before getting to the cryptography - hasher := sha256.New() - for i, vhash := range hashes { - computed := kzg4844.CalcBlobHashV1(hasher, &sidecar.Commitments[i]) - if vhash != computed { - return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash) - } + if err := sidecar.ValidateBlobCommitmentHashes(hashes); err != nil { + return err } // Blob commitments match with the hashes in the transaction, verify the // blobs themselves via KZG diff --git a/core/types/tx_blob.go b/core/types/tx_blob.go index 88251ab957..32401db101 100644 --- a/core/types/tx_blob.go +++ b/core/types/tx_blob.go @@ -19,6 +19,7 @@ package types import ( "bytes" "crypto/sha256" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -85,6 +86,22 @@ func (sc *BlobTxSidecar) encodedSize() uint64 { return rlp.ListSize(blobs) + rlp.ListSize(commitments) + rlp.ListSize(proofs) } +// ValidateBlobCommitmentHashes checks whether the given hashes correspond to the +// commitments in the sidecar +func (sc *BlobTxSidecar) ValidateBlobCommitmentHashes(hashes []common.Hash) error { + if len(sc.Commitments) != len(hashes) { + return fmt.Errorf("invalid number of %d blob commitments compared to %d blob hashes", len(sc.Commitments), len(hashes)) + } + hasher := sha256.New() + for i, vhash := range hashes { + computed := kzg4844.CalcBlobHashV1(hasher, &sc.Commitments[i]) + if vhash != computed { + return fmt.Errorf("blob %d: computed hash %#x mismatches transaction one %#x", i, computed, vhash) + } + } + return nil +} + // blobTxWithBlobs is used for encoding of transactions when blobs are present. type blobTxWithBlobs struct { BlobTx *BlobTx diff --git a/eth/handler_eth.go b/eth/handler_eth.go index b2cd52a221..11742b14ad 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -69,6 +69,19 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { return h.txFetcher.Enqueue(peer.ID(), *packet, false) case *eth.PooledTransactionsResponse: + // If we receive any blob transactions missing sidecars, or with + // sidecars that don't correspond to the versioned hashes reported + // in the header, disconnect from the sending peer. + for _, tx := range *packet { + if tx.Type() == types.BlobTxType { + if tx.BlobTxSidecar() == nil { + return errors.New("received sidecar-less blob transaction") + } + if err := tx.BlobTxSidecar().ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil { + return err + } + } + } return h.txFetcher.Enqueue(peer.ID(), *packet, true) default: