eth, cmd, rlp: implement EIP-7975 (eth/70 - partial block receipt lists)

This commit is contained in:
Jared Wasinger 2026-03-04 20:16:28 -05:00
parent 2ce7a06cbc
commit 9fdd74be00
28 changed files with 9896 additions and 8815 deletions

View file

@ -51,6 +51,12 @@ type Chain struct {
state map[common.Address]state.DumpAccount // state of head block
senders map[common.Address]*senderInfo
config *params.ChainConfig
txInfo txInfo
}
type txInfo struct {
LargeReceiptBlock *uint64 `json:"tx-largereceipt"`
}
// NewChain takes the given chain.rlp file, and decodes and returns
@ -74,12 +80,20 @@ func NewChain(dir string) (*Chain, error) {
if err != nil {
return nil, err
}
var txInfo txInfo
err = common.LoadJSON(filepath.Join(dir, "txinfo.json"), &txInfo)
if err != nil {
return nil, err
}
return &Chain{
genesis: gen,
blocks: blocks,
state: state,
senders: accounts,
config: gen.Config,
txInfo: txInfo,
}, nil
}

View file

@ -66,9 +66,10 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
return nil, err
}
conn.caps = []p2p.Cap{
{Name: "eth", Version: 70},
{Name: "eth", Version: 69},
}
conn.ourHighestProtoVersion = 69
conn.ourHighestProtoVersion = 70
return &conn, nil
}
@ -335,10 +336,12 @@ loop:
if have, want := msg.ForkID, chain.ForkID(); !reflect.DeepEqual(have, want) {
return fmt.Errorf("wrong fork ID in status: have %v, want %v", have, want)
}
if have, want := msg.ProtocolVersion, c.ourHighestProtoVersion; have != uint32(want) {
return fmt.Errorf("wrong protocol version: have %v, want %v", have, want)
for _, cap := range c.caps {
if cap.Name == "eth" && cap.Version == uint(msg.ProtocolVersion) {
break loop
}
}
break loop
return fmt.Errorf("wrong protocol version: have %v, want %v", msg.ProtocolVersion, c.caps)
case discMsg:
var msg []p2p.DiscReason
if rlp.DecodeBytes(data, &msg); len(msg) == 0 {

View file

@ -87,9 +87,9 @@ func (s *Suite) TestSnapGetAccountRange(t *utesting.T) {
root: root,
startingHash: zero,
limitHash: ffHash,
expAccounts: 67,
expAccounts: 68,
expFirst: firstKey,
expLast: common.HexToHash("0x622e662246601dd04f996289ce8b85e86db7bb15bb17f86487ec9d543ddb6f9a"),
expLast: common.HexToHash("0x59312f89c13e9e24c1cb8b103aa39a9b2800348d97a92c2c9e2a78fa02b70025"),
desc: "In this test, we request the entire state range, but limit the response to 4000 bytes.",
},
{
@ -97,9 +97,9 @@ func (s *Suite) TestSnapGetAccountRange(t *utesting.T) {
root: root,
startingHash: zero,
limitHash: ffHash,
expAccounts: 49,
expAccounts: 50,
expFirst: firstKey,
expLast: common.HexToHash("0x445cb5c1278fdce2f9cbdb681bdd76c52f8e50e41dbd9e220242a69ba99ac099"),
expLast: common.HexToHash("0x4615e5f5df5b25349a00ad313c6cd0436b6c08ee5826e33a018661997f85ebaa"),
desc: "In this test, we request the entire state range, but limit the response to 3000 bytes.",
},
{
@ -107,9 +107,9 @@ func (s *Suite) TestSnapGetAccountRange(t *utesting.T) {
root: root,
startingHash: zero,
limitHash: ffHash,
expAccounts: 34,
expAccounts: 35,
expFirst: firstKey,
expLast: common.HexToHash("0x2ef46ebd2073cecde499c2e8df028ad79a26d57bfaa812c4c6f7eb4c9617b913"),
expLast: common.HexToHash("0x2de4bdbddcfbb9c3e195dae6b45f9c38daff897e926764bf34887fb0db5c3284"),
desc: "In this test, we request the entire state range, but limit the response to 2000 bytes.",
},
{
@ -178,9 +178,9 @@ The server should return the first available account.`,
root: root,
startingHash: firstKey,
limitHash: ffHash,
expAccounts: 67,
expAccounts: 68,
expFirst: firstKey,
expLast: common.HexToHash("0x622e662246601dd04f996289ce8b85e86db7bb15bb17f86487ec9d543ddb6f9a"),
expLast: common.HexToHash("0x59312f89c13e9e24c1cb8b103aa39a9b2800348d97a92c2c9e2a78fa02b70025"),
desc: `In this test, startingHash is exactly the first available account key.
The server should return the first available account of the state as the first item.`,
},
@ -189,9 +189,9 @@ The server should return the first available account of the state as the first i
root: root,
startingHash: hashAdd(firstKey, 1),
limitHash: ffHash,
expAccounts: 67,
expAccounts: 68,
expFirst: secondKey,
expLast: common.HexToHash("0x66192e4c757fba1cdc776e6737008f42d50370d3cd801db3624274283bf7cd63"),
expLast: common.HexToHash("0x59a7c8818f1c16b298a054020dc7c3f403a970d1d1db33f9478b1c36e3a2e509"),
desc: `In this test, startingHash is after the first available key.
The server should return the second account of the state as the first item.`,
},
@ -227,9 +227,9 @@ server to return no data because genesis is older than 127 blocks.`,
root: s.chain.RootAt(int(s.chain.Head().Number().Uint64()) - 127),
startingHash: zero,
limitHash: ffHash,
expAccounts: 66,
expAccounts: 68,
expFirst: firstKey,
expLast: common.HexToHash("0x729953a43ed6c913df957172680a17e5735143ad767bda8f58ac84ec62fbec5e"),
expLast: common.HexToHash("0x683b6c03cc32afe5db8cb96050f711fdaff8f8ff44c7587a9a848f921d02815e"),
desc: `This test requests data at a state root that is 127 blocks old.
We expect the server to have this state available.`,
},
@ -658,8 +658,8 @@ The server should reject the request.`,
// It's a bit unfortunate these are hard-coded, but the result depends on
// a lot of aspects of the state trie and can't be guessed in a simple
// way. So you'll have to update this when the test chain is changed.
common.HexToHash("0x5bdc0d6057b35642a16d27223ea5454e5a17a400e28f7328971a5f2a87773b76"),
common.HexToHash("0x0a76c9812ca90ffed8ee4d191e683f93386b6e50cfe3679c0760d27510aa7fc5"),
common.HexToHash("0x4bdecec09691ad38113eebee2df94fadefdff5841c0f182bae1be3c8a6d60bf3"),
common.HexToHash("0x4178696465d4514ff5924ef8c28ce64d41a669634b63184c2c093e252d6b4bc4"),
empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty,
empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty,
empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty,
@ -679,8 +679,8 @@ The server should reject the request.`,
// be updated when the test chain is changed.
expHashes: []common.Hash{
empty,
common.HexToHash("0x0a76c9812ca90ffed8ee4d191e683f93386b6e50cfe3679c0760d27510aa7fc5"),
common.HexToHash("0x5bdc0d6057b35642a16d27223ea5454e5a17a400e28f7328971a5f2a87773b76"),
common.HexToHash("0x4178696465d4514ff5924ef8c28ce64d41a669634b63184c2c093e252d6b4bc4"),
common.HexToHash("0x4bdecec09691ad38113eebee2df94fadefdff5841c0f182bae1be3c8a6d60bf3"),
},
},

View file

@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)
@ -83,6 +84,7 @@ func (s *Suite) EthTests() []utesting.Test {
// get history
{Name: "GetBlockBodies", Fn: s.TestGetBlockBodies},
{Name: "GetReceipts", Fn: s.TestGetReceipts},
{Name: "GetLargeReceipts", Fn: s.TestGetLargeReceipts},
// test transactions
{Name: "LargeTxRequest", Fn: s.TestLargeTxRequest, Slow: true},
{Name: "Transaction", Fn: s.TestTransaction},
@ -429,6 +431,9 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
// Find some blocks containing receipts.
var hashes = make([]common.Hash, 0, 3)
for i := range s.chain.Len() {
if s.chain.txInfo.LargeReceiptBlock != nil && uint64(i) == *s.chain.txInfo.LargeReceiptBlock {
continue
}
block := s.chain.GetBlock(i)
if len(block.Transactions()) > 0 {
hashes = append(hashes, block.Hash())
@ -437,25 +442,121 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
break
}
}
if conn.negotiatedProtoVersion < eth.ETH70 {
// Create block bodies request.
req := &eth.GetReceiptsPacket69{
RequestId: 66,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
}
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
t.Fatalf("could not write to connection: %v", err)
}
// Wait for response.
resp := new(eth.ReceiptsPacket69)
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block receipts msg: %v", err)
}
if got, want := resp.RequestId, req.RequestId; got != want {
t.Fatalf("unexpected request id in respond", got, want)
}
if resp.List.Len() != len(req.GetReceiptsRequest) {
t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
}
} else {
// Create block bodies request.
req := &eth.GetReceiptsPacket70{
RequestId: 66,
FirstBlockReceiptIndex: 0,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
}
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
t.Fatalf("could not write to connection: %v", err)
}
// Wait for response.
resp := new(eth.ReceiptsPacket70)
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block receipts msg: %v", err)
}
if got, want := resp.RequestId, req.RequestId; got != want {
t.Fatalf("unexpected request id in respond", got, want)
}
if resp.List.Len() != len(req.GetReceiptsRequest) {
t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
}
}
}
// Create receipts request.
req := &eth.GetReceiptsPacket{
RequestId: 66,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
func (s *Suite) TestGetLargeReceipts(t *utesting.T) {
t.Log(`This test sends GetReceipts requests to the node for large receipt (>10MiB) in the test chain.
This test is meaningful only if the client supports protocol version ETH70 or higher
and LargeReceiptBlock is configured in txInfo.json.`)
conn, err := s.dialAndPeer(nil)
if err != nil {
t.Fatalf("peering failed: %v", err)
}
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
t.Fatalf("could not write to connection: %v", err)
defer conn.Close()
if conn.negotiatedProtoVersion < eth.ETH70 || s.chain.txInfo.LargeReceiptBlock == nil {
return
}
// Wait for response.
resp := new(eth.ReceiptsPacket)
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block bodies msg: %v", err)
// Find block with large receipt.
// Place the large receipt block hash in the middle of the query
start := max(int(*s.chain.txInfo.LargeReceiptBlock)-2, 0)
end := min(*s.chain.txInfo.LargeReceiptBlock+2, uint64(len(s.chain.blocks)))
var blocks []common.Hash
var receiptHashes []common.Hash
var receipts []*eth.ReceiptList
for i := uint64(start); i < end; i++ {
block := s.chain.GetBlock(int(i))
blocks = append(blocks, block.Hash())
receiptHashes = append(receiptHashes, block.Header().ReceiptHash)
receipts = append(receipts, &eth.ReceiptList{})
}
if got, want := resp.RequestId, req.RequestId; got != want {
t.Fatalf("unexpected request id in respond", got, want)
incomplete := false
lastBlock := 0
for incomplete || lastBlock != len(blocks)-1 {
// Create get receipt request.
req := &eth.GetReceiptsPacket70{
RequestId: 66,
FirstBlockReceiptIndex: uint64(receipts[lastBlock].Derivable().Len()),
GetReceiptsRequest: blocks[lastBlock:],
}
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
t.Fatalf("could not write to connection: %v", err)
}
// Wait for response.
resp := new(eth.ReceiptsPacket70)
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block receipts msg: %v", err)
}
if got, want := resp.RequestId, req.RequestId; got != want {
t.Fatalf("unexpected request id in respond, want: %d, got: %d", got, want)
}
receiptLists, _ := resp.List.Items()
for i, rc := range receiptLists {
receipts[lastBlock+i].Append(rc)
}
lastBlock += len(receiptLists) - 1
incomplete = resp.LastBlockIncomplete
}
if resp.List.Len() != len(req.GetReceiptsRequest) {
t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(receipts))
for i := range receipts {
hashes[i] = types.DeriveSha(receipts[i].Derivable(), hasher)
}
for i, hash := range hashes {
if receiptHashes[i] != hash {
t.Fatalf("wrong receipt root: want %x, got %x", receiptHashes[i], hash)
}
}
}

Binary file not shown.

View file

@ -37,7 +37,7 @@
"nonce": "0x0",
"timestamp": "0x0",
"extraData": "0x68697665636861696e",
"gasLimit": "0x23f3e20",
"gasLimit": "0x11e1a300",
"difficulty": "0x20000",
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"coinbase": "0x0000000000000000000000000000000000000000",
@ -119,6 +119,10 @@
"balance": "0x1",
"nonce": "0x1"
},
"8dcd17433742f4c0ca53122ab541d0ba67fc27ff": {
"code": "0x6202e6306000a0",
"balance": "0x0"
},
"c7b99a164efd027a93f147376cc7da7c67c6bbe0": {
"balance": "0xc097ce7bc90715b34b9f1000000000"
},

View file

@ -1,24 +1,24 @@
{
"parentHash": "0x65151b101682b54cd08ba226f640c14c86176865ff9bfc57e0147dadaeac34bb",
"parentHash": "0x7e80093a491eba0e5b2c1895837902f64f514100221801318fe391e1e09c96a6",
"sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
"miner": "0x0000000000000000000000000000000000000000",
"stateRoot": "0xce423ebc60fc7764a43f09f1fe3ae61eef25e3eb8d09b1108f7e7eb77dfff5e6",
"transactionsRoot": "0x7ec1ae3989efa75d7bcc766e5e2443afa8a89a5fda42ebba90050e7e702980f7",
"receiptsRoot": "0xfe160832b1ca85f38c6674cb0aae3a24693bc49be56e2ecdf3698b71a794de86",
"stateRoot": "0x8fcfb02cfca007773bd55bc1c3e50a3c8612a59c87ce057e5957e8bf17c1728b",
"transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"difficulty": "0x0",
"number": "0x258",
"gasLimit": "0x23f3e20",
"gasUsed": "0x19d36",
"gasLimit": "0x11e1a300",
"gasUsed": "0x0",
"timestamp": "0x1770",
"extraData": "0x",
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"nonce": "0x0000000000000000",
"baseFeePerGas": "0x7",
"withdrawalsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
"withdrawalsRoot": "0x92abfda39de7df7d705c5a8f30386802ad59d31e782a06d5c5b0f9a260056cf0",
"blobGasUsed": "0x0",
"excessBlobGas": "0x0",
"parentBeaconBlockRoot": "0xf5003fc8f92358e790a114bce93ce1d9c283c85e1787f8d7d56714d3489b49e6",
"requestsHash": "0xe3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
"hash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0"
"hash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a"
}

View file

@ -4,9 +4,9 @@
"method": "engine_forkchoiceUpdatedV3",
"params": [
{
"headBlockHash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0",
"safeBlockHash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0",
"finalizedBlockHash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0"
"headBlockHash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a",
"safeBlockHash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a",
"finalizedBlockHash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a"
},
null
]

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -259,8 +259,8 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// RequestReceipts constructs a getReceipts method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes)
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery69(dlp.chain, hashes)
receipts := make([]types.Receipts, blobs.Len())
// compute hashes

View file

@ -78,11 +78,15 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
if q.receiptFetchHook != nil {
q.receiptFetchHook(req.Headers)
}
hashes := make([]common.Hash, 0, len(req.Headers))
var (
gasUsed = make([]uint64, 0, len(req.Headers))
hashes = make([]common.Hash, 0, len(req.Headers))
)
for _, header := range req.Headers {
hashes = append(hashes, header.Hash())
gasUsed = append(gasUsed, header.GasUsed)
}
return peer.peer.RequestReceipts(hashes, resCh)
return peer.peer.RequestReceipts(hashes, gasUsed, resCh)
}
// deliver is responsible for taking a generic response packet from the concurrent

View file

@ -60,7 +60,7 @@ type Peer interface {
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error)
}
// newPeerConnection creates a new downloader peer.

View file

@ -208,7 +208,7 @@ func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*et
panic("skeleton sync must not request block bodies")
}
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) {
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request receipts")
}

View file

@ -214,7 +214,10 @@ loop:
continue loop
}
pending[req.id] = req
// do not overwrite if it is re-request
if _, ok := pending[req.id]; !ok {
pending[req.id] = req
}
reqOp.fail <- nil
case cancelOp := <-p.reqCancel:
@ -227,6 +230,13 @@ loop:
}
// Stop tracking the request
delete(pending, cancelOp.id)
// Not sure if the request is about the receipt, but remove it anyway.
// TODO(rjl493456442, bosul): investigate whether we can avoid leaking peer fields here.
p.receiptBufferLock.Lock()
delete(p.receiptBuffer, cancelOp.id)
p.receiptBufferLock.Unlock()
cancelOp.fail <- nil
case resOp := <-p.resDispatch:

View file

@ -35,6 +35,10 @@ const (
// softResponseLimit is the target maximum size of replies to data retrievals.
softResponseLimit = 2 * 1024 * 1024
// maxPacketSize is the devp2p message size limit commonly enforced by clients.
// Any packet exceeding this limit must be rejected.
maxPacketSize = 10 * 1024 * 1024
// maxHeadersServe is the maximum number of block headers to serve. This number
// is there to limit the number of disk lookups.
maxHeadersServe = 1024
@ -173,8 +177,22 @@ var eth69 = map[uint64]msgHandler{
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts,
ReceiptsMsg: handleReceipts,
GetReceiptsMsg: handleGetReceipts69,
ReceiptsMsg: handleReceipts69,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
BlockRangeUpdateMsg: handleBlockRangeUpdate,
}
var eth70 = map[uint64]msgHandler{
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts70,
ReceiptsMsg: handleReceipts70,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
BlockRangeUpdateMsg: handleBlockRangeUpdate,
@ -194,9 +212,12 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()
var handlers map[uint64]msgHandler
if peer.version == ETH69 {
switch peer.version {
case ETH69:
handlers = eth69
} else {
case ETH70:
handlers = eth70
default:
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
}

View file

@ -596,11 +596,11 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}
// Send the hash request and verify the response
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket{
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket69{
RequestId: 123,
GetReceiptsRequest: hashes,
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket{
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket69{
RequestId: 123,
List: receipts,
}); err != nil {
@ -608,6 +608,103 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}
}
func TestGetBlockPartialReceipts(t *testing.T) { testGetBlockPartialReceipts(t, ETH70) }
func testGetBlockPartialReceipts(t *testing.T, protocol int) {
// First, generate the chain and overwrite the receipts.
generator := func(_ int, block *core.BlockGen) {
for j := 0; j < 5; j++ {
tx, err := types.SignTx(
types.NewTransaction(block.TxNonce(testAddr), testAddr, big.NewInt(1000), params.TxGas, block.BaseFee(), nil),
types.LatestSignerForChainID(params.TestChainConfig.ChainID),
testKey,
)
if err != nil {
t.Fatalf("failed to sign tx: %v", err)
}
block.AddTx(tx)
}
}
backend := newTestBackendWithGenerator(4, true, false, generator)
defer backend.close()
blockCutoff := 2
receiptCutoff := 4
// Replace the receipts in the database with larger receipts.
targetBlock := backend.chain.GetBlockByNumber(uint64(blockCutoff))
receipts := backend.chain.GetReceiptsByHash(targetBlock.Hash())
receiptSize := params.MaxTxGas / params.LogDataGas // ~2MiB per receipt
for i := range receipts {
payload := make([]byte, receiptSize)
for j := range payload {
payload[j] = byte(i + j)
}
receipts[i].Logs = []*types.Log{
{
Address: common.BytesToAddress([]byte{byte(i + 1)}),
Data: payload,
},
}
}
rawdb.WriteReceipts(backend.db, targetBlock.Hash(), targetBlock.NumberU64(), receipts)
peer, _ := newTestPeer("peer", uint(protocol), backend)
defer peer.close()
var (
hashes []common.Hash
partialReceipt []*ReceiptList
)
for i := uint64(0); i <= backend.chain.CurrentBlock().Number.Uint64(); i++ {
block := backend.chain.GetBlockByNumber(i)
hashes = append(hashes, block.Hash())
}
for i := 0; i <= blockCutoff; i++ {
block := backend.chain.GetBlockByNumber(uint64(i))
trs := backend.chain.GetReceiptsByHash(block.Hash())
limit := len(trs)
if i == blockCutoff {
limit = receiptCutoff
}
partialReceipt = append(partialReceipt, NewReceiptList(trs[:limit]))
}
rawPartialReceipt, _ := rlp.EncodeToRawList(partialReceipt)
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{
RequestId: 123,
FirstBlockReceiptIndex: 0,
GetReceiptsRequest: hashes,
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{
RequestId: 123,
LastBlockIncomplete: true,
List: rawPartialReceipt,
}); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
// Simulate the continued request
partialReceipt = []*ReceiptList{NewReceiptList(receipts[receiptCutoff:])}
rawPartialReceipt, _ = rlp.EncodeToRawList(partialReceipt)
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{
RequestId: 123,
FirstBlockReceiptIndex: uint64(receiptCutoff),
GetReceiptsRequest: []common.Hash{hashes[blockCutoff]},
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{
RequestId: 123,
LastBlockIncomplete: false,
List: rawPartialReceipt,
}); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
}
type decoder struct {
msg []byte
}
@ -670,10 +767,10 @@ func setup() (*testBackend, *testPeer) {
}
func FuzzEthProtocolHandlers(f *testing.F) {
handlers := eth69
handlers := eth70
backend, peer := setup()
f.Fuzz(func(t *testing.T, code byte, msg []byte) {
handler := handlers[uint64(code)%protocolLengths[ETH69]]
handler := handlers[uint64(code)%protocolLengths[ETH70]]
if handler == nil {
return
}

View file

@ -246,20 +246,29 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
return bodies
}
func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error {
func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
var query GetReceiptsPacket69
if err := msg.Decode(&query); err != nil {
return err
}
response := ServiceGetReceiptsQuery(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
response := ServiceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP69(query.RequestId, response)
}
// ServiceGetReceiptsQuery assembles the response to a receipt query.
func handleGetReceipts70(backend Backend, msg Decoder, peer *Peer) error {
var query GetReceiptsPacket70
if err := msg.Decode(&query); err != nil {
return err
}
response, lastBlockIncomplete := serviceGetReceiptsQuery70(backend.Chain(), query.GetReceiptsRequest, query.FirstBlockReceiptIndex)
return peer.ReplyReceiptsRLP70(query.RequestId, response, lastBlockIncomplete)
}
// ServiceGetReceiptsQuery69 assembles the response to a receipt query.
// It does not send the bloom filters for the receipts. It is exposed
// to allow external packages to test protocol behavior.
func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) rlp.RawList[*ReceiptList] {
func ServiceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) rlp.RawList[*ReceiptList] {
// Gather state data until the fetch or network limits is reached
var (
bytes int
@ -294,6 +303,83 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) r
return receipts
}
// serviceGetReceiptsQuery70 assembles the response to a receipt query.
// If the receipts exceed 10 MiB, it trims them and sets the
// lastBlockIncomplete flag. Indices smaller than firstBlockReceiptIndex
// are omitted from the first block receipt list.
func serviceGetReceiptsQuery70(chain *core.BlockChain, query GetReceiptsRequest, firstBlockReceiptIndex uint64) ([]rlp.RawValue, bool) {
var (
bytes int
receipts []rlp.RawValue
lastBlockIncomplete bool
)
for i, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe {
break
}
results := chain.GetReceiptsRLP(hash)
if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
} else {
body := chain.GetBodyRLP(hash)
if body == nil {
continue
}
var err error
results, err = blockReceiptsToNetwork(results, body)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}
}
if firstBlockReceiptIndex > 0 && i == 0 {
results, lastBlockIncomplete = trimReceiptsRLP(results, int(firstBlockReceiptIndex), maxPacketSize)
} else if bytes+len(results) > maxPacketSize {
results, lastBlockIncomplete = trimReceiptsRLP(results, 0, maxPacketSize-bytes)
}
receipts = append(receipts, results)
bytes += len(results)
}
return receipts, lastBlockIncomplete
}
// trimReceiptsRLP trims raw value from `from` index until it exceeds limit
func trimReceiptsRLP(receiptsRLP rlp.RawValue, from int, limit int) (rlp.RawValue, bool) {
var (
out bytes.Buffer
buffer = rlp.NewEncoderBuffer(&out)
iter, _ = rlp.NewListIterator(receiptsRLP)
index int
bytes int
overflow bool
)
list := buffer.List()
for iter.Next() {
if index < from {
index++
continue
}
receipt := iter.Value()
if bytes+len(receipt) > limit {
overflow = true
break
}
buffer.Write(receipt)
bytes += len(receipt)
index++
}
buffer.ListEnd(list)
buffer.Flush()
return out.Bytes(), overflow
}
func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket)
@ -435,9 +521,9 @@ func writeTxForHash(tx []byte, buf *bytes.Buffer) {
}
}
func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
func handleReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket)
res := new(ReceiptsPacket69)
if err := msg.Decode(res); err != nil {
return err
}
@ -476,6 +562,57 @@ func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
}, metadata)
}
func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
res := new(ReceiptsPacket70)
if err := msg.Decode(res); err != nil {
return err
}
tresp := tracker.Response{ID: res.RequestId, MsgCode: ReceiptsMsg, Size: res.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("Receipts: %w", err)
}
// Assign temporary hashing buffer to each list item, the same buffer is shared
// between all receipt list instances.
receiptLists, err := res.List.Items()
if err != nil {
return fmt.Errorf("Receipts: %w", err)
}
if err := peer.bufferReceipts(res.RequestId, receiptLists, res.LastBlockIncomplete, backend); err != nil {
return err
}
if res.LastBlockIncomplete {
return peer.requestPartialReceipts(res.RequestId)
}
if complete := peer.flushReceipts(res.RequestId); complete != nil {
receiptLists = complete
}
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(receiptLists))
for i := range receiptLists {
hashes[i] = types.DeriveSha(receiptLists[i].Derivable(), hasher)
}
return hashes
}
var enc ReceiptsRLPResponse
for i := range receiptLists {
encReceipts, err := receiptLists[i].EncodeForStorage()
if err != nil {
return fmt.Errorf("Receipts: invalid list %d: %v", i, err)
}
enc = append(enc, encReceipts)
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,
Res: &enc,
}, metadata)
}
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them

View file

@ -17,7 +17,10 @@
package eth
import (
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
@ -26,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/tracker"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
@ -43,6 +47,14 @@ const (
maxQueuedTxAnns = 4096
)
// receiptRequest tracks the state of an in-flight receipt retrieval operation.
type receiptRequest struct {
request []common.Hash // block hashes corresponding to the requested receipts
gasUsed []uint64 // block gas used corresponding to the requested receipts
list []*ReceiptList // list of partially collected receipts
lastLogSize uint64 // log size of last receipt list
}
// Peer is a collection of relevant information we have about a `eth` peer.
type Peer struct {
*p2p.Peer // The embedded P2P package peer
@ -63,6 +75,9 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts
receiptBufferLock sync.RWMutex // Lock for protecting the receiptBuffer
term chan struct{} // Termination channel to stop the broadcasters
}
@ -72,19 +87,20 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
cap := p2p.Cap{Name: ProtocolName, Version: version}
id := p.ID().String()
peer := &Peer{
id: id,
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
tracker: tracker.New(cap, id, 5*time.Minute),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
term: make(chan struct{}),
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
tracker: tracker.New(cap, id, 5*time.Minute),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
receiptBuffer: make(map[uint64]*receiptRequest),
term: make(chan struct{}),
}
// Start up all the broadcasters
go peer.broadcastTransactions()
@ -214,14 +230,23 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
})
}
// ReplyReceiptsRLP is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts rlp.RawList[*ReceiptList]) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsPacket{
// ReplyReceiptsRLP69 is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts rlp.RawList[*ReceiptList]) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsPacket69{
RequestId: id,
List: receipts,
})
}
// ReplyReceiptsRLP70 is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts []rlp.RawValue, lastBlockIncomplete bool) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket70{
RequestId: id,
ReceiptsRLPResponse: receipts,
LastBlockIncomplete: lastBlockIncomplete,
})
}
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) {
@ -330,20 +355,42 @@ func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Reques
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Request, error) {
func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *Response) (*Request, error) {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
id := rand.Uint64()
req := &Request{
id: id,
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
numItems: len(hashes),
data: &GetReceiptsPacket{
RequestId: id,
GetReceiptsRequest: hashes,
},
var req *Request
if p.version > ETH69 {
req = &Request{
id: id,
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
numItems: len(hashes),
data: &GetReceiptsPacket70{
RequestId: id,
FirstBlockReceiptIndex: 0,
GetReceiptsRequest: hashes,
},
}
p.receiptBufferLock.Lock()
p.receiptBuffer[id] = &receiptRequest{
request: hashes,
gasUsed: gasUsed,
}
p.receiptBufferLock.Unlock()
} else {
req = &Request{
id: id,
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
numItems: len(hashes),
data: &GetReceiptsPacket69{
RequestId: id,
GetReceiptsRequest: hashes,
},
}
}
if err := p.dispatchRequest(req); err != nil {
return nil, err
@ -351,6 +398,158 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
return req, nil
}
// HandlePartialReceipts re-request partial receipts
func (p *Peer) requestPartialReceipts(id uint64) error {
p.receiptBufferLock.RLock()
defer p.receiptBufferLock.RUnlock()
// Do not re-request for the stale request
if _, ok := p.receiptBuffer[id]; !ok {
return nil
}
lastBlock := len(p.receiptBuffer[id].list) - 1
lastReceipt := p.receiptBuffer[id].list[lastBlock].items.Len()
hashes := p.receiptBuffer[id].request[lastBlock:]
req := &Request{
id: id,
sink: nil,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket70{
RequestId: id,
FirstBlockReceiptIndex: uint64(lastReceipt),
GetReceiptsRequest: hashes,
},
numItems: len(hashes),
}
return p.dispatchRequest(req)
}
// bufferReceipts validates a receipt packet and buffer the incomplete packet.
// If the request is completed, it appends previously collected receipts.
func (p *Peer) bufferReceipts(requestId uint64, receiptLists []*ReceiptList, lastBlockIncomplete bool, backend Backend) error {
p.receiptBufferLock.Lock()
defer p.receiptBufferLock.Unlock()
buffer := p.receiptBuffer[requestId]
// Short circuit for the canceled response
if buffer == nil {
return nil
}
// If the response is empty, the peer likely does not have the requested receipts.
// Forward the empty response to the internal handler regardless. However, note
// that an empty response marked as incomplete is considered invalid.
if len(receiptLists) == 0 {
delete(p.receiptBuffer, requestId)
if lastBlockIncomplete {
return errors.New("invalid empty receipt response with incomplete flag")
}
return nil
}
// Buffer the last block when the response is incomplete.
if lastBlockIncomplete {
lastBlock := len(receiptLists) - 1
if len(buffer.list) > 0 {
lastBlock += len(buffer.list) - 1
}
gasUsed := buffer.gasUsed[lastBlock]
logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed)
if err != nil {
delete(p.receiptBuffer, requestId)
return err
}
// Update the buffered data and trim the packet to exclude the incomplete block.
if len(buffer.list) > 0 {
// If the buffer is already allocated, it means that the previous response
// was incomplete Append the first block receipts.
buffer.list[len(buffer.list)-1].Append(receiptLists[0])
buffer.list = append(buffer.list, receiptLists[1:]...)
buffer.lastLogSize = logSize
} else {
buffer.list = receiptLists
buffer.lastLogSize = logSize
}
return nil
}
// Short circuit if there is nothing cached previously.
if len(buffer.list) == 0 {
delete(p.receiptBuffer, requestId)
return nil
}
// Aggregate the cached result into the packet.
buffer.list[len(buffer.list)-1].Append(receiptLists[0])
buffer.list = append(buffer.list, receiptLists[1:]...)
return nil
}
// flushReceipts retrieves the merged receipt lists from the buffer
// and removes the buffer entry. Returns nil if no buffered data exists.
func (p *Peer) flushReceipts(requestId uint64) []*ReceiptList {
p.receiptBufferLock.Lock()
defer p.receiptBufferLock.Unlock()
buffer, ok := p.receiptBuffer[requestId]
if !ok {
return nil
}
delete(p.receiptBuffer, requestId)
return buffer.list
}
// validateLastBlockReceipt validates receipts and return log size of last block receipt.
// This function is called only when the `lastBlockincomplete == true`.
//
// Note that the last receipt response (which completes receiptLists of a pending block)
// is not verified here. Those response doesn't need hueristics below since they can be
// verified by its trie root.
func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, gasUsed uint64) (uint64, error) {
lastReceipts := receiptLists[len(receiptLists)-1]
// If the receipt is in the middle of retrieval, use the buffered data.
// e.g. [[receipt1], [receipt1, receipt2], incomplete = true]
// [[receipt3, receipt4], incomplete = true] <<--
// [[receipt5], [receipt1], incomplete = false]
// This case happens only if len(receiptLists) == 1 && incomplete == true && buffered before.
var previousTxs int
var previousLog uint64
var log uint64
if buffer, ok := p.receiptBuffer[id]; ok && len(buffer.list) > 0 && len(receiptLists) == 1 {
previousTxs = buffer.list[len(buffer.list)-1].items.Len()
previousLog = buffer.lastLogSize
}
// Verify that the total number of transactions delivered is under the limit.
if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/21_000 {
// should be dropped, don't clear the buffer
return 0, fmt.Errorf("total number of tx exceeded limit")
}
// Count log size per receipt
it := lastReceipts.items.ContentIterator()
for it.Next() {
content, _, err := rlp.SplitList(it.Value())
if err != nil {
return 0, fmt.Errorf("invalid receipt structure: %v", err)
}
rest := content
for range 3 {
_, _, rest, err = rlp.Split(rest)
if err != nil {
return 0, fmt.Errorf("invalid receipt structure: %v", err)
}
}
log += uint64(len(rest))
}
// Verify that the overall downloaded receipt size does not exceed the block gas limit.
if previousLog+log > gasUsed/params.LogDataGas {
return 0, fmt.Errorf("total download receipt size exceeded the limit")
}
return previousLog + log, nil
}
// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Trace("Fetching batch of transactions", "count", len(hashes))

View file

@ -21,11 +21,18 @@ package eth
import (
"crypto/rand"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/tracker"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
// testPeer is a simulated peer to allow testing direct network calls.
@ -88,3 +95,343 @@ func TestPeerSet(t *testing.T) {
t.Fatalf("bad size")
}
}
func TestPartialReceipt(t *testing.T) {
gen := func(_ int, g *core.BlockGen) {
signer := types.HomesteadSigner{}
for range 4 {
tx, _ := types.SignTx(types.NewTransaction(g.TxNonce(testAddr), testAddr, big.NewInt(10), params.TxGas, g.BaseFee(), nil), signer, testKey)
g.AddTx(tx)
}
}
backend := newTestBackendWithGenerator(4, true, true, gen)
defer backend.close()
app, net := p2p.MsgPipe()
var id enode.ID
if _, err := rand.Read(id[:]); err != nil {
t.Fatalf("failed to create random peer: %v", err)
}
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil)
packetCh := make(chan *GetReceiptsPacket70, 1)
go func() {
for {
msg, err := app.ReadMsg()
if err != nil {
return
}
if msg.Code == GetReceiptsMsg {
var pkt GetReceiptsPacket70
if err := msg.Decode(&pkt); err == nil {
select {
case packetCh <- &pkt:
default:
}
}
}
msg.Discard()
}
}()
hashes := []common.Hash{
backend.chain.GetBlockByNumber(1).Hash(),
backend.chain.GetBlockByNumber(2).Hash(),
backend.chain.GetBlockByNumber(3).Hash(),
backend.chain.GetBlockByNumber(4).Hash(),
}
gasUsed := []uint64{
backend.chain.GetBlockByNumber(1).GasUsed(),
backend.chain.GetBlockByNumber(2).GasUsed(),
backend.chain.GetBlockByNumber(3).GasUsed(),
backend.chain.GetBlockByNumber(4).GasUsed(),
}
sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, gasUsed, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}
select {
case _ = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for request packet")
}
receipts := []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
}
logReceipts := []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
}
delivery := &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: encodeRL([]*ReceiptList{
{
items: encodeRL(receipts),
},
{
items: encodeRL(receipts),
},
}),
}
tresp := tracker.Response{ID: delivery.RequestId, MsgCode: ReceiptsMsg, Size: delivery.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
t.Fatalf("Tracker failed: %v", err)
}
receiptList, _ := delivery.List.Items()
if err := peer.bufferReceipts(delivery.RequestId, receiptList, delivery.LastBlockIncomplete, backend); err != nil {
t.Fatalf("first bufferReceipts failed: %v", err)
}
if err := peer.requestPartialReceipts(req.id); err != nil {
t.Fatalf("requestPartialReceipts failed: %v", err)
}
var rereq *GetReceiptsPacket70
select {
case rereq = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for re-request packet")
}
buffer, ok := peer.receiptBuffer[rereq.RequestId]
if !ok {
t.Fatalf("receiptBuffer should buffer incomplete receipts")
}
if rereq.FirstBlockReceiptIndex != uint64(buffer.list[len(buffer.list)-1].items.Len()) {
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, buffer.list[len(buffer.list)-1].items.Len())
}
delivery = &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: encodeRL([]*ReceiptList{
{
items: encodeRL(receipts),
},
}),
}
tresp = tracker.Response{ID: delivery.RequestId, MsgCode: ReceiptsMsg, Size: delivery.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
t.Fatalf("Tracker failed: %v", err)
}
receiptLists, _ := delivery.List.Items()
if err := peer.bufferReceipts(delivery.RequestId, receiptLists, delivery.LastBlockIncomplete, backend); err != nil {
t.Fatalf("second bufferReceipts failed: %v", err)
}
if err := peer.requestPartialReceipts(req.id); err != nil {
t.Fatalf("requestPartialReceipts failed: %v", err)
}
select {
case rereq = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for re-request packet")
}
buffer, ok = peer.receiptBuffer[rereq.RequestId]
if !ok {
t.Fatalf("receiptBuffer should buffer incomplete receipts")
}
if rereq.FirstBlockReceiptIndex != uint64(buffer.list[len(buffer.list)-1].items.Len()) {
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, buffer.list[len(buffer.list)-1].items.Len())
}
if len(rereq.GetReceiptsRequest) != 3 {
t.Fatalf("wrong partial request range, got %d want %d", len(rereq.GetReceiptsRequest), 3)
}
delivery = &ReceiptsPacket70{
RequestId: rereq.RequestId,
LastBlockIncomplete: false,
List: encodeRL([]*ReceiptList{
{
items: encodeRL(receipts),
},
{
items: encodeRL(receipts),
},
{
items: encodeRL(receipts),
},
}),
}
tresp = tracker.Response{ID: delivery.RequestId, MsgCode: ReceiptsMsg, Size: delivery.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
t.Fatalf("Tracker failed: %v", err)
}
receiptList, _ = delivery.List.Items()
if err := peer.bufferReceipts(delivery.RequestId, receiptList, delivery.LastBlockIncomplete, backend); err != nil {
t.Fatalf("third bufferReceipts failed: %v", err)
}
merged := peer.flushReceipts(rereq.RequestId)
if merged == nil {
t.Fatalf("flushReceipts should return merged receipt lists")
}
if _, ok := peer.receiptBuffer[rereq.RequestId]; ok {
t.Fatalf("receiptBuffer should be cleared after flush")
}
for i, list := range merged {
if i == 1 {
if list.items.Len() != len(logReceipts) {
t.Fatalf("wrong response buffering, got %d want %d", list.items.Len(), len(logReceipts))
}
} else {
if list.items.Len() != len(receipts) {
t.Fatalf("wrong response buffering, got %d want %d", list.items.Len(), len(receipts))
}
}
}
}
func TestPartialReceiptFailure(t *testing.T) {
gen := func(_ int, g *core.BlockGen) {
signer := types.HomesteadSigner{}
for range 4 {
tx, _ := types.SignTx(types.NewTransaction(g.TxNonce(testAddr), testAddr, big.NewInt(10), params.TxGas, g.BaseFee(), nil), signer, testKey)
g.AddTx(tx)
}
}
backend := newTestBackendWithGenerator(4, true, true, gen)
defer backend.close()
app, net := p2p.MsgPipe()
var id enode.ID
if _, err := rand.Read(id[:]); err != nil {
t.Fatalf("failed to create random peer: %v", err)
}
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil)
packetCh := make(chan *GetReceiptsPacket70, 1)
go func() {
for {
msg, err := app.ReadMsg()
if err != nil {
return
}
if msg.Code == GetReceiptsMsg {
var pkt GetReceiptsPacket70
if err := msg.Decode(&pkt); err == nil {
select {
case packetCh <- &pkt:
default:
}
}
}
msg.Discard()
}
}()
// If a peer delivers response which is never requested, the tracker should reject it.
delivery := &ReceiptsPacket70{
RequestId: 66,
LastBlockIncomplete: true,
List: encodeRL([]*ReceiptList{
{
items: encodeRL([]Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
}),
},
{
items: encodeRL([]Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))},
}),
},
}),
}
tresp := tracker.Response{ID: delivery.RequestId, MsgCode: ReceiptsMsg, Size: delivery.List.Len()}
if err := peer.tracker.Fulfil(tresp); err == nil {
t.Fatal("Unknown response should be rejected by tracker")
}
// If a peer deliverse excessive amount of receipts, it should also fail the validation
hashes := []common.Hash{
backend.chain.GetBlockByNumber(1).Hash(),
backend.chain.GetBlockByNumber(2).Hash(),
backend.chain.GetBlockByNumber(3).Hash(),
backend.chain.GetBlockByNumber(4).Hash(),
}
gasUsed := []uint64{
backend.chain.GetBlockByNumber(1).GasUsed(),
backend.chain.GetBlockByNumber(2).GasUsed(),
backend.chain.GetBlockByNumber(3).GasUsed(),
backend.chain.GetBlockByNumber(4).GasUsed(),
}
// Case 1 ) The number of receipts exceeds maximum tx count
sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, gasUsed, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}
select {
case _ = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for request packet")
}
maxTxCount := backend.chain.GetBlockByNumber(1).GasUsed() / 21_000
excessiveReceipts := []Receipt{{Logs: rlp.RawValue(make([]byte, 1))}}
for range maxTxCount {
excessiveReceipts = append(excessiveReceipts, Receipt{Logs: rlp.RawValue(make([]byte, 1))})
}
delivery = &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: encodeRL([]*ReceiptList{{
items: encodeRL(excessiveReceipts),
}}),
}
tresp = tracker.Response{ID: delivery.RequestId, MsgCode: ReceiptsMsg, Size: delivery.List.Len()}
if err = peer.tracker.Fulfil(tresp); err != nil {
t.Fatalf("tracker.Fulfil failed: %v", err)
}
receiptList, _ := delivery.List.Items()
err = peer.bufferReceipts(delivery.RequestId, receiptList, delivery.LastBlockIncomplete, backend)
if err == nil {
t.Fatal("Response with the excessive number of receipts should fail the validation")
}
// Case 2 ) Total receipt size exceeds the block gas limit
req, err = peer.RequestReceipts(hashes, gasUsed, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}
select {
case _ = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for request packet")
}
maxReceiptSize := backend.chain.GetBlockByNumber(1).GasUsed() / params.LogDataGas
delivery = &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: encodeRL([]*ReceiptList{{
items: encodeRL([]Receipt{
{Logs: rlp.RawValue(make([]byte, maxReceiptSize+1))},
}),
}}),
}
tresp = tracker.Response{ID: delivery.RequestId, MsgCode: ReceiptsMsg, Size: delivery.List.Len()}
if err = peer.tracker.Fulfil(tresp); err != nil {
t.Fatalf("tracker.Fulfil failed: %v", err)
}
receiptList, _ = delivery.List.Items()
err = peer.bufferReceipts(delivery.RequestId, receiptList, delivery.LastBlockIncomplete, backend)
if err == nil {
t.Fatal("Response with the large log size should fail the validation")
}
}

View file

@ -30,6 +30,7 @@ import (
// Constants to match up protocol versions and messages
const (
ETH69 = 69
ETH70 = 70
)
// ProtocolName is the official short name of the `eth` protocol used during
@ -38,11 +39,11 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH69}
var ProtocolVersions = []uint{ETH70, ETH69}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH69: 18}
var protocolLengths = map[uint]uint64{ETH69: 18, ETH70: 18}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
@ -211,25 +212,47 @@ type BlockBody struct {
// GetReceiptsRequest represents a block receipts query.
type GetReceiptsRequest []common.Hash
// GetReceiptsPacket represents a block receipts query with request ID wrapping.
type GetReceiptsPacket struct {
// GetReceiptsPacket69 represents a block receipts query with request ID wrapping.
type GetReceiptsPacket69 struct {
RequestId uint64
GetReceiptsRequest
}
// GetReceiptsPacket70 represents a block receipts query with request ID and
// FirstBlockReceiptIndex wrapping.
type GetReceiptsPacket70 struct {
RequestId uint64
FirstBlockReceiptIndex uint64
GetReceiptsRequest
}
// ReceiptsResponse is the network packet for block receipts distribution.
type ReceiptsResponse []types.Receipts
// ReceiptsPacket is the network packet for block receipts distribution with
// ReceiptsPacket69 is the network packet for block receipts distribution with
// request ID wrapping.
type ReceiptsPacket struct {
type ReceiptsPacket69 struct {
RequestId uint64
List rlp.RawList[*ReceiptList]
}
type ReceiptsPacket70 struct {
RequestId uint64
LastBlockIncomplete bool
List rlp.RawList[*ReceiptList]
}
// ReceiptsRLPResponse is used for receipts, when we already have it encoded
type ReceiptsRLPResponse []rlp.RawValue
// ReceiptsRLPPacket70 is ReceiptsRLPResponse with request ID and
// LastBlockIncomplete wrapping.
type ReceiptsRLPPacket70 struct {
RequestId uint64
LastBlockIncomplete bool
ReceiptsRLPResponse
}
// NewPooledTransactionHashesPacket represents a transaction announcement packet on eth/68 and newer.
type NewPooledTransactionHashesPacket struct {
Types []byte

View file

@ -82,11 +82,10 @@ func TestEmptyMessages(t *testing.T) {
GetBlockBodiesPacket{1111, nil},
BlockBodiesRLPPacket{1111, nil},
// Receipts
GetReceiptsPacket{1111, nil},
GetReceiptsPacket69{1111, nil},
// Transactions
GetPooledTransactionsPacket{1111, nil},
PooledTransactionsRLPPacket{1111, nil},
// Headers
BlockHeadersPacket{1111, encodeRL([]*types.Header{})},
// Bodies
@ -94,8 +93,8 @@ func TestEmptyMessages(t *testing.T) {
BlockBodiesPacket{1111, encodeRL([]BlockBody{})},
BlockBodiesRLPPacket{1111, BlockBodiesRLPResponse([]rlp.RawValue{})},
// Receipts
GetReceiptsPacket{1111, GetReceiptsRequest([]common.Hash{})},
ReceiptsPacket{1111, encodeRL([]*ReceiptList{})},
GetReceiptsPacket69{1111, GetReceiptsRequest([]common.Hash{})},
ReceiptsPacket69{1111, encodeRL([]*ReceiptList{})},
// Transactions
GetPooledTransactionsPacket{1111, GetPooledTransactionsRequest([]common.Hash{})},
PooledTransactionsPacket{1111, encodeRL([]*types.Transaction{})},
@ -220,11 +219,11 @@ func TestMessages(t *testing.T) {
common.FromHex("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"),
},
{
GetReceiptsPacket{1111, GetReceiptsRequest(hashes)},
GetReceiptsPacket69{1111, GetReceiptsRequest(hashes)},
common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"),
},
{
ReceiptsPacket{1111, encodeRL([]*ReceiptList{NewReceiptList(receipts)})},
ReceiptsPacket69{1111, encodeRL([]*ReceiptList{NewReceiptList(receipts)})},
common.FromHex("f8da820457f8d5f8d3f866808082014df85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100fff86901018201bcf862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"),
},
{

View file

@ -202,6 +202,11 @@ func (rl *ReceiptList) Derivable() types.DerivableList {
})
}
// Append appends all items from another ReceiptList to this list.
func (rl *ReceiptList) Append(other *ReceiptList) {
rl.items.AppendList(&other.items)
}
// blockReceiptsToNetwork takes a slice of rlp-encoded receipts, and transactions,
// and re-encodes them for the network protocol.
func blockReceiptsToNetwork(blockReceipts, blockBody rlp.RawValue) ([]byte, error) {

View file

@ -168,6 +168,18 @@ func (r *RawList[T]) AppendRaw(b []byte) error {
return nil
}
// AppendList appends all items from another RawList to this list.
func (r *RawList[T]) AppendList(other *RawList[T]) {
if other.enc == nil || other.length == 0 {
return
}
if r.enc == nil {
r.enc = make([]byte, 9)
}
r.enc = append(r.enc, other.Content()...)
r.length += other.length
}
// StringSize returns the encoded size of a string.
func StringSize(s string) uint64 {
switch n := len(s); n {

View file

@ -247,6 +247,55 @@ func TestRawListAppendRaw(t *testing.T) {
}
}
func TestRawListAppendList(t *testing.T) {
var rl1 RawList[uint64]
if err := rl1.Append(uint64(1)); err != nil {
t.Fatal("append 1 failed:", err)
}
if err := rl1.Append(uint64(2)); err != nil {
t.Fatal("append 2 failed:", err)
}
var rl2 RawList[uint64]
if err := rl2.Append(uint64(3)); err != nil {
t.Fatal("append 3 failed:", err)
}
if err := rl2.Append(uint64(4)); err != nil {
t.Fatal("append 4 failed:", err)
}
rl1.AppendList(&rl2)
if rl1.Len() != 4 {
t.Fatalf("wrong Len %d, want 4", rl1.Len())
}
if rl1.Size() != 5 {
t.Fatalf("wrong Size %d, want 5", rl1.Size())
}
items, err := rl1.Items()
if err != nil {
t.Fatal("Items failed:", err)
}
if !reflect.DeepEqual(items, []uint64{1, 2, 3, 4}) {
t.Fatalf("wrong items: %v", items)
}
var empty RawList[uint64]
prevLen := rl1.Len()
rl1.AppendList(&empty)
if rl1.Len() != prevLen {
t.Fatalf("appending empty list changed Len: got %d, want %d", rl1.Len(), prevLen)
}
empty.AppendList(&rl1)
if empty.Len() != 4 {
t.Fatalf("wrong Len %d, want 4", empty.Len())
}
}
func TestRawListDecodeInvalid(t *testing.T) {
tests := []struct {
input string

View file

@ -9,5 +9,5 @@ import (
type Test struct {
A eth1.MinerAPI
B eth2.GetReceiptsPacket
B eth2.GetReceiptsPacket69
}

View file

@ -41,7 +41,7 @@ func (obj *Test) DecodeRLP(dec *rlp.Stream) error {
}
_tmp0.A = _tmp1
// B:
var _tmp2 eth1.GetReceiptsPacket
var _tmp2 eth1.GetReceiptsPacket69
{
if _, err := dec.List(); err != nil {
return err