go-ethereum/cmd/devp2p/internal/ethtest/suite.go
2026-05-20 23:12:06 +02:00

1616 lines
48 KiB
Go

// Copyright 2020 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package ethtest
import (
"context"
"crypto/rand"
"errors"
"fmt"
"os"
"reflect"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/internal/utesting"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)
// Suite bundles the state needed to run the eth and snap protocol
// conformance tests against a single node.
type Suite struct {
	// Dest is the node under test.
	Dest *enode.Node

	chain  *Chain        // test chain, loaded by NewSuite
	engine *EngineClient // engine API client, created by NewSuite
}
// NewSuite builds a test suite targeting dest. The test chain is loaded from
// chainDir and an engine API client is created from chainDir, engineURL and
// jwt. The first error encountered is returned and no suite is created.
func NewSuite(dest *enode.Node, chainDir, engineURL, jwt string) (*Suite, error) {
	var (
		suite = &Suite{Dest: dest}
		err   error
	)
	if suite.chain, err = NewChain(chainDir); err != nil {
		return nil, err
	}
	if suite.engine, err = NewEngineClient(chainDir, engineURL, jwt); err != nil {
		return nil, err
	}
	return suite, nil
}
// EthTests returns the list of eth protocol tests provided by the suite.
func (s *Suite) EthTests() []utesting.Test {
	var tests []utesting.Test

	// status
	tests = append(tests,
		utesting.Test{Name: "Status", Fn: s.TestStatus},
		utesting.Test{Name: "MaliciousHandshake", Fn: s.TestMaliciousHandshake},
		utesting.Test{Name: "BlockRangeUpdateExpired", Fn: s.TestBlockRangeUpdateHistoryExp},
		utesting.Test{Name: "BlockRangeUpdateFuture", Fn: s.TestBlockRangeUpdateFuture},
		utesting.Test{Name: "BlockRangeUpdateInvalid", Fn: s.TestBlockRangeUpdateInvalid},
	)
	// get block headers
	tests = append(tests,
		utesting.Test{Name: "GetBlockHeaders", Fn: s.TestGetBlockHeaders},
		utesting.Test{Name: "GetNonexistentBlockHeaders", Fn: s.TestGetNonexistentBlockHeaders},
		utesting.Test{Name: "SimultaneousRequests", Fn: s.TestSimultaneousRequests},
		utesting.Test{Name: "SameRequestID", Fn: s.TestSameRequestID},
		utesting.Test{Name: "ZeroRequestID", Fn: s.TestZeroRequestID},
	)
	// get history
	tests = append(tests,
		utesting.Test{Name: "GetBlockBodies", Fn: s.TestGetBlockBodies},
		utesting.Test{Name: "GetReceipts", Fn: s.TestGetReceipts},
		utesting.Test{Name: "GetLargeReceipts", Fn: s.TestGetLargeReceipts},
	)
	// test transactions
	tests = append(tests,
		utesting.Test{Name: "LargeTxRequest", Fn: s.TestLargeTxRequest, Slow: true},
		utesting.Test{Name: "Transaction", Fn: s.TestTransaction},
		utesting.Test{Name: "InvalidTxs", Fn: s.TestInvalidTxs},
		utesting.Test{Name: "NewPooledTxs", Fn: s.TestNewPooledTxs},
		utesting.Test{Name: "BlobViolations", Fn: s.TestBlobViolations},
	)
	// TODO: 4 tests below require hive test changes
	tests = append(tests,
		utesting.Test{Name: "TestBlobTxWithoutSidecar", Fn: s.TestBlobTxWithoutSidecar},
		utesting.Test{Name: "TestBlobTxWithMismatchedSidecar", Fn: s.TestBlobTxWithMismatchedSidecar},
	)
	// test eth/72 blob txs
	tests = append(tests,
		utesting.Test{Name: "BlobTxAvailabilityFailure", Fn: s.TestBlobTxAvailabilityFailure},
		utesting.Test{Name: "GetCells", Fn: s.TestGetCells},
		utesting.Test{Name: "BlobTxWithInvalidCells", Fn: s.TestBlobTxWithInvalidCells},
	)
	return tests
}
// SnapTests returns the list of snap protocol tests provided by the suite.
func (s *Suite) SnapTests() []utesting.Test {
	tests := make([]utesting.Test, 0, 5)
	tests = append(tests,
		utesting.Test{Name: "Status", Fn: s.TestSnapStatus},
		utesting.Test{Name: "AccountRange", Fn: s.TestSnapGetAccountRange},
		utesting.Test{Name: "GetByteCodes", Fn: s.TestSnapGetByteCodes},
		utesting.Test{Name: "GetTrieNodes", Fn: s.TestSnapTrieNodes},
		utesting.Test{Name: "GetStorageRanges", Fn: s.TestSnapGetStorageRanges},
	)
	return tests
}
// TestStatus dials the node, performs the eth handshake and closes the
// connection again. It fails if peering does not succeed.
func (s *Suite) TestStatus(t *utesting.T) {
	t.Log(`This test is just a sanity check. It performs an eth protocol handshake.`)

	peer, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatal("peering failed:", err)
	}
	peer.Close()
}
// headersMatch reports whether the received headers are deeply equal to the
// expected ones.
func headersMatch(expected []*types.Header, headers []*types.Header) bool {
	equal := reflect.DeepEqual(expected, headers)
	return equal
}
// TestGetBlockHeaders requests two headers, skipping one block in between,
// starting at the hash of block 1. It checks that the response carries the
// request ID that was sent and that the returned headers equal the ones the
// local test chain produces for the same request.
func (s *Suite) TestGetBlockHeaders(t *utesting.T) {
	t.Log(`This test requests block headers from the node.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// Send headers request.
	req := &eth.GetBlockHeadersPacket{
		RequestId: 33,
		GetBlockHeadersRequest: &eth.GetBlockHeadersRequest{
			Origin:  eth.HashOrNumber{Hash: s.chain.blocks[1].Hash()},
			Amount:  2,
			Skip:    1,
			Reverse: false,
		},
	}
	if err := conn.Write(ethProto, eth.GetBlockHeadersMsg, req); err != nil {
		t.Fatalf("could not write to connection: %v", err)
	}

	// Read headers response.
	headers := new(eth.BlockHeadersPacket)
	if err := conn.ReadMsg(ethProto, eth.BlockHeadersMsg, &headers); err != nil {
		t.Fatalf("error reading msg: %v", err)
	}
	if got, want := headers.RequestId, req.RequestId; got != want {
		t.Fatalf("unexpected request id: got %d, want %d", got, want)
	}

	// Check for correct headers.
	expected, err := s.chain.GetHeaders(req)
	if err != nil {
		t.Fatalf("failed to get headers for given request: %v", err)
	}
	received, err := headers.List.Items()
	if err != nil {
		t.Fatalf("invalid headers received: %v", err)
	}
	if !headersMatch(expected, received) {
		// Report the decoded headers so both sides of the message have the same shape.
		t.Fatalf("header mismatch: \nexpected %v \ngot %v", expected, received)
	}
}
// TestGetNonexistentBlockHeaders sends ten GetBlockHeaders requests for block
// number max-uint64 and fails if the node disconnects, either while the
// requests are being written or on the first read afterwards.
func (s *Suite) TestGetNonexistentBlockHeaders(t *utesting.T) {
	t.Log(`This test sends GetBlockHeaders requests for nonexistent blocks (using max uint64 value)
to check if the node disconnects after receiving multiple invalid requests.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// The request targets the largest possible block number, which the
	// node cannot have.
	request := &eth.GetBlockHeadersPacket{
		GetBlockHeadersRequest: &eth.GetBlockHeadersRequest{
			Origin:  eth.HashOrNumber{Number: ^uint64(0)},
			Amount:  1,
			Skip:    0,
			Reverse: false,
		},
	}

	// Send the request 10 times. Some clients are lenient on the first few
	// invalid requests.
	for attempt := range 10 {
		request.RequestId = uint64(attempt)
		err := conn.Write(ethProto, eth.GetBlockHeadersMsg, request)
		switch {
		case err == nil:
			// Request went out, keep going.
		case err == errDisc:
			t.Fatalf("peer disconnected after %d requests", attempt+1)
		default:
			t.Fatalf("write failed: %v", err)
		}
	}

	// The peer must still be connected after the last request.
	code, _, err := conn.Read()
	disconnected := err == errDisc || code == discMsg
	if disconnected {
		t.Fatal("peer improperly disconnected")
	}
}
// TestSimultaneousRequests writes two GetBlockHeaders requests with different
// request IDs back to back, then collects both responses (in any order) and
// checks each one against the local test chain.
func (s *Suite) TestSimultaneousRequests(t *utesting.T) {
	t.Log(`This test requests blocks headers from the node, performing two requests
concurrently, with different request IDs.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// Both requests start at the hash of block 1 and skip one block between
	// headers; they differ in request ID and in the number of headers.
	const (
		firstID  = uint64(111)
		secondID = uint64(222)
	)
	newRequest := func(id, amount uint64) *eth.GetBlockHeadersPacket {
		return &eth.GetBlockHeadersPacket{
			RequestId: id,
			GetBlockHeadersRequest: &eth.GetBlockHeadersRequest{
				Origin: eth.HashOrNumber{
					Hash: s.chain.blocks[1].Hash(),
				},
				Amount:  amount,
				Skip:    1,
				Reverse: false,
			},
		}
	}
	requests := []*eth.GetBlockHeadersPacket{
		newRequest(firstID, 2),
		newRequest(secondID, 4),
	}

	// Send both requests before reading anything.
	for _, req := range requests {
		if err := conn.Write(ethProto, eth.GetBlockHeadersMsg, req); err != nil {
			t.Fatalf("failed to write to connection: %v", err)
		}
	}

	// Collect the responses, keyed by request ID. They can arrive in either order.
	responses, err := collectHeaderResponses(conn, 2, func(msg *eth.BlockHeadersPacket) uint64 {
		switch msg.RequestId {
		case firstID, secondID:
		default:
			t.Fatalf("response with unknown request ID: %v", msg.RequestId)
		}
		return msg.RequestId
	})
	if err != nil {
		t.Fatal(err)
	}

	// Each response must match what the test chain yields for its request.
	for _, req := range requests {
		if err := s.checkHeadersAgainstChain(req, responses[req.RequestId]); err != nil {
			t.Fatal(err)
		}
	}
}
// TestSameRequestID writes two different GetBlockHeaders requests that share
// one request ID and expects a response to each of them. The responses are
// told apart by the number of headers they carry (2 and 3).
func (s *Suite) TestSameRequestID(t *utesting.T) {
	t.Log(`This test requests block headers, performing two concurrent requests with the
same request ID. The node should handle the request by responding to both requests.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// Two distinct requests carrying the same ID.
	const sharedID = uint64(1234)
	newRequest := func(origin, amount uint64) *eth.GetBlockHeadersPacket {
		return &eth.GetBlockHeadersPacket{
			RequestId: sharedID,
			GetBlockHeadersRequest: &eth.GetBlockHeadersRequest{
				Origin: eth.HashOrNumber{
					Number: origin,
				},
				Amount: amount,
			},
		}
	}
	requests := []*eth.GetBlockHeadersPacket{
		newRequest(1, 2),
		newRequest(33, 3),
	}

	// Send the requests.
	for _, req := range requests {
		if err := conn.Write(ethProto, eth.GetBlockHeadersMsg, req); err != nil {
			t.Fatalf("failed to write to connection: %v", err)
		}
	}

	// The responses can arrive in either order and the request ID cannot tell
	// them apart, so they are keyed by their header count instead.
	responses, err := collectHeaderResponses(conn, 2, func(msg *eth.BlockHeadersPacket) uint64 {
		count := uint64(msg.List.Len())
		switch count {
		case 2, 3:
		default:
			t.Fatalf("invalid number of headers in response: %d", count)
		}
		return count
	})
	if err != nil {
		t.Fatal(err)
	}

	// Match each request with the response holding the requested amount.
	for _, req := range requests {
		if err := s.checkHeadersAgainstChain(req, responses[req.Amount]); err != nil {
			t.Fatal(err)
		}
	}
}
// checkHeadersAgainstChain decodes the headers in resp and compares them with
// the headers the test chain returns for req. It returns an error if the
// response cannot be decoded, if the chain cannot serve the request, or if
// the two header lists differ.
func (s *Suite) checkHeadersAgainstChain(req *eth.GetBlockHeadersPacket, resp *eth.BlockHeadersPacket) error {
	got, err := resp.List.Items()
	if err != nil {
		return fmt.Errorf("invalid headers in response with request ID %v (%d items): %v", resp.RequestId, resp.List.Len(), err)
	}
	want, err := s.chain.GetHeaders(req)
	if err != nil {
		return fmt.Errorf("test chain failed to get expected headers for request: %v", err)
	}
	if headersMatch(want, got) {
		return nil
	}
	return fmt.Errorf("header mismatch for request ID %v (%d items): \nexpected %v \ngot %v", resp.RequestId, resp.List.Len(), want, resp)
}
// collectHeaderResponses reads n BlockHeaders messages from conn and returns
// them in a map keyed by the value the identity function yields for each
// message.
//
// An error is returned if reading a message fails (the read error is wrapped
// and can be inspected with errors.Is / errors.As) or if two messages map to
// the same identity. In both cases the responses collected so far are
// returned alongside the error.
func collectHeaderResponses(conn *Conn, n int, identity func(*eth.BlockHeadersPacket) uint64) (map[uint64]*eth.BlockHeadersPacket, error) {
	resp := make(map[uint64]*eth.BlockHeadersPacket, n)
	for range n {
		r := new(eth.BlockHeadersPacket)
		if err := conn.ReadMsg(ethProto, eth.BlockHeadersMsg, r); err != nil {
			return resp, fmt.Errorf("read error: %w", err)
		}
		id := identity(r)
		if _, seen := resp[id]; seen {
			return resp, fmt.Errorf("duplicate response %v", r)
		}
		resp[id] = r
	}
	return resp, nil
}
// TestZeroRequestID sends a GetBlockHeaders request whose request ID is left
// at zero and expects a response that echoes the zero ID and contains the
// headers the test chain produces for the same request.
func (s *Suite) TestZeroRequestID(t *utesting.T) {
	t.Log(`This test sends a GetBlockHeaders message with a request-id of zero,
and expects a response.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// Send headers request. RequestId is deliberately not set.
	req := &eth.GetBlockHeadersPacket{
		GetBlockHeadersRequest: &eth.GetBlockHeadersRequest{
			Origin: eth.HashOrNumber{Number: 0},
			Amount: 2,
		},
	}
	if err := conn.Write(ethProto, eth.GetBlockHeadersMsg, req); err != nil {
		t.Fatalf("could not write to connection: %v", err)
	}

	// Read headers response.
	headers := new(eth.BlockHeadersPacket)
	if err := conn.ReadMsg(ethProto, eth.BlockHeadersMsg, &headers); err != nil {
		t.Fatalf("error reading msg: %v", err)
	}
	if got, want := headers.RequestId, req.RequestId; got != want {
		t.Fatalf("unexpected request id: got %d, want %d", got, want)
	}
	if err := s.checkHeadersAgainstChain(req, headers); err != nil {
		t.Fatal(err)
	}
}
// TestGetBlockBodies requests the bodies of blocks 54 and 75 of the test
// chain by hash. It checks that the response echoes the request ID and
// contains one body per requested hash; the body contents are not inspected.
func (s *Suite) TestGetBlockBodies(t *utesting.T) {
	t.Log(`This test sends GetBlockBodies requests to the node for known blocks in the test chain.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// Create block bodies request.
	req := &eth.GetBlockBodiesPacket{
		RequestId: 55,
		GetBlockBodiesRequest: eth.GetBlockBodiesRequest{
			s.chain.blocks[54].Hash(),
			s.chain.blocks[75].Hash(),
		},
	}
	if err := conn.Write(ethProto, eth.GetBlockBodiesMsg, req); err != nil {
		t.Fatalf("could not write to connection: %v", err)
	}

	// Wait for response.
	resp := new(eth.BlockBodiesPacket)
	if err := conn.ReadMsg(ethProto, eth.BlockBodiesMsg, &resp); err != nil {
		t.Fatalf("error reading block bodies msg: %v", err)
	}
	if got, want := resp.RequestId, req.RequestId; got != want {
		t.Fatalf("unexpected request id in response: got %d, want %d", got, want)
	}
	if resp.List.Len() != len(req.GetBlockBodiesRequest) {
		t.Fatalf("wrong bodies in response: expected %d bodies, got %d", len(req.GetBlockBodiesRequest), resp.List.Len())
	}
}
// TestGetReceipts requests the receipts of up to three blocks of the test
// chain that contain transactions, skipping the configured large-receipt
// block. The packet format depends on the negotiated protocol version: the
// eth/69 format is used below ETH70 and the eth/70 format otherwise. In both
// cases the test checks that the response echoes the request ID and carries
// one receipt list per requested block.
func (s *Suite) TestGetReceipts(t *utesting.T) {
	t.Log(`This test sends GetReceipts requests to the node for known blocks in the test chain.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	// Find some blocks containing receipts.
	var hashes = make([]common.Hash, 0, 3)
	for i := range s.chain.Len() {
		if s.chain.txInfo.LargeReceiptBlock != nil && uint64(i) == *s.chain.txInfo.LargeReceiptBlock {
			continue
		}
		block := s.chain.GetBlock(i)
		if len(block.Transactions()) > 0 {
			hashes = append(hashes, block.Hash())
		}
		if len(hashes) == cap(hashes) {
			break
		}
	}

	if conn.negotiatedProtoVersion < eth.ETH70 {
		// Create receipts request (eth/69 format).
		req := &eth.GetReceiptsPacket69{
			RequestId:          66,
			GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
		}
		if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
			t.Fatalf("could not write to connection: %v", err)
		}
		// Wait for response.
		resp := new(eth.ReceiptsPacket69)
		if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
			t.Fatalf("error reading block receipts msg: %v", err)
		}
		if got, want := resp.RequestId, req.RequestId; got != want {
			t.Fatalf("unexpected request id in response: got %d, want %d", got, want)
		}
		if resp.List.Len() != len(req.GetReceiptsRequest) {
			t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
		}
	} else {
		// Create receipts request (eth/70 format).
		req := &eth.GetReceiptsPacket70{
			RequestId:              66,
			FirstBlockReceiptIndex: 0,
			GetReceiptsRequest:     (eth.GetReceiptsRequest)(hashes),
		}
		if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
			t.Fatalf("could not write to connection: %v", err)
		}
		// Wait for response.
		resp := new(eth.ReceiptsPacket70)
		if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
			t.Fatalf("error reading block receipts msg: %v", err)
		}
		if got, want := resp.RequestId, req.RequestId; got != want {
			t.Fatalf("unexpected request id in response: got %d, want %d", got, want)
		}
		if resp.List.Len() != len(req.GetReceiptsRequest) {
			t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
		}
	}
}
// TestGetLargeReceipts fetches the receipts of a small window of blocks
// around the configured large-receipt block using the eth/70 GetReceipts
// format. Requests are repeated, resuming at the last block received so far,
// until every block in the window is covered and the node no longer flags
// the last block as incomplete. The receipts collected for each block are
// then hashed and compared with the receipt root in the block header.
//
// The test returns without checking anything when the negotiated protocol
// version is below ETH70 or when no large-receipt block is configured.
func (s *Suite) TestGetLargeReceipts(t *utesting.T) {
	t.Log(`This test sends GetReceipts requests to the node for large receipt (>10MiB) in the test chain.
This test is meaningful only if the client supports protocol version ETH70 or higher
and LargeReceiptBlock is configured in txInfo.json.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatalf("peering failed: %v", err)
	}
	defer conn.Close()

	if conn.negotiatedProtoVersion < eth.ETH70 || s.chain.txInfo.LargeReceiptBlock == nil {
		return
	}

	// Find block with large receipt.
	// Place the large receipt block hash in the middle of the query
	start := max(int(*s.chain.txInfo.LargeReceiptBlock)-2, 0)
	end := min(*s.chain.txInfo.LargeReceiptBlock+2, uint64(len(s.chain.blocks)))

	var blocks []common.Hash
	var receiptHashes []common.Hash
	var receipts []*eth.ReceiptList
	for i := uint64(start); i < end; i++ {
		block := s.chain.GetBlock(int(i))
		blocks = append(blocks, block.Hash())
		receiptHashes = append(receiptHashes, block.Header().ReceiptHash)
		receipts = append(receipts, &eth.ReceiptList{})
	}
	if len(blocks) == 0 {
		// Without this guard the loop below would index into empty slices.
		t.Fatalf("large receipt block %d is outside of the test chain", *s.chain.txInfo.LargeReceiptBlock)
	}

	incomplete := false
	lastBlock := 0
	for incomplete || lastBlock != len(blocks)-1 {
		// Create get receipt request, resuming at the first receipt of
		// lastBlock that has not been received yet.
		req := &eth.GetReceiptsPacket70{
			RequestId:              66,
			FirstBlockReceiptIndex: uint64(receipts[lastBlock].Derivable().Len()),
			GetReceiptsRequest:     blocks[lastBlock:],
		}
		if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
			t.Fatalf("could not write to connection: %v", err)
		}
		// Wait for response.
		resp := new(eth.ReceiptsPacket70)
		if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
			t.Fatalf("error reading block receipts msg: %v", err)
		}
		if got, want := resp.RequestId, req.RequestId; got != want {
			t.Fatalf("unexpected request id in response: got %d, want %d", got, want)
		}
		receiptLists, err := resp.List.Items()
		if err != nil {
			t.Fatalf("invalid receipts in response: %v", err)
		}
		// An empty response would move lastBlock backwards, and a response
		// with more lists than requested would index past the window.
		if len(receiptLists) == 0 {
			t.Fatalf("empty receipts response for request starting at block index %d", lastBlock)
		}
		if len(receiptLists) > len(blocks)-lastBlock {
			t.Fatalf("too many receipt lists in response: requested %d blocks, got %d lists", len(blocks)-lastBlock, len(receiptLists))
		}
		for i, rc := range receiptLists {
			receipts[lastBlock+i].Append(rc)
		}
		lastBlock += len(receiptLists) - 1
		incomplete = resp.LastBlockIncomplete
	}

	// Recompute the receipt root of every block and compare with the header.
	hasher := trie.NewStackTrie(nil)
	hashes := make([]common.Hash, len(receipts))
	for i := range receipts {
		hashes[i] = types.DeriveSha(receipts[i].Derivable(), hasher)
	}
	for i, hash := range hashes {
		if receiptHashes[i] != hash {
			t.Fatalf("wrong receipt root: want %x, got %x", receiptHashes[i], hash)
		}
	}
}
// randBuf returns a buffer of size KiB (size*1024 bytes) filled with data
// read from crypto/rand.
func randBuf(size int) []byte {
	const kib = 1024
	out := make([]byte, kib*size)
	rand.Read(out)
	return out
}
// TestMaliciousHandshake dials the node once per malformed devp2p hello
// message and sends that hello. The malformed messages use random capability
// names and/or node IDs of the wrong length or random content. After sending,
// up to two messages are read from the node: a disconnect or the node's own
// hello is accepted, read errors are tolerated, and any other message fails
// the test.
//
// Each connection is closed before the next handshake is attempted.
func (s *Suite) TestMaliciousHandshake(t *utesting.T) {
	t.Log(`This test tries to send malicious data during the devp2p handshake, in various ways.`)

	key, err := crypto.GenerateKey()
	if err != nil {
		t.Fatalf("failed to generate key: %v", err)
	}
	var (
		pub0    = crypto.FromECDSAPub(&key.PublicKey)[1:]
		version = eth.ProtocolVersions[0]
	)
	handshakes := []*protoHandshake{
		{
			Version: 5,
			Caps: []p2p.Cap{
				{Name: string(randBuf(2)), Version: version},
			},
			ID: pub0,
		},
		{
			Version: 5,
			Caps: []p2p.Cap{
				{Name: "eth", Version: version},
			},
			ID: append(pub0, byte(0)),
		},
		{
			Version: 5,
			Caps: []p2p.Cap{
				{Name: "eth", Version: version},
			},
			ID: append(pub0, pub0...),
		},
		{
			Version: 5,
			Caps: []p2p.Cap{
				{Name: "eth", Version: version},
			},
			ID: randBuf(2),
		},
		{
			Version: 5,
			Caps: []p2p.Cap{
				{Name: string(randBuf(2)), Version: version},
			},
			ID: randBuf(2),
		},
	}

	// attempt runs a single handshake on its own connection. It is a separate
	// function so that the deferred Close fires after every attempt instead
	// of piling up until the whole test returns.
	attempt := func(handshake *protoHandshake) {
		conn, err := s.dialAs(key)
		if err != nil {
			t.Fatalf("dial failed: %v", err)
		}
		defer conn.Close()

		// Write hello to client.
		if err := conn.Write(ethProto, handshakeMsg, handshake); err != nil {
			t.Fatalf("could not write to connection: %v", err)
		}
		// Check that the peer disconnected
		for i := 0; i < 2; i++ {
			code, _, err := conn.Read()
			if err != nil {
				// Client may have disconnected without sending disconnect msg.
				continue
			}
			switch code {
			case discMsg:
			case handshakeMsg:
				// Discard one hello as Hello's are sent concurrently
				continue
			default:
				t.Fatalf("unexpected msg: code %d", code)
			}
		}
	}
	for _, handshake := range handshakes {
		attempt(handshake)
	}
}
// TestBlockRangeUpdateInvalid sends a BlockRangeUpdate whose earliest block
// (10) is greater than its latest block (8) and expects the next message
// from the node to be a disconnect.
func (s *Suite) TestBlockRangeUpdateInvalid(t *utesting.T) {
	t.Log(`This test sends an invalid BlockRangeUpdate message to the node and expects to be disconnected.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	update := &eth.BlockRangeUpdatePacket{
		EarliestBlock:   10,
		LatestBlock:     8,
		LatestBlockHash: s.chain.GetBlock(8).Hash(),
	}
	// A failed write means the update never reached the node, so report it
	// here rather than as a confusing read error below.
	if err := conn.Write(ethProto, eth.BlockRangeUpdateMsg, update); err != nil {
		t.Fatalf("could not write to connection: %v", err)
	}

	if code, _, err := conn.Read(); err != nil {
		t.Fatalf("expected disconnect, got err: %v", err)
	} else if code != discMsg {
		t.Fatalf("expected disconnect message, got msg code %d", code)
	}
}
// TestBlockRangeUpdateFuture sends a BlockRangeUpdate announcing a block
// range that lies entirely beyond the head of the test chain, with a random
// latest block hash. It then sends ten pings, 100ms apart, reading one
// message after each, and fails on a read error or a disconnect.
func (s *Suite) TestBlockRangeUpdateFuture(t *utesting.T) {
	t.Log(`This test sends a BlockRangeUpdate that is beyond the chain head.
The node should accept the update and should not disonnect.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	head := s.chain.Head().NumberU64()
	var hash common.Hash
	rand.Read(hash[:])
	update := &eth.BlockRangeUpdatePacket{
		EarliestBlock:   head + 10,
		LatestBlock:     head + 50,
		LatestBlockHash: hash,
	}
	// If the update cannot be written, the checks below prove nothing.
	if err := conn.Write(ethProto, eth.BlockRangeUpdateMsg, update); err != nil {
		t.Fatal("write error:", err)
	}

	// Ensure the node does not disconnect us.
	// Just send a few ping messages.
	for range 10 {
		time.Sleep(100 * time.Millisecond)
		if err := conn.Write(baseProto, pingMsg, []any{}); err != nil {
			t.Fatal("write error:", err)
		}
		code, _, err := conn.Read()
		switch {
		case err != nil:
			t.Fatal("read error:", err)
		case code == discMsg:
			t.Fatal("got disconnect")
		case code == pongMsg:
		}
	}
}
// TestBlockRangeUpdateHistoryExp sends a BlockRangeUpdate announcing only the
// last ten blocks up to the head of the test chain. It then sends ten pings,
// 100ms apart, reading one message after each, and fails on a read error or
// a disconnect.
func (s *Suite) TestBlockRangeUpdateHistoryExp(t *utesting.T) {
	t.Log(`This test sends a BlockRangeUpdate announcing incomplete (expired) history.
The node should accept the update and should not disonnect.`)
	conn, err := s.dialAndPeer(nil)
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	head := s.chain.Head()
	update := &eth.BlockRangeUpdatePacket{
		EarliestBlock:   head.NumberU64() - 10,
		LatestBlock:     head.NumberU64(),
		LatestBlockHash: head.Hash(),
	}
	// If the update cannot be written, the checks below prove nothing.
	if err := conn.Write(ethProto, eth.BlockRangeUpdateMsg, update); err != nil {
		t.Fatal("write error:", err)
	}

	// Ensure the node does not disconnect us.
	// Just send a few ping messages.
	for range 10 {
		time.Sleep(100 * time.Millisecond)
		if err := conn.Write(baseProto, pingMsg, []any{}); err != nil {
			t.Fatal("write error:", err)
		}
		code, _, err := conn.Read()
		switch {
		case err != nil:
			t.Fatal("read error:", err)
		case code == discMsg:
			t.Fatal("got disconnect")
		case code == pongMsg:
		}
	}
}
// TestTransaction signs one dynamic-fee transaction from sender account 0,
// hands it to sendTxs, and then advances the locally tracked nonce of that
// account by one.
func (s *Suite) TestTransaction(t *utesting.T) {
	t.Log(`This test sends a valid transaction to the node and checks if the
transaction gets propagated.`)

	// Nudge client out of syncing mode to accept pending txs.
	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("failed to send next block: %v", err)
	}

	sender, nonce := s.chain.GetSender(0)
	signed, err := s.chain.SignTx(sender, types.NewTx(&types.DynamicFeeTx{
		ChainID:   s.chain.config.ChainID,
		Nonce:     nonce,
		GasTipCap: common.Big1,
		GasFeeCap: s.chain.Head().BaseFee(),
		Gas:       30000,
		To:        &common.Address{0xaa},
		Value:     common.Big1,
	}))
	if err != nil {
		t.Fatalf("failed to sign tx: %v", err)
	}

	batch := []*types.Transaction{signed}
	if err := s.sendTxs(t, batch); err != nil {
		t.Fatal(err)
	}
	s.chain.IncNonce(sender, 1)
}
// TestInvalidTxs first sends one valid transaction from sender account 0 via
// sendTxs and advances the local nonce. It then signs a table of
// transactions that are each meant to be invalid for a different reason and
// hands them to sendInvalidTxs.
func (s *Suite) TestInvalidTxs(t *utesting.T) {
	t.Log(`This test sends several kinds of invalid transactions and checks that the node
does not propagate them.`)

	// Nudge client out of syncing mode to accept pending txs.
	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("failed to send next block: %v", err)
	}

	// Start with a valid transaction that consumes the current nonce.
	sender, nonce := s.chain.GetSender(0)
	valid, err := s.chain.SignTx(sender, types.NewTx(&types.DynamicFeeTx{
		ChainID:   s.chain.config.ChainID,
		Nonce:     nonce,
		GasTipCap: common.Big1,
		GasFeeCap: s.chain.Head().BaseFee(),
		Gas:       30000,
		To:        &common.Address{0xaa},
	}))
	if err != nil {
		t.Fatalf("failed to sign tx: %v", err)
	}
	if err := s.sendTxs(t, []*types.Transaction{valid}); err != nil {
		t.Fatalf("failed to send txs: %v", err)
	}
	s.chain.IncNonce(sender, 1)

	// Table of invalid transactions.
	invalid := []*types.DynamicFeeTx{
		// Nonce already used
		{
			ChainID:   s.chain.config.ChainID,
			Nonce:     nonce - 1,
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			Gas:       100000,
		},
		// Value exceeds balance
		{
			Nonce:     nonce,
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			Gas:       100000,
			Value:     s.chain.Balance(sender),
		},
		// Gas limit too low
		{
			Nonce:     nonce,
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			Gas:       1337,
		},
		// Code size too large
		{
			Nonce:     nonce,
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			Data:      randBuf(50),
			Gas:       1_000_000,
		},
		// Data too large
		{
			Nonce:     nonce,
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			To:        &common.Address{0xaa},
			Data:      randBuf(128),
			Gas:       5_000_000,
		},
	}

	// Sign every entry, in table order.
	var signed []*types.Transaction
	for i := range invalid {
		tx, err := s.chain.SignTx(sender, types.NewTx(invalid[i]))
		if err != nil {
			t.Fatalf("failed to sign tx: %v", err)
		}
		signed = append(signed, tx)
	}
	if err := s.sendInvalidTxs(t, signed); err != nil {
		t.Fatalf("failed to send invalid txs: %v", err)
	}
}
// TestLargeTxRequest signs 2000 transactions from sender account 1, sends
// them to the node via sendTxs, and then requests all of them by hash over a
// second connection using GetPooledTransactions. The response must echo the
// request ID and may only contain transactions that were sent; the test does
// not require every transaction to be returned.
func (s *Suite) TestLargeTxRequest(t *utesting.T) {
	t.Log(`This test first send ~2000 transactions to the node, then requests them
on another peer connection using GetPooledTransactions.`)

	// Nudge client out of syncing mode to accept pending txs.
	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("failed to send next block: %v", err)
	}

	// Generate many transactions to seed target with.
	var (
		from, nonce = s.chain.GetSender(1)
		count       = 2000
		txs         []*types.Transaction
		hashes      []common.Hash
		set         = make(map[common.Hash]struct{})
	)
	for i := 0; i < count; i++ {
		inner := &types.DynamicFeeTx{
			ChainID:   s.chain.config.ChainID,
			Nonce:     nonce + uint64(i),
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			Gas:       75000,
		}
		tx, err := s.chain.SignTx(from, types.NewTx(inner))
		if err != nil {
			t.Fatalf("failed to sign tx: %v", err)
		}
		txs = append(txs, tx)
		set[tx.Hash()] = struct{}{}
		hashes = append(hashes, tx.Hash())
	}
	s.chain.IncNonce(from, uint64(count))

	// Send txs.
	if err := s.sendTxs(t, txs); err != nil {
		t.Fatalf("failed to send txs: %v", err)
	}

	// Set up receive connection to ensure node is peered with the receiving
	// connection before tx request is sent.
	conn, err := s.dial()
	if err != nil {
		t.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
	if err = conn.peer(s.chain, nil); err != nil {
		t.Fatalf("peering failed: %v", err)
	}

	// Create and send pooled tx request.
	req := &eth.GetPooledTransactionsPacket{
		RequestId:                    1234,
		GetPooledTransactionsRequest: hashes,
	}
	if err = conn.Write(ethProto, eth.GetPooledTransactionsMsg, req); err != nil {
		t.Fatalf("could not write to conn: %v", err)
	}

	// Check that all received transactions match those that were sent to node.
	msg := new(eth.PooledTransactionsPacket)
	if err := conn.ReadMsg(ethProto, eth.PooledTransactionsMsg, &msg); err != nil {
		t.Fatalf("error reading from connection: %v", err)
	}
	if got, want := msg.RequestId, req.RequestId; got != want {
		t.Fatalf("unexpected request id in response: got %d, want %d", got, want)
	}
	responseTxs, err := msg.List.Items()
	if err != nil {
		t.Fatalf("invalid transactions in response: %v", err)
	}
	for _, got := range responseTxs {
		if _, exists := set[got.Hash()]; !exists {
			t.Fatalf("unexpected tx received: %v", got.Hash())
		}
	}
}
// TestNewPooledTxs signs 50 transactions from sender account 1 and announces
// their hashes, types and sizes to the node without sending the transactions
// themselves. It then reads eth messages until the node sends a
// GetPooledTransactions request, which must ask for exactly as many hashes
// as were announced. Hash announcements and transaction broadcasts from the
// node are skipped; any other message fails the test.
func (s *Suite) TestNewPooledTxs(t *utesting.T) {
	t.Log(`This test announces transaction hashes to the node and expects it to fetch
the transactions using a GetPooledTransactions request.`)

	// Nudge client out of syncing mode to accept pending txs.
	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("failed to send next block: %v", err)
	}

	var (
		count       = 50
		from, nonce = s.chain.GetSender(1)
		hashes      = make([]common.Hash, count)
		txTypes     = make([]byte, count)
		sizes       = make([]uint32, count)
	)
	for i := 0; i < count; i++ {
		inner := &types.DynamicFeeTx{
			ChainID:   s.chain.config.ChainID,
			Nonce:     nonce + uint64(i),
			GasTipCap: common.Big1,
			GasFeeCap: s.chain.Head().BaseFee(),
			Gas:       75000,
		}
		tx, err := s.chain.SignTx(from, types.NewTx(inner))
		if err != nil {
			t.Fatalf("failed to sign tx: %v", err)
		}
		hashes[i] = tx.Hash()
		txTypes[i] = tx.Type()
		sizes[i] = uint32(tx.Size())
	}
	s.chain.IncNonce(from, uint64(count))

	// Connect to peer.
	conn, err := s.dial()
	if err != nil {
		t.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
	if err = conn.peer(s.chain, nil); err != nil {
		t.Fatalf("peering failed: %v", err)
	}

	// Send announcement.
	ann := eth.NewPooledTransactionHashesPacket72{Types: txTypes, Sizes: sizes, Hashes: hashes}
	err = conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann)
	if err != nil {
		t.Fatalf("failed to write to connection: %v", err)
	}

	// Wait for GetPooledTxs request.
	for {
		msg, err := conn.ReadEth()
		if err != nil {
			t.Fatalf("failed to read eth msg: %v", err)
		}
		switch msg := msg.(type) {
		case *eth.GetPooledTransactionsPacket:
			if len(msg.GetPooledTransactionsRequest) != len(hashes) {
				t.Fatalf("unexpected number of txs requested: wanted %d, got %d", len(hashes), len(msg.GetPooledTransactionsRequest))
			}
			return
		case *eth.NewPooledTransactionHashesPacket72:
			continue
		case *eth.TransactionsPacket:
			continue
		default:
			t.Fatalf("unexpected %s", pretty.Sdump(msg))
		}
	}
}
// makeSidecar builds a version-1 blob sidecar holding one blob per byte in
// data. Each blob is all zero except for its first byte, which is set to the
// corresponding data byte. The commitment and the cell proofs of every blob
// are computed and stored in the sidecar.
//
// It panics if a commitment or the cell proofs cannot be computed, since a
// sidecar without them would be silently invalid.
func makeSidecar(data ...byte) *types.BlobTxSidecar {
	var (
		blobs       = make([]kzg4844.Blob, len(data))
		commitments []kzg4844.Commitment
		proofs      []kzg4844.Proof
	)
	for i := range blobs {
		blobs[i][0] = data[i]
		c, err := kzg4844.BlobToCommitment(&blobs[i])
		if err != nil {
			panic(fmt.Sprintf("blob commitment failed: %v", err))
		}
		cellProofs, err := kzg4844.ComputeCellProofs(&blobs[i])
		if err != nil {
			panic(fmt.Sprintf("cell proof computation failed: %v", err))
		}
		commitments = append(commitments, c)
		proofs = append(proofs, cellProofs...)
	}
	return types.NewBlobTxSidecar(types.BlobSidecarVersion1, blobs, commitments, proofs)
}
// makeBlobTxs signs txCount blob transactions from sender account 5 with
// consecutive nonces. A total of blobCount blobs is spread over the
// transactions, at most two per transaction, and the first byte of every
// blob is set to discriminator. It returns the signed transactions after
// WithoutBlob has been applied, along with the blobs of each transaction's
// sidecar. It panics if signing fails.
func (s *Suite) makeBlobTxs(txCount, blobCount int, discriminator byte) (txs types.Transactions, blobs [][]kzg4844.Blob) {
	sender, firstNonce := s.chain.GetSender(5)
	for n := range txCount {
		// Take up to two of the remaining blobs for this transaction.
		data := make([]byte, min(blobCount, 2))
		for j := range data {
			data[j] = discriminator
		}
		blobCount -= len(data)

		sidecar := makeSidecar(data...)
		signed, err := s.chain.SignTx(sender, types.NewTx(&types.BlobTx{
			ChainID:    uint256.MustFromBig(s.chain.config.ChainID),
			Nonce:      firstNonce + uint64(n),
			GasTipCap:  uint256.NewInt(1),
			GasFeeCap:  uint256.MustFromBig(s.chain.Head().BaseFee()),
			Gas:        100000,
			BlobFeeCap: uint256.MustFromBig(eip4844.CalcBlobFee(s.chain.config, s.chain.Head().Header())),
			BlobHashes: sidecar.BlobHashes(),
			Sidecar:    sidecar,
		}))
		if err != nil {
			panic("blob tx signing failed")
		}
		blobs = append(blobs, sidecar.Blobs)
		txs = append(txs, signed.WithoutBlob())
	}
	return txs, blobs
}
// TestBlobViolations runs two scenarios, each on a fresh connection. In each
// one it announces two blob transactions with incorrect metadata (a wrong
// size in the first scenario, a wrong type in the second), waits for the
// node's GetPooledTransactions request, answers it with the transactions,
// and then expects the node to disconnect.
//
// If the first message after the response is not a disconnect, further
// messages are read until one arrives; while waiting, only transaction hash
// announcements and GetCells requests are tolerated.
func (s *Suite) TestBlobViolations(t *utesting.T) {
	t.Log(`This test sends some invalid blob tx announcements and expects the node to disconnect.`)

	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("send fcu failed: %v", err)
	}
	// Create blob txs for each tests with unique tx hashes.
	var (
		t1, _ = s.makeBlobTxs(2, 3, 0x1)
		t2, _ = s.makeBlobTxs(2, 3, 0x2)
	)
	for _, test := range []struct {
		ann  eth.NewPooledTransactionHashesPacket72
		resp eth.PooledTransactionsResponse
	}{
		// Invalid tx size.
		{
			ann: eth.NewPooledTransactionHashesPacket72{
				Types:  []byte{types.BlobTxType, types.BlobTxType},
				Sizes:  []uint32{uint32(t1[0].Size()), uint32(t1[1].Size() + 10)},
				Hashes: []common.Hash{t1[0].Hash(), t1[1].Hash()},
				Mask:   *types.CustodyBitmapAll,
			},
			resp: eth.PooledTransactionsResponse(t1),
		},
		// Wrong tx type.
		{
			ann: eth.NewPooledTransactionHashesPacket72{
				Types:  []byte{types.DynamicFeeTxType, types.BlobTxType},
				Sizes:  []uint32{uint32(t2[0].Size()), uint32(t2[1].Size())},
				Hashes: []common.Hash{t2[0].Hash(), t2[1].Hash()},
				Mask:   *types.CustodyBitmapAll,
			},
			resp: eth.PooledTransactionsResponse(t2),
		},
	} {
		// Each scenario runs in its own function so the connection is closed
		// when the scenario ends, including when it fails.
		func() {
			conn, err := s.dial()
			if err != nil {
				t.Fatalf("dial fail: %v", err)
			}
			defer conn.Close()

			if err := conn.peer(s.chain, nil); err != nil {
				t.Fatalf("peering failed: %v", err)
			}
			if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, test.ann); err != nil {
				t.Fatalf("sending announcement failed: %v", err)
			}
			req := new(eth.GetPooledTransactionsPacket)
			if err := conn.ReadMsg(ethProto, eth.GetPooledTransactionsMsg, req); err != nil {
				t.Fatalf("reading pooled tx request failed: %v", err)
			}
			encTxs, err := rlp.EncodeToRawList(test.resp)
			if err != nil {
				t.Fatalf("encoding pooled tx response failed: %v", err)
			}
			resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, List: encTxs}
			if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
				t.Fatalf("writing pooled tx response failed: %v", err)
			}
			if code, _, err := conn.Read(); err != nil {
				t.Fatalf("expected disconnect on blob violation, got err: %v", err)
			} else if code != discMsg {
				// Keep reading until the disconnect arrives.
				for {
					code, _, err := conn.Read()
					if err != nil {
						t.Fatalf("expected disconnect on blob violation, got err: %v", err)
					}
					if code == discMsg {
						break
					}
					switch code {
					case protoOffset(ethProto) + eth.NewPooledTransactionHashesMsg,
						protoOffset(ethProto) + eth.GetCellsMsg:
						continue
					default:
						t.Fatalf("expected disconnect on blob violation, got msg code: %d", code)
					}
				}
			}
		}()
	}
}
// mangleSidecar returns the given blob transaction with a copy of its sidecar
// attached in which the first commitment has been replaced by the zero
// commitment. The sidecar of the input transaction is not modified.
func mangleSidecar(tx *types.Transaction) *types.Transaction {
	mangled := tx.BlobTxSidecar().Copy()
	// A zeroed first commitment alters the sidecar hash.
	var zero kzg4844.Commitment
	mangled.Commitments[0] = zero
	return tx.WithBlobTxSidecar(mangled)
}
// TestBlobTxWithoutSidecar runs the bad-blob-tx scenario with a bad variant
// that has had its sidecar stripped via WithoutBlobTxSidecar.
func (s *Suite) TestBlobTxWithoutSidecar(t *utesting.T) {
	t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)

	txs, _ := s.makeBlobTxs(1, 2, 42)
	good := txs[0]
	s.testBadBlobTx(t, good, good.WithoutBlobTxSidecar())
}
// TestBlobTxWithMismatchedSidecar runs the bad-blob-tx scenario with a bad
// variant whose sidecar was altered by mangleSidecar.
func (s *Suite) TestBlobTxWithMismatchedSidecar(t *utesting.T) {
	t.Log(`This test checks that a blob transaction first advertised/transmitted without blobs, whose commitment don't correspond to the blob_versioned_hashes in the transaction, will result in the sending peer being disconnected, and the full transaction should be successfully retrieved from another peer.`)

	txs, _ := s.makeBlobTxs(1, 2, 43)
	good := txs[0]
	s.testBadBlobTx(t, good, mangleSidecar(good))
}
// readUntil returns the next eth protocol message of type T seen on conn.
//
// Messages stashed in conn.pending by earlier calls are inspected first; a
// match is removed from the stash and returned. Otherwise messages are read
// with conn.ReadEth until one of type T arrives, and every non-matching
// message read along the way is appended to conn.pending.
//
// errDisc is returned when ReadEth reports errDisc. Any other read error is
// ignored and the read is retried. The context is only consulted between
// reads; once it is done, context.Canceled is returned (this is also the
// error reported when the context ended because of a deadline).
func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) {
	// Serve the request from the stash if a matching message is waiting.
	for idx := range conn.pending {
		match, ok := conn.pending[idx].(*T)
		if !ok {
			continue
		}
		conn.pending = append(conn.pending[:idx], conn.pending[idx+1:]...)
		return match, nil
	}
	for ctx.Err() == nil {
		msg, err := conn.ReadEth()
		switch {
		case err == errDisc:
			return nil, errDisc
		case err != nil:
			// Any other read error is not treated as fatal; try again.
			continue
		}
		if match, ok := msg.(*T); ok {
			return match, nil
		}
		// Not what the caller asked for: keep it for a later call.
		conn.pending = append(conn.pending, msg)
	}
	return nil, context.Canceled
}
// readUntilDisconnect consumes eth protocol messages from conn under a 100ms
// timeout context and reports whether the read loop ended with errDisc.
//
// The loop is driven by readUntil with a target type that no message is
// expected to match, so it only stops on a disconnect or once the context
// is done.
func readUntilDisconnect(conn *Conn) (disconnected bool) {
	deadline, stop := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer stop()

	if _, err := readUntil[struct{}](deadline, conn); err != errDisc {
		return false
	}
	return true
}
// testBadBlobTx drives the two-peer scenario shared by the bad blob
// transaction tests. tx is the valid transaction, badTx the broken variant.
//
// The scenario runs in three stages:
//
//  1. The bad peer announces badTx and waits for the node to request it.
//  2. The good peer connects and announces tx.
//  3. The bad peer answers the request with badTx and must be disconnected.
//
// Afterwards the good peer waits (up to 12s) for the node to request tx from
// it, delivers it, and must stay connected.
//
// The first outcome reported by either peer decides the test. When the test
// returns, the shared context is cancelled so that a peer goroutine still
// waiting for a stage or a message stops and closes its connection instead of
// staying blocked.
func (s *Suite) testBadBlobTx(t *utesting.T, tx *types.Transaction, badTx *types.Transaction) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Each stage channel is closed exactly once by the peer completing it.
	stage1 := make(chan struct{}) // bad peer has received the tx request
	stage2 := make(chan struct{}) // good peer has announced the tx
	stage3 := make(chan struct{}) // bad peer has been disconnected

	// await blocks until the stage is reached. It returns false if the test
	// ended first, in which case the caller must give up.
	await := func(stage <-chan struct{}) bool {
		select {
		case <-stage:
			return true
		case <-ctx.Done():
			return false
		}
	}

	// Only the first outcome is delivered; later ones are dropped, so a
	// reporting goroutine can never block on the channel.
	var (
		once sync.Once
		errc = make(chan error, 1)
	)
	report := func(err error) {
		once.Do(func() { errc <- err })
	}

	badPeer := func() {
		// announce the correct hash from the bad peer.
		// when the transaction is first requested before transmitting it from the bad peer,
		// trigger step 2: connection and announcement by good peers
		conn, err := s.dial()
		if err != nil {
			report(fmt.Errorf("dial fail: %w", err))
			return
		}
		defer conn.Close()

		if err := conn.peer(s.chain, nil); err != nil {
			report(fmt.Errorf("bad peer: peering failed: %w", err))
			return
		}
		ann := eth.NewPooledTransactionHashesPacket72{
			Types:  []byte{types.BlobTxType},
			Sizes:  []uint32{uint32(badTx.Size())},
			Hashes: []common.Hash{badTx.Hash()},
			Mask:   *types.CustodyBitmapAll,
		}
		if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
			report(fmt.Errorf("sending announcement failed: %w", err))
			return
		}
		req, err := readUntil[eth.GetPooledTransactionsPacket](ctx, conn)
		if err != nil {
			report(fmt.Errorf("failed to read GetPooledTransactions message: %w", err))
			return
		}
		close(stage1)
		if !await(stage2) {
			return
		}
		// the good peer is connected, and has announced the tx.
		// proceed to send the incorrect one from the bad peer.
		encTxs, _ := rlp.EncodeToRawList([]*types.Transaction{badTx})
		resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, List: encTxs}
		if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
			report(fmt.Errorf("writing pooled tx response failed: %w", err))
			return
		}
		if !readUntilDisconnect(conn) {
			report(errors.New("expected bad peer to be disconnected"))
			return
		}
		close(stage3)
	}

	goodPeer := func() {
		if !await(stage1) {
			return
		}
		conn, err := s.dial()
		if err != nil {
			report(fmt.Errorf("dial fail: %w", err))
			return
		}
		defer conn.Close()

		if err := conn.peer(s.chain, nil); err != nil {
			report(fmt.Errorf("peering failed: %w", err))
			return
		}
		ann := eth.NewPooledTransactionHashesPacket72{
			Types:  []byte{types.BlobTxType},
			Sizes:  []uint32{uint32(tx.Size())},
			Hashes: []common.Hash{tx.Hash()},
			Mask:   *types.CustodyBitmapAll,
		}
		if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
			report(fmt.Errorf("sending first announcement failed: %w", err))
			return
		}
		// wait until the bad peer has transmitted the incorrect transaction
		close(stage2)
		if !await(stage3) {
			return
		}
		// the bad peer has transmitted the bad tx, and been disconnected.
		// transmit the same tx but with correct sidecar from the good peer.
		reqCtx, cancelReq := context.WithTimeout(ctx, 12*time.Second)
		defer cancelReq()
		req, err := readUntil[eth.GetPooledTransactionsPacket](reqCtx, conn)
		if err != nil {
			report(fmt.Errorf("reading pooled tx request failed: %w", err))
			return
		}
		// Guard the index: a request without any hash must not panic the test.
		if len(req.GetPooledTransactionsRequest) == 0 || req.GetPooledTransactionsRequest[0] != tx.Hash() {
			report(errors.New("requested unknown tx hash"))
			return
		}
		encTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx})
		resp := eth.PooledTransactionsPacket{RequestId: req.RequestId, List: encTxs}
		if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
			report(fmt.Errorf("writing pooled tx response failed: %w", err))
			return
		}
		if readUntilDisconnect(conn) {
			report(errors.New("unexpected disconnect"))
			return
		}
		// Both peers behaved as expected.
		report(nil)
	}

	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("send fcu failed: %v", err)
	}
	go goodPeer()
	go badPeer()

	if err := <-errc; err != nil {
		t.Fatalf("%v", err)
	}
}
// TestBlobTxAvailabilityFailure announces four blob transactions from a
// single peer and then watches the connection for up to two seconds.
//
// Every GetCells request seen in that window must ask for at least
// kzg4844.DataPerBlob cells; a smaller mask fails the test. Requests for the
// pooled transactions are answered with all four transactions. The test
// passes when the window elapses or the read hits its deadline.
func (s *Suite) TestBlobTxAvailabilityFailure(t *utesting.T) {
	t.Log(`This test announces 4 blob txs from a single peer. With fetchProbability 0.15,
there will be at least one partial fetch (1-0.15^4). When only 1 peer announced availability,
partial fetch GetCells should never arrive. Any GetCells that does arrive must be a full fetch.`)

	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("send fcu failed: %v", err)
	}
	txs, _ := s.makeBlobTxs(4, 4, 0x30)

	conn, err := s.dial()
	if err != nil {
		t.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()
	if err := conn.peer(s.chain, nil); err != nil {
		t.Fatalf("peering failed: %v", err)
	}

	// Announce all 4 txs from a single peer.
	hashes := make([]common.Hash, len(txs))
	txTypes := make([]byte, len(txs))
	sizes := make([]uint32, len(txs))
	for i, tx := range txs {
		hashes[i] = tx.Hash()
		txTypes[i] = types.BlobTxType
		sizes[i] = uint32(tx.WithoutBlob().Size())
	}
	ann := eth.NewPooledTransactionHashesPacket72{
		Types:  txTypes,
		Sizes:  sizes,
		Hashes: hashes,
		Mask:   *types.CustodyBitmapAll,
	}
	if err := conn.Write(ethProto, eth.NewPooledTransactionHashesMsg, ann); err != nil {
		t.Fatalf("announce failed: %v", err)
	}

	// Read messages for a short period. Any GetCells that arrives must be
	// a full fetch request (mask >= DataPerBlob), not a partial fetch.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		msg, err := conn.ReadEth()
		if err != nil {
			if errors.Is(err, os.ErrDeadlineExceeded) {
				return // timeout, test passed
			}
			t.Fatalf("unexpected error: %v", err)
		}
		switch req := msg.(type) {
		case *eth.GetCellsRequestPacket:
			if req.Mask.OneCount() < kzg4844.DataPerBlob {
				t.Fatalf("received partial GetCells request with only %d cells from single peer announcement", req.Mask.OneCount())
			}
		case *eth.GetPooledTransactionsPacket:
			encTxs, _ := rlp.EncodeToRawList(txs)
			resp := eth.PooledTransactionsPacket{
				RequestId: req.RequestId,
				List:      encTxs,
			}
			// A failed write must not go unnoticed: without the response the
			// node has nothing to request cells for and the test would pass
			// without checking anything.
			if err := conn.Write(ethProto, eth.PooledTransactionsMsg, resp); err != nil {
				t.Fatalf("writing pooled tx response failed: %v", err)
			}
		}
	}
}
// buildCells computes the cells of the given blobs and returns the ones
// selected by mask. The result is ordered blob by blob and, within a blob,
// in the order reported by mask.Indices.
//
// The second return value of kzg4844.ComputeCells is discarded.
func buildCells(blobs []kzg4844.Blob, mask types.CustodyBitmap) []kzg4844.Cell {
	cells, _ := kzg4844.ComputeCells(blobs)
	wanted := mask.Indices()

	picked := make([]kzg4844.Cell, 0, len(blobs)*len(wanted))
	for blobIdx := range blobs {
		// Cells are indexed as a flat list with CellsPerBlob entries per blob.
		base := blobIdx * kzg4844.CellsPerBlob
		for _, cellIdx := range wanted {
			picked = append(picked, cells[base+int(cellIdx)])
		}
	}
	return picked
}
// readAnyFrom starts one readUntil[T] per connection and returns the first
// outcome: either a message of type T together with the connection it was
// read from, or the first error that is not context.Canceled.
//
// The shared context is cancelled when readAnyFrom returns. Reader goroutines
// for the remaining connections are not waited for; they stop once their
// readUntil call notices the cancellation.
func readAnyFrom[T any](conns ...*Conn) (*T, *Conn, error) {
	type hit struct {
		msg  *T
		from *Conn
	}
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	// Both channels can hold one value per reader, so a reader never blocks
	// on delivering its outcome even if nobody is receiving anymore.
	var (
		hits  = make(chan hit, len(conns))
		fails = make(chan error, len(conns))
	)
	for _, c := range conns {
		go func(c *Conn) {
			msg, err := readUntil[T](ctx, c)
			switch {
			case err == nil:
				hits <- hit{msg, c}
			case errors.Is(err, context.Canceled):
				// Stopped by cancellation, nothing to report.
			default:
				fails <- err
			}
		}(c)
	}
	select {
	case h := <-hits:
		return h.msg, h.from, nil
	case err := <-fails:
		return nil, nil, err
	}
}
// TestGetCells announces one blob transaction from two peers, serves the
// transaction to whichever peer is asked for it, and then answers the
// GetCells request with the cells selected by the requested mask. The peer
// that served the cells must not be disconnected afterwards.
func (s *Suite) TestGetCells(t *utesting.T) {
	t.Log(`This test checks that blob tx announcements trigger GetCells requests,
and that providing valid cells causes the tx to enter the pool.`)

	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("send fcu failed: %v", err)
	}
	txs, blobs := s.makeBlobTxs(1, 1, 0x31)
	tx, blob := txs[0], blobs[0]

	// Two peers ensure GetCells arrives regardless of full/partial fetch path.
	var peers [2]*Conn
	for i := range peers {
		c, err := s.dial()
		if err != nil {
			t.Fatalf("dial failed: %v", err)
		}
		// Deferred to the end of the test, not of the loop iteration.
		defer c.Close()
		if err := c.peer(s.chain, nil); err != nil {
			t.Fatalf("peering failed: %v", err)
		}
		peers[i] = c
	}
	conn1, conn2 := peers[0], peers[1]

	// Both peers announce the same transaction.
	announcement := eth.NewPooledTransactionHashesPacket72{
		Types:  []byte{types.BlobTxType},
		Sizes:  []uint32{uint32(tx.Size())},
		Hashes: []common.Hash{tx.Hash()},
		Mask:   *types.CustodyBitmapAll,
	}
	if err := conn1.Write(ethProto, eth.NewPooledTransactionHashesMsg, announcement); err != nil {
		t.Fatalf("conn1 announce failed: %v", err)
	}
	if err := conn2.Write(ethProto, eth.NewPooledTransactionHashesMsg, announcement); err != nil {
		t.Fatalf("conn2 announce failed: %v", err)
	}

	// Whichever peer is asked for the transaction delivers it.
	txReq, txConn, err := readAnyFrom[eth.GetPooledTransactionsPacket](conn1, conn2)
	if err != nil {
		t.Fatalf("failed to read GetPooledTransactions: %v", err)
	}
	rawTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx})
	txResp := eth.PooledTransactionsPacket{RequestId: txReq.RequestId, List: rawTxs}
	if err := txConn.Write(ethProto, eth.PooledTransactionsMsg, txResp); err != nil {
		t.Fatalf("writing pooled tx response failed: %v", err)
	}

	// The cell request may arrive on either connection as well.
	cellsReq, cellsConn, err := readAnyFrom[eth.GetCellsRequestPacket](conn1, conn2)
	if err != nil {
		t.Fatalf("failed to read GetCells: %v", err)
	}
	if len(cellsReq.Hashes) == 0 || cellsReq.Hashes[0] != tx.Hash() {
		t.Fatalf("GetCells for wrong hash: %v", cellsReq.Hashes)
	}

	// Answer with the genuine cells for exactly the requested mask.
	reply := eth.CellsPacket{
		RequestId: cellsReq.RequestId,
		CellsResponse: eth.CellsResponse{
			Hashes: []common.Hash{tx.Hash()},
			Cells:  [][]kzg4844.Cell{buildCells(blob, cellsReq.Mask)},
			Mask:   cellsReq.Mask,
		},
	}
	if err := cellsConn.Write(ethProto, eth.CellsMsg, reply); err != nil {
		t.Fatalf("writing cells response failed: %v", err)
	}

	// Valid data must not get the providing peer dropped.
	if readUntilDisconnect(cellsConn) {
		t.Fatalf("unexpected disconnect on cells-providing peer")
	}
}
// TestBlobTxWithInvalidCells announces one blob transaction from two peers,
// serves the transaction to whichever peer is asked for it, and then answers
// the GetCells request with zero-valued cells. The peer that sent the bogus
// cells must be disconnected, while the other peer must stay connected.
func (s *Suite) TestBlobTxWithInvalidCells(t *utesting.T) {
	t.Log(`This test checks that a peer responding to GetCells with invalid cells is disconnected,
while the other peer is not.`)

	if err := s.engine.sendForkchoiceUpdated(); err != nil {
		t.Fatalf("send fcu failed: %v", err)
	}
	txs, blobs := s.makeBlobTxs(1, 1, 0x32)
	tx, blob := txs[0], blobs[0]

	var peers [2]*Conn
	for i := range peers {
		c, err := s.dial()
		if err != nil {
			t.Fatalf("dial failed: %v", err)
		}
		// Deferred to the end of the test, not of the loop iteration.
		defer c.Close()
		if err := c.peer(s.chain, nil); err != nil {
			t.Fatalf("peering failed: %v", err)
		}
		peers[i] = c
	}
	conn1, conn2 := peers[0], peers[1]

	// Both peers announce the same transaction.
	announcement := eth.NewPooledTransactionHashesPacket72{
		Types:  []byte{types.BlobTxType},
		Sizes:  []uint32{uint32(tx.Size())},
		Hashes: []common.Hash{tx.Hash()},
		Mask:   *types.CustodyBitmapAll,
	}
	if err := conn1.Write(ethProto, eth.NewPooledTransactionHashesMsg, announcement); err != nil {
		t.Fatalf("conn1 announce failed: %v", err)
	}
	if err := conn2.Write(ethProto, eth.NewPooledTransactionHashesMsg, announcement); err != nil {
		t.Fatalf("conn2 announce failed: %v", err)
	}

	// Whichever peer is asked for the transaction delivers it.
	txReq, txConn, err := readAnyFrom[eth.GetPooledTransactionsPacket](conn1, conn2)
	if err != nil {
		t.Fatalf("failed to read GetPooledTransactions: %v", err)
	}
	rawTxs, _ := rlp.EncodeToRawList([]*types.Transaction{tx})
	txResp := eth.PooledTransactionsPacket{RequestId: txReq.RequestId, List: rawTxs}
	if err := txConn.Write(ethProto, eth.PooledTransactionsMsg, txResp); err != nil {
		t.Fatalf("writing pooled tx response failed: %v", err)
	}

	cellsReq, cellsConn, err := readAnyFrom[eth.GetCellsRequestPacket](conn1, conn2)
	if err != nil {
		t.Fatalf("failed to read GetCells: %v", err)
	}

	// Respond with corrupted cells (all zero bytes), one per requested cell
	// of every blob.
	bogus := make([]kzg4844.Cell, len(blob)*cellsReq.Mask.OneCount())
	reply := eth.CellsPacket{
		RequestId: cellsReq.RequestId,
		CellsResponse: eth.CellsResponse{
			Hashes: []common.Hash{tx.Hash()},
			Cells:  [][]kzg4844.Cell{bogus},
			Mask:   cellsReq.Mask,
		},
	}
	if err := cellsConn.Write(ethProto, eth.CellsMsg, reply); err != nil {
		t.Fatalf("writing bad cells response failed: %v", err)
	}

	// The peer that sent corrupted cells must be disconnected.
	if !readUntilDisconnect(cellsConn) {
		t.Fatalf("expected peer to be disconnected after invalid cells")
	}

	// The innocent peer must stay connected.
	innocent := conn2
	if cellsConn == conn2 {
		innocent = conn1
	}
	if readUntilDisconnect(innocent) {
		t.Fatalf("innocent peer should not be disconnected")
	}
}