diff --git a/cmd/devp2p/internal/ethtest/conn.go b/cmd/devp2p/internal/ethtest/conn.go index 66924d8f54..7f5f6a5dd1 100644 --- a/cmd/devp2p/internal/ethtest/conn.go +++ b/cmd/devp2p/internal/ethtest/conn.go @@ -175,9 +175,9 @@ func (c *Conn) ReadEth() (any, error) { case eth.PooledTransactionsMsg: msg = new(eth.PooledTransactionsPacket) case eth.GetCellsMsg: - msg = new(eth.GetCellsRequest) + msg = new(eth.GetCellsRequestPacket) case eth.CellsMsg: - msg = new(eth.CellsResponse) + msg = new(eth.CellsPacket) default: panic(fmt.Sprintf("unhandled eth msg code %d", code)) } diff --git a/cmd/devp2p/internal/ethtest/suite.go b/cmd/devp2p/internal/ethtest/suite.go index 286872ff63..db6a10d3d4 100644 --- a/cmd/devp2p/internal/ethtest/suite.go +++ b/cmd/devp2p/internal/ethtest/suite.go @@ -21,6 +21,7 @@ import ( "crypto/rand" "errors" "fmt" + "os" "reflect" "sync" "time" @@ -91,6 +92,10 @@ func (s *Suite) EthTests() []utesting.Test { {Name: "BlobViolations", Fn: s.TestBlobViolations}, {Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar}, {Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar}, + // test eth/72 blob txs + {Name: "BlobTxAvailabilityFailure", Fn: s.TestBlobTxAvailabilityFailure}, + {Name: "GetCells", Fn: s.TestGetCells}, + {Name: "BlobTxWithInvalidCells", Fn: s.TestBlobTxWithInvalidCells}, } } @@ -1210,3 +1215,298 @@ func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types t.Fatalf("%v", err) } } + +func (s *Suite) TestBlobTxAvailabilityFailure(t *utesting.T) { + t.Log(`This test announces 4 blob txs from a single peer. With fetchProbability 0.15, +there will be at least one partial fetch (1-0.15^4). When only 1 peer announced availability, +partial fetch GetCells should never arrive. Any GetCells that does arrive must be a full fetch.`) + + if err := s.engine.sendForkchoiceUpdated(); err != nil { + t.Fatalf("send fcu failed: %v", err) + } + + txs := s.makeBlobTxs(4, 4, 0x30) + + conn, err := s.dial() + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer conn.Close() + if err := conn.peer(s.chain, nil); err != nil { + t.Fatalf("peering failed: %v", err) + } + + // Announce all 4 txs from a single peer. + hashes := make([]common.Hash, len(txs)) + txTypes := make([]byte, len(txs)) + sizes := make([]uint32, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + txTypes[i] = types.BlobTxType + sizes[i] = uint32(tx.WithoutBlob().Size()) + } + ann := eth.NewPooledTransactionHashesPacket72{ + Types: txTypes, + Sizes: sizes, + Hashes: hashes, + Mask: *types.CustodyBitmapAll, + } + if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + t.Fatalf("announce failed: %v", err) + } + + // Read messages for a short period. Any GetCells that arrives must be + // a full fetch request (mask >= DataPerBlob), not a partial fetch. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + return + default: + } + msg, err := conn.ReadEth() + if err != nil { + if errors.Is(err, os.ErrDeadlineExceeded) { + return // timeout, test passed + } + t.Fatalf("unexpected error: %v", err) + } + switch req := msg.(type) { + case *eth.GetCellsRequestPacket: + if req.Mask.OneCount() < kzg4844.DataPerBlob { + t.Fatalf("received partial GetCells request with only %d cells from single peer announcement", req.Mask.OneCount()) + } + case *eth.GetPooledTransactionsPacket: + var txsWithoutBlob []*types.Transaction + for _, h := range req.GetPooledTransactionsRequest { + for _, tx := range txs { + if tx.Hash() == h { + txsWithoutBlob = append(txsWithoutBlob, tx.WithoutBlob()) + } + } + } + encTxs, _ := rlp.EncodeToRawList(txsWithoutBlob) + conn.Write(ethProto, eth.PooledTransactionsMsg, eth.PooledTransactionsPacket{ + RequestId: req.RequestId, + List: encTxs, + }) + } + } +} + +// buildCells extracts cells at mask indices from the original tx's blobs +func buildCells(sidecar *types.BlobTxSidecar, mask types.CustodyBitmap) []kzg4844.Cell { + allCells, _ := kzg4844.ComputeCells(sidecar.Blobs) + indices := mask.Indices() + result := make([]kzg4844.Cell, 0, len(sidecar.Blobs)*len(indices)) + for b := 0; b < len(sidecar.Blobs); b++ { + for _, idx := range indices { + result = append(result, allCells[b*kzg4844.CellsPerBlob+int(idx)]) + } + } + return result +} + +// readAnyFrom waits for a message of type T on any of the given conns +// and returns the packet and the conn it came from. +func readAnyFrom[T any](conns ...*Conn) (*T, *Conn, error) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + type result struct { + pkt *T + c *Conn + } + ch := make(chan result, len(conns)) + errCh := make(chan error, len(conns)) + + for _, c := range conns { + go func(c *Conn) { + pkt, err := readUntil[T](ctx, c) + if err != nil { + if !errors.Is(err, context.Canceled) { + errCh <- err + } + return + } + ch <- result{pkt, c} + }(c) + } + select { + case r := <-ch: + return r.pkt, r.c, nil + case err := <-errCh: + return nil, nil, err + } +} + +func (s *Suite) TestGetCells(t *utesting.T) { + t.Log(`This test checks that blob tx announcements trigger GetCells requests, +and that providing valid cells causes the tx to enter the pool.`) + + if err := s.engine.sendForkchoiceUpdated(); err != nil { + t.Fatalf("send fcu failed: %v", err) + } + + tx := s.makeBlobTxs(1, 1, 0x31)[0] + sidecar := tx.BlobTxSidecar() + tx = tx.WithoutBlob() + + // Two peers ensure GetCells arrives regardless of full/partial fetch path. + conn1, err := s.dial() + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer conn1.Close() + if err := conn1.peer(s.chain, nil); err != nil { + t.Fatalf("peering failed: %v", err) + } + + conn2, err := s.dial() + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer conn2.Close() + if err := conn2.peer(s.chain, nil); err != nil { + t.Fatalf("peering failed: %v", err) + } + + ann := eth.NewPooledTransactionHashesPacket72{ + Types: []byte{types.BlobTxType}, + Sizes: []uint32{uint32(tx.Size())}, + Hashes: []common.Hash{tx.Hash()}, + Mask: *types.CustodyBitmapAll, + } + if err := conn1.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + t.Fatalf("conn1 announce failed: %v", err) + } + if err := conn2.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + t.Fatalf("conn2 announce failed: %v", err) + } + + // Wait for GetPooledTransactions on either conn, respond with tx (without blobs). + pooledReq, pc, err := readAnyFrom[eth.GetPooledTransactionsPacket](conn1, conn2) + if err != nil { + t.Fatalf("failed to read GetPooledTransactions: %v", err) + } + encTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx}) + resp := eth.PooledTransactionsPacket{RequestId: pooledReq.RequestId, List: encTxs} + if err := pc.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil { + t.Fatalf("writing pooled tx response failed: %v", err) + } + + // Wait for GetCells request on either conn. + cellsReq, cc, err := readAnyFrom[eth.GetCellsRequestPacket](conn1, conn2) + if err != nil { + t.Fatalf("failed to read GetCells: %v", err) + } + if len(cellsReq.Hashes) == 0 || cellsReq.Hashes[0] != tx.Hash() { + t.Fatalf("GetCells for wrong hash: %v", cellsReq.Hashes) + } + + // Respond with valid cells matching the requested mask. + cells := buildCells(sidecar, cellsReq.Mask) + cellsResp := eth.CellsPacket{ + RequestId: cellsReq.RequestId, + CellsResponse: eth.CellsResponse{ + Hashes: []common.Hash{tx.Hash()}, + Cells: [][]kzg4844.Cell{cells}, + Mask: cellsReq.Mask, + }, + } + if err := cc.Write(ethProto, eth.CellsMsg, cellsResp); err != nil { + t.Fatalf("writing cells response failed: %v", err) + } + + // Either peer should not be disconnected after providing valid data. + if readUntilDisconnect(cc) { + t.Fatalf("unexpected disconnect on cells-providing peer") + } +} + +func (s *Suite) TestBlobTxWithInvalidCells(t *utesting.T) { + t.Log(`This test checks that a peer responding to GetCells with invalid cells is disconnected, +while the other peer is not.`) + + if err := s.engine.sendForkchoiceUpdated(); err != nil { + t.Fatalf("send fcu failed: %v", err) + } + + tx := s.makeBlobTxs(1, 1, 0x32)[0].WithoutBlob() + + conn1, err := s.dial() + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer conn1.Close() + if err := conn1.peer(s.chain, nil); err != nil { + t.Fatalf("peering failed: %v", err) + } + + conn2, err := s.dial() + if err != nil { + t.Fatalf("dial failed: %v", err) + } + defer conn2.Close() + if err := conn2.peer(s.chain, nil); err != nil { + t.Fatalf("peering failed: %v", err) + } + + ann := eth.NewPooledTransactionHashesPacket72{ + Types: []byte{types.BlobTxType}, + Sizes: []uint32{uint32(tx.Size())}, + Hashes: []common.Hash{tx.Hash()}, + Mask: *types.CustodyBitmapAll, + } + if err := conn1.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + t.Fatalf("conn1 announce failed: %v", err) + } + if err := conn2.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil { + t.Fatalf("conn2 announce failed: %v", err) + } + + pooledReq, pc, err := readAnyFrom[eth.GetPooledTransactionsPacket](conn1, conn2) + if err != nil { + t.Fatalf("failed to read GetPooledTransactions: %v", err) + } + encTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx}) + if err := pc.Write(ethProto, eth.PooledTransactionsMsg, + eth.PooledTransactionsPacket{RequestId: pooledReq.RequestId, List: encTxs}); err != nil { + t.Fatalf("writing pooled tx response failed: %v", err) + } + + cellsReq, cc, err := readAnyFrom[eth.GetCellsRequestPacket](conn1, conn2) + if err != nil { + t.Fatalf("failed to read GetCells: %v", err) + } + + // Respond with corrupted cells (all zero bytes). + blobCount := len(tx.BlobTxSidecar().Blobs) + corrupted := make([]kzg4844.Cell, blobCount*cellsReq.Mask.OneCount()) + badResp := eth.CellsPacket{ + RequestId: cellsReq.RequestId, + CellsResponse: eth.CellsResponse{ + Hashes: []common.Hash{tx.Hash()}, + Cells: [][]kzg4844.Cell{corrupted}, + Mask: cellsReq.Mask, + }, + } + if err := cc.Write(ethProto, eth.CellsMsg, badResp); err != nil { + t.Fatalf("writing bad cells response failed: %v", err) + } + + // The peer that sent corrupted cells must be disconnected. + if !readUntilDisconnect(cc) { + t.Fatalf("expected peer to be disconnected after invalid cells") + } + + // The innocent peer must stay connected. + otherConn := conn1 + if cc == conn1 { + otherConn = conn2 + } + if readUntilDisconnect(otherConn) { + t.Fatalf("innocent peer should not be disconnected") + } +}