mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-03-31 15:15:56 +00:00
eth: implement EIP-7975 (eth/70 - partial block receipt lists) (#33153)
In this PR, we add support for protocol version eth/70, defined by EIP-7975. Overall changes: - Each response is buffered in the peer’s receipt buffer when the `lastBlockIncomplete` field is true. - Continued request uses the same request id of its original request(`RequestPartialReceipts`). - Partial responses are verified in `validateLastBlockReceipt`. - Even if all receipts for partial blocks of the request are collected, those partial results are not sinked to the downloader, to avoid complexity. This assumes that partial response and buffering occur only in exceptional cases. --------- Co-authored-by: Gary Rong <garyrong0905@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
parent
fe47c39903
commit
965bd6b6a0
31 changed files with 9535 additions and 8859 deletions
|
|
@ -51,6 +51,12 @@ type Chain struct {
|
|||
state map[common.Address]state.DumpAccount // state of head block
|
||||
senders map[common.Address]*senderInfo
|
||||
config *params.ChainConfig
|
||||
|
||||
txInfo txInfo
|
||||
}
|
||||
|
||||
type txInfo struct {
|
||||
LargeReceiptBlock *uint64 `json:"tx-largereceipt"`
|
||||
}
|
||||
|
||||
// NewChain takes the given chain.rlp file, and decodes and returns
|
||||
|
|
@ -74,12 +80,20 @@ func NewChain(dir string) (*Chain, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var txInfo txInfo
|
||||
err = common.LoadJSON(filepath.Join(dir, "txinfo.json"), &txInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Chain{
|
||||
genesis: gen,
|
||||
blocks: blocks,
|
||||
state: state,
|
||||
senders: accounts,
|
||||
config: gen.Config,
|
||||
txInfo: txInfo,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -66,9 +66,10 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
|
|||
return nil, err
|
||||
}
|
||||
conn.caps = []p2p.Cap{
|
||||
{Name: "eth", Version: 70},
|
||||
{Name: "eth", Version: 69},
|
||||
}
|
||||
conn.ourHighestProtoVersion = 69
|
||||
conn.ourHighestProtoVersion = 70
|
||||
return &conn, nil
|
||||
}
|
||||
|
||||
|
|
@ -335,10 +336,12 @@ loop:
|
|||
if have, want := msg.ForkID, chain.ForkID(); !reflect.DeepEqual(have, want) {
|
||||
return fmt.Errorf("wrong fork ID in status: have %v, want %v", have, want)
|
||||
}
|
||||
if have, want := msg.ProtocolVersion, c.ourHighestProtoVersion; have != uint32(want) {
|
||||
return fmt.Errorf("wrong protocol version: have %v, want %v", have, want)
|
||||
for _, cap := range c.caps {
|
||||
if cap.Name == "eth" && cap.Version == uint(msg.ProtocolVersion) {
|
||||
break loop
|
||||
}
|
||||
}
|
||||
break loop
|
||||
return fmt.Errorf("wrong protocol version: have %v, want %v", msg.ProtocolVersion, c.caps)
|
||||
case discMsg:
|
||||
var msg []p2p.DiscReason
|
||||
if rlp.DecodeBytes(data, &msg); len(msg) == 0 {
|
||||
|
|
|
|||
|
|
@ -87,9 +87,9 @@ func (s *Suite) TestSnapGetAccountRange(t *utesting.T) {
|
|||
root: root,
|
||||
startingHash: zero,
|
||||
limitHash: ffHash,
|
||||
expAccounts: 67,
|
||||
expAccounts: 68,
|
||||
expFirst: firstKey,
|
||||
expLast: common.HexToHash("0x622e662246601dd04f996289ce8b85e86db7bb15bb17f86487ec9d543ddb6f9a"),
|
||||
expLast: common.HexToHash("0x59312f89c13e9e24c1cb8b103aa39a9b2800348d97a92c2c9e2a78fa02b70025"),
|
||||
desc: "In this test, we request the entire state range, but limit the response to 4000 bytes.",
|
||||
},
|
||||
{
|
||||
|
|
@ -97,9 +97,9 @@ func (s *Suite) TestSnapGetAccountRange(t *utesting.T) {
|
|||
root: root,
|
||||
startingHash: zero,
|
||||
limitHash: ffHash,
|
||||
expAccounts: 49,
|
||||
expAccounts: 50,
|
||||
expFirst: firstKey,
|
||||
expLast: common.HexToHash("0x445cb5c1278fdce2f9cbdb681bdd76c52f8e50e41dbd9e220242a69ba99ac099"),
|
||||
expLast: common.HexToHash("0x4615e5f5df5b25349a00ad313c6cd0436b6c08ee5826e33a018661997f85ebaa"),
|
||||
desc: "In this test, we request the entire state range, but limit the response to 3000 bytes.",
|
||||
},
|
||||
{
|
||||
|
|
@ -107,9 +107,9 @@ func (s *Suite) TestSnapGetAccountRange(t *utesting.T) {
|
|||
root: root,
|
||||
startingHash: zero,
|
||||
limitHash: ffHash,
|
||||
expAccounts: 34,
|
||||
expAccounts: 35,
|
||||
expFirst: firstKey,
|
||||
expLast: common.HexToHash("0x2ef46ebd2073cecde499c2e8df028ad79a26d57bfaa812c4c6f7eb4c9617b913"),
|
||||
expLast: common.HexToHash("0x2de4bdbddcfbb9c3e195dae6b45f9c38daff897e926764bf34887fb0db5c3284"),
|
||||
desc: "In this test, we request the entire state range, but limit the response to 2000 bytes.",
|
||||
},
|
||||
{
|
||||
|
|
@ -178,9 +178,9 @@ The server should return the first available account.`,
|
|||
root: root,
|
||||
startingHash: firstKey,
|
||||
limitHash: ffHash,
|
||||
expAccounts: 67,
|
||||
expAccounts: 68,
|
||||
expFirst: firstKey,
|
||||
expLast: common.HexToHash("0x622e662246601dd04f996289ce8b85e86db7bb15bb17f86487ec9d543ddb6f9a"),
|
||||
expLast: common.HexToHash("0x59312f89c13e9e24c1cb8b103aa39a9b2800348d97a92c2c9e2a78fa02b70025"),
|
||||
desc: `In this test, startingHash is exactly the first available account key.
|
||||
The server should return the first available account of the state as the first item.`,
|
||||
},
|
||||
|
|
@ -189,9 +189,9 @@ The server should return the first available account of the state as the first i
|
|||
root: root,
|
||||
startingHash: hashAdd(firstKey, 1),
|
||||
limitHash: ffHash,
|
||||
expAccounts: 67,
|
||||
expAccounts: 68,
|
||||
expFirst: secondKey,
|
||||
expLast: common.HexToHash("0x66192e4c757fba1cdc776e6737008f42d50370d3cd801db3624274283bf7cd63"),
|
||||
expLast: common.HexToHash("0x59a7c8818f1c16b298a054020dc7c3f403a970d1d1db33f9478b1c36e3a2e509"),
|
||||
desc: `In this test, startingHash is after the first available key.
|
||||
The server should return the second account of the state as the first item.`,
|
||||
},
|
||||
|
|
@ -227,9 +227,9 @@ server to return no data because genesis is older than 127 blocks.`,
|
|||
root: s.chain.RootAt(int(s.chain.Head().Number().Uint64()) - 127),
|
||||
startingHash: zero,
|
||||
limitHash: ffHash,
|
||||
expAccounts: 66,
|
||||
expAccounts: 68,
|
||||
expFirst: firstKey,
|
||||
expLast: common.HexToHash("0x729953a43ed6c913df957172680a17e5735143ad767bda8f58ac84ec62fbec5e"),
|
||||
expLast: common.HexToHash("0x683b6c03cc32afe5db8cb96050f711fdaff8f8ff44c7587a9a848f921d02815e"),
|
||||
desc: `This test requests data at a state root that is 127 blocks old.
|
||||
We expect the server to have this state available.`,
|
||||
},
|
||||
|
|
@ -658,8 +658,8 @@ The server should reject the request.`,
|
|||
// It's a bit unfortunate these are hard-coded, but the result depends on
|
||||
// a lot of aspects of the state trie and can't be guessed in a simple
|
||||
// way. So you'll have to update this when the test chain is changed.
|
||||
common.HexToHash("0x5bdc0d6057b35642a16d27223ea5454e5a17a400e28f7328971a5f2a87773b76"),
|
||||
common.HexToHash("0x0a76c9812ca90ffed8ee4d191e683f93386b6e50cfe3679c0760d27510aa7fc5"),
|
||||
common.HexToHash("0x4bdecec09691ad38113eebee2df94fadefdff5841c0f182bae1be3c8a6d60bf3"),
|
||||
common.HexToHash("0x4178696465d4514ff5924ef8c28ce64d41a669634b63184c2c093e252d6b4bc4"),
|
||||
empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty,
|
||||
empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty,
|
||||
empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty, empty,
|
||||
|
|
@ -679,8 +679,8 @@ The server should reject the request.`,
|
|||
// be updated when the test chain is changed.
|
||||
expHashes: []common.Hash{
|
||||
empty,
|
||||
common.HexToHash("0x0a76c9812ca90ffed8ee4d191e683f93386b6e50cfe3679c0760d27510aa7fc5"),
|
||||
common.HexToHash("0x5bdc0d6057b35642a16d27223ea5454e5a17a400e28f7328971a5f2a87773b76"),
|
||||
common.HexToHash("0x4178696465d4514ff5924ef8c28ce64d41a669634b63184c2c093e252d6b4bc4"),
|
||||
common.HexToHash("0x4bdecec09691ad38113eebee2df94fadefdff5841c0f182bae1be3c8a6d60bf3"),
|
||||
},
|
||||
},
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
"github.com/holiman/uint256"
|
||||
)
|
||||
|
||||
|
|
@ -83,6 +84,7 @@ func (s *Suite) EthTests() []utesting.Test {
|
|||
// get history
|
||||
{Name: "GetBlockBodies", Fn: s.TestGetBlockBodies},
|
||||
{Name: "GetReceipts", Fn: s.TestGetReceipts},
|
||||
{Name: "GetLargeReceipts", Fn: s.TestGetLargeReceipts},
|
||||
// test transactions
|
||||
{Name: "LargeTxRequest", Fn: s.TestLargeTxRequest, Slow: true},
|
||||
{Name: "Transaction", Fn: s.TestTransaction},
|
||||
|
|
@ -429,6 +431,9 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
|
|||
// Find some blocks containing receipts.
|
||||
var hashes = make([]common.Hash, 0, 3)
|
||||
for i := range s.chain.Len() {
|
||||
if s.chain.txInfo.LargeReceiptBlock != nil && uint64(i) == *s.chain.txInfo.LargeReceiptBlock {
|
||||
continue
|
||||
}
|
||||
block := s.chain.GetBlock(i)
|
||||
if len(block.Transactions()) > 0 {
|
||||
hashes = append(hashes, block.Hash())
|
||||
|
|
@ -437,25 +442,121 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
|
|||
break
|
||||
}
|
||||
}
|
||||
if conn.negotiatedProtoVersion < eth.ETH70 {
|
||||
// Create block bodies request.
|
||||
req := ð.GetReceiptsPacket69{
|
||||
RequestId: 66,
|
||||
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
|
||||
}
|
||||
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
|
||||
t.Fatalf("could not write to connection: %v", err)
|
||||
}
|
||||
// Wait for response.
|
||||
resp := new(eth.ReceiptsPacket69)
|
||||
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
|
||||
t.Fatalf("error reading block receipts msg: %v", err)
|
||||
}
|
||||
if got, want := resp.RequestId, req.RequestId; got != want {
|
||||
t.Fatalf("unexpected request id in respond", got, want)
|
||||
}
|
||||
if resp.List.Len() != len(req.GetReceiptsRequest) {
|
||||
t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
|
||||
}
|
||||
} else {
|
||||
// Create block bodies request.
|
||||
req := ð.GetReceiptsPacket70{
|
||||
RequestId: 66,
|
||||
FirstBlockReceiptIndex: 0,
|
||||
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
|
||||
}
|
||||
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
|
||||
t.Fatalf("could not write to connection: %v", err)
|
||||
}
|
||||
// Wait for response.
|
||||
resp := new(eth.ReceiptsPacket70)
|
||||
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
|
||||
t.Fatalf("error reading block receipts msg: %v", err)
|
||||
}
|
||||
if got, want := resp.RequestId, req.RequestId; got != want {
|
||||
t.Fatalf("unexpected request id in respond", got, want)
|
||||
}
|
||||
if resp.List.Len() != len(req.GetReceiptsRequest) {
|
||||
t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create receipts request.
|
||||
req := ð.GetReceiptsPacket{
|
||||
RequestId: 66,
|
||||
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
|
||||
func (s *Suite) TestGetLargeReceipts(t *utesting.T) {
|
||||
t.Log(`This test sends GetReceipts requests to the node for large receipt (>10MiB) in the test chain.
|
||||
This test is meaningful only if the client supports protocol version ETH70 or higher
|
||||
and LargeReceiptBlock is configured in txInfo.json.`)
|
||||
conn, err := s.dialAndPeer(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("peering failed: %v", err)
|
||||
}
|
||||
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
|
||||
t.Fatalf("could not write to connection: %v", err)
|
||||
defer conn.Close()
|
||||
|
||||
if conn.negotiatedProtoVersion < eth.ETH70 || s.chain.txInfo.LargeReceiptBlock == nil {
|
||||
return
|
||||
}
|
||||
// Wait for response.
|
||||
resp := new(eth.ReceiptsPacket)
|
||||
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
|
||||
t.Fatalf("error reading block bodies msg: %v", err)
|
||||
|
||||
// Find block with large receipt.
|
||||
// Place the large receipt block hash in the middle of the query
|
||||
start := max(int(*s.chain.txInfo.LargeReceiptBlock)-2, 0)
|
||||
end := min(*s.chain.txInfo.LargeReceiptBlock+2, uint64(len(s.chain.blocks)))
|
||||
|
||||
var blocks []common.Hash
|
||||
var receiptHashes []common.Hash
|
||||
var receipts []*eth.ReceiptList
|
||||
|
||||
for i := uint64(start); i < end; i++ {
|
||||
block := s.chain.GetBlock(int(i))
|
||||
blocks = append(blocks, block.Hash())
|
||||
receiptHashes = append(receiptHashes, block.Header().ReceiptHash)
|
||||
receipts = append(receipts, ð.ReceiptList{})
|
||||
}
|
||||
if got, want := resp.RequestId, req.RequestId; got != want {
|
||||
t.Fatalf("unexpected request id in respond", got, want)
|
||||
|
||||
incomplete := false
|
||||
lastBlock := 0
|
||||
|
||||
for incomplete || lastBlock != len(blocks)-1 {
|
||||
// Create get receipt request.
|
||||
req := ð.GetReceiptsPacket70{
|
||||
RequestId: 66,
|
||||
FirstBlockReceiptIndex: uint64(receipts[lastBlock].Derivable().Len()),
|
||||
GetReceiptsRequest: blocks[lastBlock:],
|
||||
}
|
||||
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
|
||||
t.Fatalf("could not write to connection: %v", err)
|
||||
}
|
||||
// Wait for response.
|
||||
resp := new(eth.ReceiptsPacket70)
|
||||
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
|
||||
t.Fatalf("error reading block receipts msg: %v", err)
|
||||
}
|
||||
if got, want := resp.RequestId, req.RequestId; got != want {
|
||||
t.Fatalf("unexpected request id in respond, want: %d, got: %d", got, want)
|
||||
}
|
||||
|
||||
receiptLists, _ := resp.List.Items()
|
||||
for i, rc := range receiptLists {
|
||||
receipts[lastBlock+i].Append(rc)
|
||||
}
|
||||
lastBlock += len(receiptLists) - 1
|
||||
|
||||
incomplete = resp.LastBlockIncomplete
|
||||
}
|
||||
if resp.List.Len() != len(req.GetReceiptsRequest) {
|
||||
t.Fatalf("wrong receipts in response: expected %d receipts, got %d", len(req.GetReceiptsRequest), resp.List.Len())
|
||||
|
||||
hasher := trie.NewStackTrie(nil)
|
||||
hashes := make([]common.Hash, len(receipts))
|
||||
for i := range receipts {
|
||||
hashes[i] = types.DeriveSha(receipts[i].Derivable(), hasher)
|
||||
}
|
||||
|
||||
for i, hash := range hashes {
|
||||
if receiptHashes[i] != hash {
|
||||
t.Fatalf("wrong receipt root: want %x, got %x", receiptHashes[i], hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
BIN
cmd/devp2p/internal/ethtest/testdata/chain.rlp
vendored
BIN
cmd/devp2p/internal/ethtest/testdata/chain.rlp
vendored
Binary file not shown.
|
|
@ -37,7 +37,7 @@
|
|||
"nonce": "0x0",
|
||||
"timestamp": "0x0",
|
||||
"extraData": "0x68697665636861696e",
|
||||
"gasLimit": "0x23f3e20",
|
||||
"gasLimit": "0x11e1a300",
|
||||
"difficulty": "0x20000",
|
||||
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
|
||||
"coinbase": "0x0000000000000000000000000000000000000000",
|
||||
|
|
@ -119,6 +119,10 @@
|
|||
"balance": "0x1",
|
||||
"nonce": "0x1"
|
||||
},
|
||||
"8dcd17433742f4c0ca53122ab541d0ba67fc27ff": {
|
||||
"code": "0x6202e6306000a0",
|
||||
"balance": "0x0"
|
||||
},
|
||||
"c7b99a164efd027a93f147376cc7da7c67c6bbe0": {
|
||||
"balance": "0xc097ce7bc90715b34b9f1000000000"
|
||||
},
|
||||
|
|
|
|||
|
|
@ -1,24 +1,24 @@
|
|||
{
|
||||
"parentHash": "0x65151b101682b54cd08ba226f640c14c86176865ff9bfc57e0147dadaeac34bb",
|
||||
"parentHash": "0x7e80093a491eba0e5b2c1895837902f64f514100221801318fe391e1e09c96a6",
|
||||
"sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
|
||||
"miner": "0x0000000000000000000000000000000000000000",
|
||||
"stateRoot": "0xce423ebc60fc7764a43f09f1fe3ae61eef25e3eb8d09b1108f7e7eb77dfff5e6",
|
||||
"transactionsRoot": "0x7ec1ae3989efa75d7bcc766e5e2443afa8a89a5fda42ebba90050e7e702980f7",
|
||||
"receiptsRoot": "0xfe160832b1ca85f38c6674cb0aae3a24693bc49be56e2ecdf3698b71a794de86",
|
||||
"stateRoot": "0x8fcfb02cfca007773bd55bc1c3e50a3c8612a59c87ce057e5957e8bf17c1728b",
|
||||
"transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
|
||||
"receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
|
||||
"logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
|
||||
"difficulty": "0x0",
|
||||
"number": "0x258",
|
||||
"gasLimit": "0x23f3e20",
|
||||
"gasUsed": "0x19d36",
|
||||
"gasLimit": "0x11e1a300",
|
||||
"gasUsed": "0x0",
|
||||
"timestamp": "0x1770",
|
||||
"extraData": "0x",
|
||||
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
|
||||
"nonce": "0x0000000000000000",
|
||||
"baseFeePerGas": "0x7",
|
||||
"withdrawalsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421",
|
||||
"withdrawalsRoot": "0x92abfda39de7df7d705c5a8f30386802ad59d31e782a06d5c5b0f9a260056cf0",
|
||||
"blobGasUsed": "0x0",
|
||||
"excessBlobGas": "0x0",
|
||||
"parentBeaconBlockRoot": "0xf5003fc8f92358e790a114bce93ce1d9c283c85e1787f8d7d56714d3489b49e6",
|
||||
"requestsHash": "0xe3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
|
||||
"hash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0"
|
||||
"hash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a"
|
||||
}
|
||||
|
|
@ -4,9 +4,9 @@
|
|||
"method": "engine_forkchoiceUpdatedV3",
|
||||
"params": [
|
||||
{
|
||||
"headBlockHash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0",
|
||||
"safeBlockHash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0",
|
||||
"finalizedBlockHash": "0xce8d86ba17a2ec303155f0e264c58a4b8f94ce3436274cf1924f91acdb7502d0"
|
||||
"headBlockHash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a",
|
||||
"safeBlockHash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a",
|
||||
"finalizedBlockHash": "0x44e3809c9a3cda717f00aea3a9da336d149612c8d5657fbc0028176ef8d94d2a"
|
||||
},
|
||||
null
|
||||
]
|
||||
|
|
|
|||
4210
cmd/devp2p/internal/ethtest/testdata/headstate.json
vendored
4210
cmd/devp2p/internal/ethtest/testdata/headstate.json
vendored
File diff suppressed because it is too large
Load diff
10313
cmd/devp2p/internal/ethtest/testdata/newpayload.json
vendored
10313
cmd/devp2p/internal/ethtest/testdata/newpayload.json
vendored
File diff suppressed because it is too large
Load diff
2943
cmd/devp2p/internal/ethtest/testdata/txinfo.json
vendored
2943
cmd/devp2p/internal/ethtest/testdata/txinfo.json
vendored
File diff suppressed because it is too large
Load diff
|
|
@ -259,8 +259,8 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
|
|||
// RequestReceipts constructs a getReceipts method associated with a particular
|
||||
// peer in the download tester. The returned function can be used to retrieve
|
||||
// batches of block receipts from the particularly requested peer.
|
||||
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
|
||||
blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes)
|
||||
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, timestamps []uint64, sink chan *eth.Response) (*eth.Request, error) {
|
||||
blobs := eth.ServiceGetReceiptsQuery69(dlp.chain, hashes)
|
||||
receipts := make([]types.Receipts, blobs.Len())
|
||||
|
||||
// compute hashes
|
||||
|
|
|
|||
|
|
@ -78,11 +78,17 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
|
|||
if q.receiptFetchHook != nil {
|
||||
q.receiptFetchHook(req.Headers)
|
||||
}
|
||||
hashes := make([]common.Hash, 0, len(req.Headers))
|
||||
var (
|
||||
gasUsed = make([]uint64, 0, len(req.Headers))
|
||||
timestamps = make([]uint64, 0, len(req.Headers))
|
||||
hashes = make([]common.Hash, 0, len(req.Headers))
|
||||
)
|
||||
for _, header := range req.Headers {
|
||||
hashes = append(hashes, header.Hash())
|
||||
gasUsed = append(gasUsed, header.GasUsed)
|
||||
timestamps = append(timestamps, header.Time)
|
||||
}
|
||||
return peer.peer.RequestReceipts(hashes, resCh)
|
||||
return peer.peer.RequestReceipts(hashes, gasUsed, timestamps, resCh)
|
||||
}
|
||||
|
||||
// deliver is responsible for taking a generic response packet from the concurrent
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ type Peer interface {
|
|||
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
|
||||
|
||||
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
|
||||
RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error)
|
||||
RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error)
|
||||
}
|
||||
|
||||
// newPeerConnection creates a new downloader peer.
|
||||
|
|
|
|||
|
|
@ -208,7 +208,7 @@ func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*et
|
|||
panic("skeleton sync must not request block bodies")
|
||||
}
|
||||
|
||||
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) {
|
||||
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error) {
|
||||
panic("skeleton sync must not request receipts")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -137,8 +137,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
|
|||
defer p2pNoFork.Close()
|
||||
defer p2pProFork.Close()
|
||||
|
||||
peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil)
|
||||
peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil)
|
||||
peerNoFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil, nil)
|
||||
peerProFork := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil, nil)
|
||||
defer peerNoFork.Close()
|
||||
defer peerProFork.Close()
|
||||
|
||||
|
|
@ -168,8 +168,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
|
|||
defer p2pNoFork.Close()
|
||||
defer p2pProFork.Close()
|
||||
|
||||
peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
|
||||
peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
|
||||
peerNoFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil, nil)
|
||||
peerProFork = eth.NewPeer(protocol, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil, nil)
|
||||
defer peerNoFork.Close()
|
||||
defer peerProFork.Close()
|
||||
|
||||
|
|
@ -199,8 +199,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
|
|||
defer p2pNoFork.Close()
|
||||
defer p2pProFork.Close()
|
||||
|
||||
peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil)
|
||||
peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil)
|
||||
peerNoFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pNoFork), p2pNoFork, nil, nil)
|
||||
peerProFork = eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pProFork), p2pProFork, nil, nil)
|
||||
defer peerNoFork.Close()
|
||||
defer peerProFork.Close()
|
||||
|
||||
|
|
@ -249,8 +249,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
|
|||
defer p2pSrc.Close()
|
||||
defer p2pSink.Close()
|
||||
|
||||
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool)
|
||||
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool)
|
||||
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool, nil)
|
||||
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool, nil)
|
||||
defer src.Close()
|
||||
defer sink.Close()
|
||||
|
||||
|
|
@ -305,8 +305,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
|
|||
defer p2pSrc.Close()
|
||||
defer p2pSink.Close()
|
||||
|
||||
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool)
|
||||
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool)
|
||||
src := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pSrc), p2pSrc, handler.txpool, nil)
|
||||
sink := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pSink), p2pSink, handler.txpool, nil)
|
||||
defer src.Close()
|
||||
defer sink.Close()
|
||||
|
||||
|
|
@ -380,8 +380,8 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
|
|||
defer sourcePipe.Close()
|
||||
defer sinkPipe.Close()
|
||||
|
||||
sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i + 1)}, "", nil, sourcePipe), sourcePipe, source.txpool)
|
||||
sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool)
|
||||
sourcePeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{byte(i + 1)}, "", nil, sourcePipe), sourcePipe, source.txpool, nil)
|
||||
sinkPeer := eth.NewPeer(protocol, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, sink.txpool, nil)
|
||||
defer sourcePeer.Close()
|
||||
defer sinkPeer.Close()
|
||||
|
||||
|
|
|
|||
|
|
@ -317,7 +317,7 @@ func createTestPeers(rand *rand.Rand, n int) []*ethPeer {
|
|||
var id enode.ID
|
||||
rand.Read(id[:])
|
||||
p2pPeer := p2p.NewPeer(id, "test", nil)
|
||||
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil)
|
||||
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil, nil)
|
||||
peers[i] = ðPeer{Peer: ep}
|
||||
}
|
||||
return peers
|
||||
|
|
|
|||
|
|
@ -214,7 +214,10 @@ loop:
|
|||
continue loop
|
||||
}
|
||||
|
||||
pending[req.id] = req
|
||||
// do not overwrite if it is re-request
|
||||
if _, ok := pending[req.id]; !ok {
|
||||
pending[req.id] = req
|
||||
}
|
||||
reqOp.fail <- nil
|
||||
|
||||
case cancelOp := <-p.reqCancel:
|
||||
|
|
@ -227,6 +230,13 @@ loop:
|
|||
}
|
||||
// Stop tracking the request
|
||||
delete(pending, cancelOp.id)
|
||||
|
||||
// Not sure if the request is about the receipt, but remove it anyway.
|
||||
// TODO(rjl493456442, bosul): investigate whether we can avoid leaking peer fields here.
|
||||
p.receiptBufferLock.Lock()
|
||||
delete(p.receiptBuffer, cancelOp.id)
|
||||
p.receiptBufferLock.Unlock()
|
||||
|
||||
cancelOp.fail <- nil
|
||||
|
||||
case resOp := <-p.resDispatch:
|
||||
|
|
|
|||
|
|
@ -35,6 +35,10 @@ const (
|
|||
// softResponseLimit is the target maximum size of replies to data retrievals.
|
||||
softResponseLimit = 2 * 1024 * 1024
|
||||
|
||||
// maxPacketSize is the devp2p message size limit commonly enforced by clients.
|
||||
// Any packet exceeding this limit must be rejected.
|
||||
maxPacketSize = 10 * 1024 * 1024
|
||||
|
||||
// maxHeadersServe is the maximum number of block headers to serve. This number
|
||||
// is there to limit the number of disk lookups.
|
||||
maxHeadersServe = 1024
|
||||
|
|
@ -106,7 +110,7 @@ func MakeProtocols(backend Backend, network uint64, disc enode.Iterator) []p2p.P
|
|||
Version: version,
|
||||
Length: protocolLengths[version],
|
||||
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
|
||||
peer := NewPeer(version, p, rw, backend.TxPool())
|
||||
peer := NewPeer(version, p, rw, backend.TxPool(), backend.Chain().Config())
|
||||
defer peer.Close()
|
||||
|
||||
return backend.RunPeer(peer, func(peer *Peer) error {
|
||||
|
|
@ -173,8 +177,22 @@ var eth69 = map[uint64]msgHandler{
|
|||
BlockHeadersMsg: handleBlockHeaders,
|
||||
GetBlockBodiesMsg: handleGetBlockBodies,
|
||||
BlockBodiesMsg: handleBlockBodies,
|
||||
GetReceiptsMsg: handleGetReceipts,
|
||||
ReceiptsMsg: handleReceipts,
|
||||
GetReceiptsMsg: handleGetReceipts69,
|
||||
ReceiptsMsg: handleReceipts69,
|
||||
GetPooledTransactionsMsg: handleGetPooledTransactions,
|
||||
PooledTransactionsMsg: handlePooledTransactions,
|
||||
BlockRangeUpdateMsg: handleBlockRangeUpdate,
|
||||
}
|
||||
|
||||
var eth70 = map[uint64]msgHandler{
|
||||
TransactionsMsg: handleTransactions,
|
||||
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
|
||||
GetBlockHeadersMsg: handleGetBlockHeaders,
|
||||
BlockHeadersMsg: handleBlockHeaders,
|
||||
GetBlockBodiesMsg: handleGetBlockBodies,
|
||||
BlockBodiesMsg: handleBlockBodies,
|
||||
GetReceiptsMsg: handleGetReceipts70,
|
||||
ReceiptsMsg: handleReceipts70,
|
||||
GetPooledTransactionsMsg: handleGetPooledTransactions,
|
||||
PooledTransactionsMsg: handlePooledTransactions,
|
||||
BlockRangeUpdateMsg: handleBlockRangeUpdate,
|
||||
|
|
@ -194,9 +212,12 @@ func handleMessage(backend Backend, peer *Peer) error {
|
|||
defer msg.Discard()
|
||||
|
||||
var handlers map[uint64]msgHandler
|
||||
if peer.version == ETH69 {
|
||||
switch peer.version {
|
||||
case ETH69:
|
||||
handlers = eth69
|
||||
} else {
|
||||
case ETH70:
|
||||
handlers = eth70
|
||||
default:
|
||||
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -596,11 +596,11 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
|
|||
}
|
||||
|
||||
// Send the hash request and verify the response
|
||||
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket{
|
||||
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket69{
|
||||
RequestId: 123,
|
||||
GetReceiptsRequest: hashes,
|
||||
})
|
||||
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket{
|
||||
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket69{
|
||||
RequestId: 123,
|
||||
List: receipts,
|
||||
}); err != nil {
|
||||
|
|
@ -608,6 +608,103 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGetBlockPartialReceipts(t *testing.T) { testGetBlockPartialReceipts(t, ETH70) }
|
||||
|
||||
func testGetBlockPartialReceipts(t *testing.T, protocol int) {
|
||||
// First, generate the chain and overwrite the receipts.
|
||||
generator := func(_ int, block *core.BlockGen) {
|
||||
for j := 0; j < 5; j++ {
|
||||
tx, err := types.SignTx(
|
||||
types.NewTransaction(block.TxNonce(testAddr), testAddr, big.NewInt(1000), params.TxGas, block.BaseFee(), nil),
|
||||
types.LatestSignerForChainID(params.TestChainConfig.ChainID),
|
||||
testKey,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to sign tx: %v", err)
|
||||
}
|
||||
block.AddTx(tx)
|
||||
}
|
||||
}
|
||||
backend := newTestBackendWithGenerator(4, true, false, generator)
|
||||
defer backend.close()
|
||||
|
||||
blockCutoff := 2
|
||||
receiptCutoff := 4
|
||||
|
||||
// Replace the receipts in the database with larger receipts.
|
||||
targetBlock := backend.chain.GetBlockByNumber(uint64(blockCutoff))
|
||||
receipts := backend.chain.GetReceiptsByHash(targetBlock.Hash())
|
||||
receiptSize := params.MaxTxGas / params.LogDataGas // ~2MiB per receipt
|
||||
for i := range receipts {
|
||||
payload := make([]byte, receiptSize)
|
||||
for j := range payload {
|
||||
payload[j] = byte(i + j)
|
||||
}
|
||||
receipts[i].Logs = []*types.Log{
|
||||
{
|
||||
Address: common.BytesToAddress([]byte{byte(i + 1)}),
|
||||
Data: payload,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
rawdb.WriteReceipts(backend.db, targetBlock.Hash(), targetBlock.NumberU64(), receipts)
|
||||
|
||||
peer, _ := newTestPeer("peer", uint(protocol), backend)
|
||||
defer peer.close()
|
||||
|
||||
var (
|
||||
hashes []common.Hash
|
||||
partialReceipt []*ReceiptList
|
||||
)
|
||||
for i := uint64(0); i <= backend.chain.CurrentBlock().Number.Uint64(); i++ {
|
||||
block := backend.chain.GetBlockByNumber(i)
|
||||
hashes = append(hashes, block.Hash())
|
||||
}
|
||||
for i := 0; i <= blockCutoff; i++ {
|
||||
block := backend.chain.GetBlockByNumber(uint64(i))
|
||||
trs := backend.chain.GetReceiptsByHash(block.Hash())
|
||||
limit := len(trs)
|
||||
if i == blockCutoff {
|
||||
limit = receiptCutoff
|
||||
}
|
||||
partialReceipt = append(partialReceipt, NewReceiptList(trs[:limit]))
|
||||
}
|
||||
|
||||
rawPartialReceipt, _ := rlp.EncodeToRawList(partialReceipt)
|
||||
|
||||
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{
|
||||
RequestId: 123,
|
||||
FirstBlockReceiptIndex: 0,
|
||||
GetReceiptsRequest: hashes,
|
||||
})
|
||||
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{
|
||||
RequestId: 123,
|
||||
LastBlockIncomplete: true,
|
||||
List: rawPartialReceipt,
|
||||
}); err != nil {
|
||||
t.Errorf("receipts mismatch: %v", err)
|
||||
}
|
||||
|
||||
// Simulate the continued request
|
||||
partialReceipt = []*ReceiptList{NewReceiptList(receipts[receiptCutoff:])}
|
||||
rawPartialReceipt, _ = rlp.EncodeToRawList(partialReceipt)
|
||||
|
||||
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{
|
||||
RequestId: 123,
|
||||
FirstBlockReceiptIndex: uint64(receiptCutoff),
|
||||
GetReceiptsRequest: []common.Hash{hashes[blockCutoff]},
|
||||
})
|
||||
|
||||
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{
|
||||
RequestId: 123,
|
||||
LastBlockIncomplete: false,
|
||||
List: rawPartialReceipt,
|
||||
}); err != nil {
|
||||
t.Errorf("receipts mismatch: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
type decoder struct {
|
||||
msg []byte
|
||||
}
|
||||
|
|
@ -670,10 +767,10 @@ func setup() (*testBackend, *testPeer) {
|
|||
}
|
||||
|
||||
func FuzzEthProtocolHandlers(f *testing.F) {
|
||||
handlers := eth69
|
||||
handlers := eth70
|
||||
backend, peer := setup()
|
||||
f.Fuzz(func(t *testing.T, code byte, msg []byte) {
|
||||
handler := handlers[uint64(code)%protocolLengths[ETH69]]
|
||||
handler := handlers[uint64(code)%protocolLengths[ETH70]]
|
||||
if handler == nil {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -246,47 +246,51 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
|
|||
return bodies
|
||||
}
|
||||
|
||||
func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error {
|
||||
func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error {
|
||||
// Decode the block receipts retrieval message
|
||||
var query GetReceiptsPacket
|
||||
var query GetReceiptsPacket69
|
||||
if err := msg.Decode(&query); err != nil {
|
||||
return err
|
||||
}
|
||||
response := ServiceGetReceiptsQuery(backend.Chain(), query.GetReceiptsRequest)
|
||||
return peer.ReplyReceiptsRLP(query.RequestId, response)
|
||||
response := ServiceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest)
|
||||
return peer.ReplyReceiptsRLP69(query.RequestId, response)
|
||||
}
|
||||
|
||||
// ServiceGetReceiptsQuery assembles the response to a receipt query.
|
||||
func handleGetReceipts70(backend Backend, msg Decoder, peer *Peer) error {
|
||||
var query GetReceiptsPacket70
|
||||
if err := msg.Decode(&query); err != nil {
|
||||
return err
|
||||
}
|
||||
response, lastBlockIncomplete := serviceGetReceiptsQuery70(backend.Chain(), query.GetReceiptsRequest, query.FirstBlockReceiptIndex)
|
||||
return peer.ReplyReceiptsRLP70(query.RequestId, response, lastBlockIncomplete)
|
||||
}
|
||||
|
||||
// ServiceGetReceiptsQuery69 assembles the response to a receipt query.
|
||||
// It does not send the bloom filters for the receipts. It is exposed
|
||||
// to allow external packages to test protocol behavior.
|
||||
func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) rlp.RawList[*ReceiptList] {
|
||||
// Gather state data until the fetch or network limits is reached
|
||||
func ServiceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) rlp.RawList[*ReceiptList] {
|
||||
var (
|
||||
bytes int
|
||||
receipts rlp.RawList[*ReceiptList]
|
||||
)
|
||||
for lookups, hash := range query {
|
||||
if bytes >= softResponseLimit || receipts.Len() >= maxReceiptsServe ||
|
||||
lookups >= 2*maxReceiptsServe {
|
||||
if bytes >= softResponseLimit || receipts.Len() >= maxReceiptsServe || lookups >= 2*maxReceiptsServe {
|
||||
break
|
||||
}
|
||||
|
||||
// Retrieve the requested block's receipts
|
||||
results := chain.GetReceiptsRLP(hash)
|
||||
if results == nil {
|
||||
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
body := chain.GetBodyRLP(hash)
|
||||
if body == nil {
|
||||
continue
|
||||
}
|
||||
var err error
|
||||
results, err = blockReceiptsToNetwork(results, body)
|
||||
if err != nil {
|
||||
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
|
||||
continue
|
||||
}
|
||||
continue // Can't retrieve the receipts, so we just skip this block.
|
||||
}
|
||||
body := chain.GetBodyRLP(hash)
|
||||
if body == nil {
|
||||
continue // The block body is missing, we also have to skip.
|
||||
}
|
||||
results, _, err := blockReceiptsToNetwork(results, body, receiptQueryParams{})
|
||||
if err != nil {
|
||||
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
|
||||
continue
|
||||
}
|
||||
receipts.AppendRaw(results)
|
||||
bytes += len(results)
|
||||
|
|
@ -294,6 +298,50 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) r
|
|||
return receipts
|
||||
}
|
||||
|
||||
// serviceGetReceiptsQuery70 assembles the response to a receipt query.
|
||||
// If the receipts exceed 10 MiB, it trims them and sets the
|
||||
// lastBlockIncomplete flag. Indices smaller than firstBlockReceiptIndex
|
||||
// are omitted from the first block receipt list.
|
||||
func serviceGetReceiptsQuery70(chain *core.BlockChain, query GetReceiptsRequest, firstBlockReceiptIndex uint64) (rlp.RawList[*ReceiptList], bool) {
|
||||
var (
|
||||
bytes int
|
||||
receipts rlp.RawList[*ReceiptList]
|
||||
)
|
||||
for i, hash := range query {
|
||||
if bytes >= softResponseLimit || receipts.Len() >= maxReceiptsServe {
|
||||
break
|
||||
}
|
||||
results := chain.GetReceiptsRLP(hash)
|
||||
if results == nil {
|
||||
continue // Can't retrieve the receipts, so we just skip this block.
|
||||
}
|
||||
body := chain.GetBodyRLP(hash)
|
||||
if body == nil {
|
||||
continue // The block body is missing, we also have to skip.
|
||||
}
|
||||
q := receiptQueryParams{sizeLimit: uint64(maxPacketSize - bytes)}
|
||||
if i == 0 {
|
||||
q.firstIndex = firstBlockReceiptIndex
|
||||
}
|
||||
results, incomplete, err := blockReceiptsToNetwork(results, body, q)
|
||||
if err != nil {
|
||||
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
|
||||
continue
|
||||
}
|
||||
if results == nil {
|
||||
// This case triggers when the first receipt of the block receipts list doesn't
|
||||
// fit. We don't append anything to the response here and consider it finished.
|
||||
break
|
||||
}
|
||||
receipts.AppendRaw(results)
|
||||
bytes += len(results)
|
||||
if incomplete {
|
||||
return receipts, true
|
||||
}
|
||||
}
|
||||
return receipts, false
|
||||
}
|
||||
|
||||
func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
|
||||
// A batch of headers arrived to one of our previous requests
|
||||
res := new(BlockHeadersPacket)
|
||||
|
|
@ -435,9 +483,9 @@ func writeTxForHash(tx []byte, buf *bytes.Buffer) {
|
|||
}
|
||||
}
|
||||
|
||||
func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
|
||||
func handleReceipts69(backend Backend, msg Decoder, peer *Peer) error {
|
||||
// A batch of receipts arrived to one of our previous requests
|
||||
res := new(ReceiptsPacket)
|
||||
res := new(ReceiptsPacket69)
|
||||
if err := msg.Decode(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -452,6 +500,41 @@ func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
|
|||
return fmt.Errorf("Receipts: %w", err)
|
||||
}
|
||||
|
||||
return dispatchReceipts(res.RequestId, receiptLists, peer)
|
||||
}
|
||||
|
||||
func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
|
||||
res := new(ReceiptsPacket70)
|
||||
if err := msg.Decode(res); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tresp := tracker.Response{ID: res.RequestId, MsgCode: ReceiptsMsg, Size: res.List.Len()}
|
||||
if err := peer.tracker.Fulfil(tresp); err != nil {
|
||||
return fmt.Errorf("Receipts: %w", err)
|
||||
}
|
||||
receiptLists, err := res.List.Items()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Receipts: %w", err)
|
||||
}
|
||||
|
||||
err = peer.bufferReceipts(res.RequestId, receiptLists, res.LastBlockIncomplete, backend)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.LastBlockIncomplete {
|
||||
// Request the remaining receipts from the same peer.
|
||||
return peer.requestPartialReceipts(res.RequestId)
|
||||
}
|
||||
if complete := peer.flushReceipts(res.RequestId); complete != nil {
|
||||
receiptLists = complete
|
||||
}
|
||||
|
||||
return dispatchReceipts(res.RequestId, receiptLists, peer)
|
||||
}
|
||||
|
||||
// dispatchReceipts submits a receipt response to the dispatcher.
|
||||
func dispatchReceipts(requestId uint64, receiptLists []*ReceiptList, peer *Peer) error {
|
||||
metadata := func() interface{} {
|
||||
hasher := trie.NewStackTrie(nil)
|
||||
hashes := make([]common.Hash, len(receiptLists))
|
||||
|
|
@ -470,7 +553,7 @@ func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
|
|||
enc = append(enc, encReceipts)
|
||||
}
|
||||
return peer.dispatchResponse(&Response{
|
||||
id: res.RequestId,
|
||||
id: requestId,
|
||||
code: ReceiptsMsg,
|
||||
Res: &enc,
|
||||
}, metadata)
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func testHandshake(t *testing.T, protocol uint) {
|
|||
defer app.Close()
|
||||
defer net.Close()
|
||||
|
||||
peer := NewPeer(protocol, p2p.NewPeer(enode.ID{}, "peer", nil), net, nil)
|
||||
peer := NewPeer(protocol, p2p.NewPeer(enode.ID{}, "peer", nil), net, nil, nil)
|
||||
defer peer.Close()
|
||||
|
||||
// Send the junk test with one peer, check the handshake failure
|
||||
|
|
|
|||
|
|
@ -17,7 +17,10 @@
|
|||
package eth
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
|
@ -26,6 +29,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/tracker"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
|
|
@ -43,6 +47,15 @@ const (
|
|||
maxQueuedTxAnns = 4096
|
||||
)
|
||||
|
||||
// receiptRequest tracks the state of an in-flight receipt retrieval operation.
|
||||
type receiptRequest struct {
|
||||
request []common.Hash // block hashes corresponding to the requested receipts
|
||||
gasUsed []uint64 // block gas used corresponding to the requested receipts
|
||||
timestamps []uint64 // block timestamps corresponding to the requested receipts
|
||||
list []*ReceiptList // list of partially collected receipts
|
||||
lastLogSize uint64 // log size of last receipt list
|
||||
}
|
||||
|
||||
// Peer is a collection of relevant information we have about a `eth` peer.
|
||||
type Peer struct {
|
||||
*p2p.Peer // The embedded P2P package peer
|
||||
|
|
@ -63,28 +76,35 @@ type Peer struct {
|
|||
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
|
||||
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
|
||||
|
||||
chainConfig *params.ChainConfig // Chain configuration for fork-aware validation
|
||||
|
||||
receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts
|
||||
receiptBufferLock sync.Mutex // Lock for protecting the receiptBuffer
|
||||
|
||||
term chan struct{} // Termination channel to stop the broadcasters
|
||||
}
|
||||
|
||||
// NewPeer creates a wrapper for a network connection and negotiated protocol
|
||||
// version.
|
||||
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
|
||||
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool, chainConfig *params.ChainConfig) *Peer {
|
||||
cap := p2p.Cap{Name: ProtocolName, Version: version}
|
||||
id := p.ID().String()
|
||||
peer := &Peer{
|
||||
id: id,
|
||||
Peer: p,
|
||||
rw: rw,
|
||||
version: version,
|
||||
knownTxs: newKnownCache(maxKnownTxs),
|
||||
txBroadcast: make(chan []common.Hash),
|
||||
txAnnounce: make(chan []common.Hash),
|
||||
tracker: tracker.New(cap, id, 5*time.Minute),
|
||||
reqDispatch: make(chan *request),
|
||||
reqCancel: make(chan *cancel),
|
||||
resDispatch: make(chan *response),
|
||||
txpool: txpool,
|
||||
term: make(chan struct{}),
|
||||
id: p.ID().String(),
|
||||
Peer: p,
|
||||
rw: rw,
|
||||
version: version,
|
||||
knownTxs: newKnownCache(maxKnownTxs),
|
||||
txBroadcast: make(chan []common.Hash),
|
||||
txAnnounce: make(chan []common.Hash),
|
||||
tracker: tracker.New(cap, id, 5*time.Minute),
|
||||
reqDispatch: make(chan *request),
|
||||
reqCancel: make(chan *cancel),
|
||||
resDispatch: make(chan *response),
|
||||
txpool: txpool,
|
||||
chainConfig: chainConfig,
|
||||
receiptBuffer: make(map[uint64]*receiptRequest),
|
||||
term: make(chan struct{}),
|
||||
}
|
||||
// Start up all the broadcasters
|
||||
go peer.broadcastTransactions()
|
||||
|
|
@ -214,14 +234,23 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
|
|||
})
|
||||
}
|
||||
|
||||
// ReplyReceiptsRLP is the response to GetReceipts.
|
||||
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts rlp.RawList[*ReceiptList]) error {
|
||||
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsPacket{
|
||||
// ReplyReceiptsRLP69 is the response to GetReceipts.
|
||||
func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts rlp.RawList[*ReceiptList]) error {
|
||||
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsPacket69{
|
||||
RequestId: id,
|
||||
List: receipts,
|
||||
})
|
||||
}
|
||||
|
||||
// ReplyReceiptsRLP70 is the response to GetReceipts.
|
||||
func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts rlp.RawList[*ReceiptList], lastBlockIncomplete bool) error {
|
||||
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsPacket70{
|
||||
RequestId: id,
|
||||
List: receipts,
|
||||
LastBlockIncomplete: lastBlockIncomplete,
|
||||
})
|
||||
}
|
||||
|
||||
// RequestOneHeader is a wrapper around the header query functions to fetch a
|
||||
// single header. It is used solely by the fetcher.
|
||||
func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) {
|
||||
|
|
@ -330,20 +359,45 @@ func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Reques
|
|||
}
|
||||
|
||||
// RequestReceipts fetches a batch of transaction receipts from a remote node.
|
||||
func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Request, error) {
|
||||
// `gasUsed` provides the total gas used per block, used to estimate the maximum
|
||||
// log byte size. `timestamps` provides the block timestamps for fork aware validation.
|
||||
func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, timestamps []uint64, sink chan *Response) (*Request, error) {
|
||||
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
|
||||
id := rand.Uint64()
|
||||
|
||||
req := &Request{
|
||||
id: id,
|
||||
sink: sink,
|
||||
code: GetReceiptsMsg,
|
||||
want: ReceiptsMsg,
|
||||
numItems: len(hashes),
|
||||
data: &GetReceiptsPacket{
|
||||
RequestId: id,
|
||||
GetReceiptsRequest: hashes,
|
||||
},
|
||||
var req *Request
|
||||
if p.version > ETH69 {
|
||||
req = &Request{
|
||||
id: id,
|
||||
sink: sink,
|
||||
code: GetReceiptsMsg,
|
||||
want: ReceiptsMsg,
|
||||
numItems: len(hashes),
|
||||
data: &GetReceiptsPacket70{
|
||||
RequestId: id,
|
||||
FirstBlockReceiptIndex: 0,
|
||||
GetReceiptsRequest: hashes,
|
||||
},
|
||||
}
|
||||
p.receiptBufferLock.Lock()
|
||||
p.receiptBuffer[id] = &receiptRequest{
|
||||
request: hashes,
|
||||
gasUsed: gasUsed,
|
||||
timestamps: timestamps,
|
||||
}
|
||||
p.receiptBufferLock.Unlock()
|
||||
} else {
|
||||
req = &Request{
|
||||
id: id,
|
||||
sink: sink,
|
||||
code: GetReceiptsMsg,
|
||||
want: ReceiptsMsg,
|
||||
numItems: len(hashes),
|
||||
data: &GetReceiptsPacket69{
|
||||
RequestId: id,
|
||||
GetReceiptsRequest: hashes,
|
||||
},
|
||||
}
|
||||
}
|
||||
if err := p.dispatchRequest(req); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -351,6 +405,153 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
|
|||
return req, nil
|
||||
}
|
||||
|
||||
// HandlePartialReceipts re-request partial receipts
|
||||
func (p *Peer) requestPartialReceipts(id uint64) error {
|
||||
p.receiptBufferLock.Lock()
|
||||
defer p.receiptBufferLock.Unlock()
|
||||
|
||||
// Do not re-request for the stale request
|
||||
if _, ok := p.receiptBuffer[id]; !ok {
|
||||
return nil
|
||||
}
|
||||
lastBlock := len(p.receiptBuffer[id].list) - 1
|
||||
lastReceipt := p.receiptBuffer[id].list[lastBlock].items.Len()
|
||||
|
||||
hashes := p.receiptBuffer[id].request[lastBlock:]
|
||||
|
||||
req := &Request{
|
||||
id: id,
|
||||
sink: nil,
|
||||
code: GetReceiptsMsg,
|
||||
want: ReceiptsMsg,
|
||||
data: &GetReceiptsPacket70{
|
||||
RequestId: id,
|
||||
FirstBlockReceiptIndex: uint64(lastReceipt),
|
||||
GetReceiptsRequest: hashes,
|
||||
},
|
||||
numItems: len(hashes),
|
||||
}
|
||||
return p.dispatchRequest(req)
|
||||
}
|
||||
|
||||
// bufferReceipts validates a receipt packet and buffer the incomplete packet.
|
||||
// If the request is completed, it appends previously collected receipts.
|
||||
func (p *Peer) bufferReceipts(requestId uint64, receiptLists []*ReceiptList, lastBlockIncomplete bool, backend Backend) error {
|
||||
p.receiptBufferLock.Lock()
|
||||
defer p.receiptBufferLock.Unlock()
|
||||
|
||||
buffer := p.receiptBuffer[requestId]
|
||||
|
||||
// Short circuit for the canceled response
|
||||
if buffer == nil {
|
||||
return nil
|
||||
}
|
||||
// If the response is empty, the peer likely does not have the requested receipts.
|
||||
// Forward the empty response to the internal handler regardless. However, note
|
||||
// that an empty response marked as incomplete is considered invalid.
|
||||
if len(receiptLists) == 0 {
|
||||
delete(p.receiptBuffer, requestId)
|
||||
|
||||
if lastBlockIncomplete {
|
||||
return errors.New("invalid empty receipt response with incomplete flag")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Buffer the last block when the response is incomplete.
|
||||
if lastBlockIncomplete {
|
||||
lastBlock := len(receiptLists) - 1
|
||||
if len(buffer.list) > 0 {
|
||||
lastBlock += len(buffer.list) - 1
|
||||
}
|
||||
gasUsed := buffer.gasUsed[lastBlock]
|
||||
timestamp := buffer.timestamps[lastBlock]
|
||||
logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed, timestamp)
|
||||
if err != nil {
|
||||
delete(p.receiptBuffer, requestId)
|
||||
return err
|
||||
}
|
||||
// Update the buffered data and trim the packet to exclude the incomplete block.
|
||||
if len(buffer.list) > 0 {
|
||||
// If the buffer is already allocated, it means that the previous response
|
||||
// was incomplete Append the first block receipts.
|
||||
buffer.list[len(buffer.list)-1].Append(receiptLists[0])
|
||||
buffer.list = append(buffer.list, receiptLists[1:]...)
|
||||
buffer.lastLogSize = logSize
|
||||
} else {
|
||||
buffer.list = receiptLists
|
||||
buffer.lastLogSize = logSize
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Short circuit if there is nothing cached previously.
|
||||
if len(buffer.list) == 0 {
|
||||
delete(p.receiptBuffer, requestId)
|
||||
return nil
|
||||
}
|
||||
// Aggregate the cached result into the packet.
|
||||
buffer.list[len(buffer.list)-1].Append(receiptLists[0])
|
||||
buffer.list = append(buffer.list, receiptLists[1:]...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// flushReceipts retrieves the merged receipt lists from the buffer
|
||||
// and removes the buffer entry. Returns nil if no buffered data exists.
|
||||
func (p *Peer) flushReceipts(requestId uint64) []*ReceiptList {
|
||||
p.receiptBufferLock.Lock()
|
||||
defer p.receiptBufferLock.Unlock()
|
||||
|
||||
buffer, ok := p.receiptBuffer[requestId]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
delete(p.receiptBuffer, requestId)
|
||||
return buffer.list
|
||||
}
|
||||
|
||||
// validateLastBlockReceipt validates receipts and return log size of last block receipt.
|
||||
// This function is called only when the `lastBlockincomplete == true`.
|
||||
//
|
||||
// Note that the last receipt response (which completes receiptLists of a pending block)
|
||||
// is not verified here. Those response doesn't need hueristics below since they can be
|
||||
// verified by its trie root.
|
||||
func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList, id uint64, gasUsed uint64, timestamp uint64) (uint64, error) {
|
||||
lastReceipts := receiptLists[len(receiptLists)-1]
|
||||
|
||||
// If the receipt is in the middle of retrieval, use the buffered data.
|
||||
// e.g. [[receipt1], [receipt1, receipt2], incomplete = true]
|
||||
// [[receipt3, receipt4], incomplete = true] <<--
|
||||
// [[receipt5], [receipt1], incomplete = false]
|
||||
// This case happens only if len(receiptLists) == 1 && incomplete == true && buffered before.
|
||||
var previousTxs int
|
||||
var previousLog uint64
|
||||
if buffer, ok := p.receiptBuffer[id]; ok && len(buffer.list) > 0 && len(receiptLists) == 1 {
|
||||
previousTxs = buffer.list[len(buffer.list)-1].items.Len()
|
||||
previousLog = buffer.lastLogSize
|
||||
}
|
||||
|
||||
// Verify that the total number of transactions delivered is under the limit.
|
||||
var minTxGas uint64
|
||||
if p.chainConfig != nil && p.chainConfig.AmsterdamTime != nil && *p.chainConfig.AmsterdamTime <= timestamp {
|
||||
minTxGas = 4500
|
||||
} else {
|
||||
minTxGas = 21000
|
||||
}
|
||||
if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/minTxGas {
|
||||
// should be dropped, don't clear the buffer
|
||||
return 0, fmt.Errorf("total number of tx exceeded limit")
|
||||
}
|
||||
// Count log size per receipt
|
||||
log, err := lastReceipts.LogsSize()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Verify that the overall downloaded receipt size does not exceed the block gas limit.
|
||||
if previousLog+log > gasUsed/params.LogDataGas {
|
||||
return 0, fmt.Errorf("total download receipt size exceeded the limit")
|
||||
}
|
||||
return previousLog + log, nil
|
||||
}
|
||||
|
||||
// RequestTxs fetches a batch of transactions from a remote node.
|
||||
func (p *Peer) RequestTxs(hashes []common.Hash) error {
|
||||
p.Log().Trace("Fetching batch of transactions", "count", len(hashes))
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
|
|||
var id enode.ID
|
||||
rand.Read(id[:])
|
||||
|
||||
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
|
||||
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool(), nil)
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
defer app.Close()
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import (
|
|||
// Constants to match up protocol versions and messages
|
||||
const (
|
||||
ETH69 = 69
|
||||
ETH70 = 70
|
||||
)
|
||||
|
||||
// ProtocolName is the official short name of the `eth` protocol used during
|
||||
|
|
@ -38,11 +39,11 @@ const ProtocolName = "eth"
|
|||
|
||||
// ProtocolVersions are the supported versions of the `eth` protocol (first
|
||||
// is primary).
|
||||
var ProtocolVersions = []uint{ETH69}
|
||||
var ProtocolVersions = []uint{ETH70, ETH69}
|
||||
|
||||
// protocolLengths are the number of implemented message corresponding to
|
||||
// different protocol versions.
|
||||
var protocolLengths = map[uint]uint64{ETH69: 18}
|
||||
var protocolLengths = map[uint]uint64{ETH69: 18, ETH70: 18}
|
||||
|
||||
// maxMessageSize is the maximum cap on the size of a protocol message.
|
||||
const maxMessageSize = 10 * 1024 * 1024
|
||||
|
|
@ -211,22 +212,36 @@ type BlockBody struct {
|
|||
// GetReceiptsRequest represents a block receipts query.
|
||||
type GetReceiptsRequest []common.Hash
|
||||
|
||||
// GetReceiptsPacket represents a block receipts query with request ID wrapping.
|
||||
type GetReceiptsPacket struct {
|
||||
// GetReceiptsPacket69 represents a block receipts query with request ID wrapping.
|
||||
type GetReceiptsPacket69 struct {
|
||||
RequestId uint64
|
||||
GetReceiptsRequest
|
||||
}
|
||||
|
||||
// GetReceiptsPacket70 represents a block receipts query with request ID and
|
||||
// FirstBlockReceiptIndex wrapping.
|
||||
type GetReceiptsPacket70 struct {
|
||||
RequestId uint64
|
||||
FirstBlockReceiptIndex uint64
|
||||
GetReceiptsRequest
|
||||
}
|
||||
|
||||
// ReceiptsResponse is the network packet for block receipts distribution.
|
||||
type ReceiptsResponse []types.Receipts
|
||||
|
||||
// ReceiptsPacket is the network packet for block receipts distribution with
|
||||
// ReceiptsPacket69 is the network packet for block receipts distribution with
|
||||
// request ID wrapping.
|
||||
type ReceiptsPacket struct {
|
||||
type ReceiptsPacket69 struct {
|
||||
RequestId uint64
|
||||
List rlp.RawList[*ReceiptList]
|
||||
}
|
||||
|
||||
type ReceiptsPacket70 struct {
|
||||
RequestId uint64
|
||||
LastBlockIncomplete bool
|
||||
List rlp.RawList[*ReceiptList]
|
||||
}
|
||||
|
||||
// ReceiptsRLPResponse is used for receipts, when we already have it encoded
|
||||
type ReceiptsRLPResponse []rlp.RawValue
|
||||
|
||||
|
|
|
|||
|
|
@ -82,11 +82,10 @@ func TestEmptyMessages(t *testing.T) {
|
|||
GetBlockBodiesPacket{1111, nil},
|
||||
BlockBodiesRLPPacket{1111, nil},
|
||||
// Receipts
|
||||
GetReceiptsPacket{1111, nil},
|
||||
GetReceiptsPacket69{1111, nil},
|
||||
// Transactions
|
||||
GetPooledTransactionsPacket{1111, nil},
|
||||
PooledTransactionsRLPPacket{1111, nil},
|
||||
|
||||
// Headers
|
||||
BlockHeadersPacket{1111, encodeRL([]*types.Header{})},
|
||||
// Bodies
|
||||
|
|
@ -94,8 +93,8 @@ func TestEmptyMessages(t *testing.T) {
|
|||
BlockBodiesPacket{1111, encodeRL([]BlockBody{})},
|
||||
BlockBodiesRLPPacket{1111, BlockBodiesRLPResponse([]rlp.RawValue{})},
|
||||
// Receipts
|
||||
GetReceiptsPacket{1111, GetReceiptsRequest([]common.Hash{})},
|
||||
ReceiptsPacket{1111, encodeRL([]*ReceiptList{})},
|
||||
GetReceiptsPacket69{1111, GetReceiptsRequest([]common.Hash{})},
|
||||
ReceiptsPacket69{1111, encodeRL([]*ReceiptList{})},
|
||||
// Transactions
|
||||
GetPooledTransactionsPacket{1111, GetPooledTransactionsRequest([]common.Hash{})},
|
||||
PooledTransactionsPacket{1111, encodeRL([]*types.Transaction{})},
|
||||
|
|
@ -220,11 +219,11 @@ func TestMessages(t *testing.T) {
|
|||
common.FromHex("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"),
|
||||
},
|
||||
{
|
||||
GetReceiptsPacket{1111, GetReceiptsRequest(hashes)},
|
||||
GetReceiptsPacket69{1111, GetReceiptsRequest(hashes)},
|
||||
common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"),
|
||||
},
|
||||
{
|
||||
ReceiptsPacket{1111, encodeRL([]*ReceiptList{NewReceiptList(receipts)})},
|
||||
ReceiptsPacket69{1111, encodeRL([]*ReceiptList{NewReceiptList(receipts)})},
|
||||
common.FromHex("f8da820457f8d5f8d3f866808082014df85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100fff86901018201bcf862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"),
|
||||
},
|
||||
{
|
||||
|
|
|
|||
|
|
@ -202,12 +202,55 @@ func (rl *ReceiptList) Derivable() types.DerivableList {
|
|||
})
|
||||
}
|
||||
|
||||
// blockReceiptsToNetwork takes a slice of rlp-encoded receipts, and transactions,
|
||||
// and re-encodes them for the network protocol.
|
||||
func blockReceiptsToNetwork(blockReceipts, blockBody rlp.RawValue) ([]byte, error) {
|
||||
// Append appends all items from another ReceiptList to this list.
|
||||
func (rl *ReceiptList) Append(other *ReceiptList) {
|
||||
rl.items.AppendList(&other.items)
|
||||
}
|
||||
|
||||
// LogsSize returns the total size of log data across all receipts of the list.
|
||||
func (rl *ReceiptList) LogsSize() (uint64, error) {
|
||||
var size uint64
|
||||
it := rl.items.ContentIterator()
|
||||
for it.Next() {
|
||||
// The encoded receipts are of the form:
|
||||
//
|
||||
// [txType, status, cumulativeGasUsed, [logs...]]
|
||||
//
|
||||
// We want to count the size of logs.
|
||||
// So we strip the outer list first:
|
||||
content, _, err := rlp.SplitList(it.Value())
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid receipt structure: %v", err)
|
||||
}
|
||||
// then skip over txType, status, cumulativeGasUsed:
|
||||
rest := content
|
||||
for range 3 {
|
||||
_, _, rest, err = rlp.Split(rest)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid receipt structure: %v", err)
|
||||
}
|
||||
}
|
||||
// and finally access the logs list to get its inner size:
|
||||
logsContent, _, err := rlp.SplitList(rest)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid receipt logs: %v", err)
|
||||
}
|
||||
size += uint64(len(logsContent))
|
||||
}
|
||||
return size, nil
|
||||
}
|
||||
|
||||
type receiptQueryParams struct {
|
||||
firstIndex uint64
|
||||
sizeLimit uint64
|
||||
}
|
||||
|
||||
// blockReceiptsToNetwork takes a slice of rlp-encoded receipts (in the 'storage' encoding),
|
||||
// and an encoded block body, and re-encodes the receipts for the network protocol.
|
||||
func blockReceiptsToNetwork(blockReceipts, blockBody rlp.RawValue, q receiptQueryParams) (output []byte, incomplete bool, err error) {
|
||||
txTypesIter, err := txTypesInBody(blockBody)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid block body: %v", err)
|
||||
return nil, false, fmt.Errorf("invalid block body: %v", err)
|
||||
}
|
||||
nextTxType, stopTxTypes := iter.Pull(txTypesIter)
|
||||
defer stopTxTypes()
|
||||
|
|
@ -219,8 +262,28 @@ func blockReceiptsToNetwork(blockReceipts, blockBody rlp.RawValue) ([]byte, erro
|
|||
)
|
||||
outer := enc.List()
|
||||
for i := 0; it.Next(); i++ {
|
||||
txType, _ := nextTxType()
|
||||
txType, ok := nextTxType()
|
||||
if !ok {
|
||||
return nil, false, fmt.Errorf("block has less txs than receipts (%d)", i)
|
||||
}
|
||||
// Skip receipts before the requested index.
|
||||
if uint64(i) < q.firstIndex {
|
||||
continue
|
||||
}
|
||||
content, _, _ := rlp.SplitList(it.Value())
|
||||
// Stop appending receipts when they would go over the size limit.
|
||||
// Note we rely on the assumption that the txType is encoded as a single byte,
|
||||
// which is always true because EIP-2718 does not allow tx types > 0x7f.
|
||||
size := rlp.ListSize(1 + uint64(len(content)))
|
||||
if q.sizeLimit > 0 && (uint64(enc.Size())+size) > q.sizeLimit {
|
||||
if uint(i) == uint(q.firstIndex) {
|
||||
// The first receipt doesn't fit into the size limit.
|
||||
return nil, false, nil
|
||||
}
|
||||
incomplete = true
|
||||
break
|
||||
}
|
||||
|
||||
receiptList := enc.List()
|
||||
enc.WriteUint64(uint64(txType))
|
||||
enc.Write(content)
|
||||
|
|
@ -228,7 +291,7 @@ func blockReceiptsToNetwork(blockReceipts, blockBody rlp.RawValue) ([]byte, erro
|
|||
}
|
||||
enc.ListEnd(outer)
|
||||
enc.Flush()
|
||||
return out.Bytes(), nil
|
||||
return out.Bytes(), incomplete, nil
|
||||
}
|
||||
|
||||
// txTypesInBody parses the transactions list of an encoded block body, returning just the types.
|
||||
|
|
|
|||
|
|
@ -105,10 +105,13 @@ func TestReceiptList(t *testing.T) {
|
|||
canonBody, _ := rlp.EncodeToBytes(blockBody)
|
||||
|
||||
// convert from storage encoding to network encoding
|
||||
network, err := blockReceiptsToNetwork(canonDB, canonBody)
|
||||
network, incomplete, err := blockReceiptsToNetwork(canonDB, canonBody, receiptQueryParams{})
|
||||
if err != nil {
|
||||
t.Fatalf("test[%d]: blockReceiptsToNetwork error: %v", i, err)
|
||||
}
|
||||
if incomplete {
|
||||
t.Fatalf("test[%d]: blockReceiptsToNetwork returned incomplete == true", i)
|
||||
}
|
||||
|
||||
// parse as Receipts response list from network encoding
|
||||
var rl ReceiptList
|
||||
|
|
|
|||
|
|
@ -50,8 +50,8 @@ func testSnapSyncDisabling(t *testing.T, ethVer uint, snapVer uint) {
|
|||
defer emptyPipeEth.Close()
|
||||
defer fullPipeEth.Close()
|
||||
|
||||
emptyPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{1}, "", caps), emptyPipeEth, empty.txpool)
|
||||
fullPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{2}, "", caps), fullPipeEth, full.txpool)
|
||||
emptyPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{1}, "", caps), emptyPipeEth, empty.txpool, nil)
|
||||
fullPeerEth := eth.NewPeer(ethVer, p2p.NewPeer(enode.ID{2}, "", caps), fullPipeEth, full.txpool, nil)
|
||||
defer emptyPeerEth.Close()
|
||||
defer fullPeerEth.Close()
|
||||
|
||||
|
|
|
|||
2
rlp/rlpgen/testdata/pkgclash.in.txt
vendored
2
rlp/rlpgen/testdata/pkgclash.in.txt
vendored
|
|
@ -9,5 +9,5 @@ import (
|
|||
|
||||
type Test struct {
|
||||
A eth1.MinerAPI
|
||||
B eth2.GetReceiptsPacket
|
||||
B eth2.GetReceiptsPacket69
|
||||
}
|
||||
|
|
|
|||
2
rlp/rlpgen/testdata/pkgclash.out.txt
vendored
2
rlp/rlpgen/testdata/pkgclash.out.txt
vendored
|
|
@ -41,7 +41,7 @@ func (obj *Test) DecodeRLP(dec *rlp.Stream) error {
|
|||
}
|
||||
_tmp0.A = _tmp1
|
||||
// B:
|
||||
var _tmp2 eth1.GetReceiptsPacket
|
||||
var _tmp2 eth1.GetReceiptsPacket69
|
||||
{
|
||||
if _, err := dec.List(); err != nil {
|
||||
return err
|
||||
|
|
|
|||
Loading…
Reference in a new issue