feat: add handleGetReceipts70

This commit is contained in:
healthykim 2025-11-12 00:39:38 +09:00
parent 429066f2d9
commit bf16cd59ec
16 changed files with 593 additions and 105 deletions

View file

@ -66,9 +66,9 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
return nil, err
}
conn.caps = []p2p.Cap{
{Name: "eth", Version: 69},
{Name: "eth", Version: 70},
}
conn.ourHighestProtoVersion = 69
conn.ourHighestProtoVersion = 70
return &conn, nil
}

View file

@ -81,7 +81,7 @@ func (s *Suite) EthTests() []utesting.Test {
{Name: "ZeroRequestID", Fn: s.TestZeroRequestID},
// get history
{Name: "GetBlockBodies", Fn: s.TestGetBlockBodies},
{Name: "GetReceipts", Fn: s.TestGetReceipts},
{Name: "GetReceipts70", Fn: s.TestGetReceipts},
// test transactions
{Name: "LargeTxRequest", Fn: s.TestLargeTxRequest, Slow: true},
{Name: "Transaction", Fn: s.TestTransaction},
@ -434,15 +434,16 @@ func (s *Suite) TestGetReceipts(t *utesting.T) {
}
// Create block bodies request.
req := &eth.GetReceiptsPacket{
RequestId: 66,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
req := &eth.GetReceiptsPacket70{
RequestId: 66,
GetReceiptsRequest: (eth.GetReceiptsRequest)(hashes),
FirstBlockReceiptIndex: 0,
}
if err := conn.Write(ethProto, eth.GetReceiptsMsg, req); err != nil {
t.Fatalf("could not write to connection: %v", err)
}
// Wait for response.
resp := new(eth.ReceiptsPacket[*eth.ReceiptList69])
resp := new(eth.ReceiptsPacket70)
if err := conn.ReadMsg(ethProto, eth.ReceiptsMsg, &resp); err != nil {
t.Fatalf("error reading block bodies msg: %v", err)
}

View file

@ -307,7 +307,15 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
// reschedule the timeout timer.
index, live := ordering[res.Req]
if live {
timeouts.Remove(index)
req := timeouts.Remove(index)
delete(ordering, res.Req)
if res.Partial {
ttl := d.peers.rates.TargetTimeout()
ordering[req] = timeouts.Size()
timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
}
if index == 0 {
if !timeout.Stop() {
<-timeout.C
@ -317,16 +325,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
timeout.Reset(time.Until(time.Unix(0, -exp)))
}
}
delete(ordering, res.Req)
}
// Delete the pending request (if it still exists) and mark the peer idle
delete(pending, res.Req.Peer)
delete(stales, res.Req.Peer)
if !res.Partial {
// Delete the pending request (if it still exists) and mark the peer idle
delete(pending, res.Req.Peer)
delete(stales, res.Req.Peer)
res.Req.Close()
}
// Signal the dispatcher that the round trip is done. We'll drop the
// peer if the data turns out to be junk.
res.Done <- nil
res.Req.Close()
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.

View file

@ -91,7 +91,7 @@ func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int,
receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
hashes := packet.Meta.([]common.Hash) // {receipt hashes}
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, packet.Partial, packet.From)
switch {
case err == nil && len(receipts) == 0:
peer.log.Trace("Requested receipts delivered")

View file

@ -629,13 +629,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
result.SetBodyDone()
}
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct)
bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct, false, 0)
}
// DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash, incomplete bool, from int) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
@ -650,7 +650,7 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
result.SetReceiptsDone()
}
return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct, incomplete, from)
}
// deliver injects a data retrieval response into the results queue.
@ -662,14 +662,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter,
results int, validate func(index int, header *types.Header) error,
reconstruct func(index int, result *fetchResult)) (int, error) {
reconstruct func(index int, result *fetchResult), incomplete bool, from int) (int, error) {
// Short circuit if the data was never requested
request := pendPool[id]
if request == nil {
resDropMeter.Mark(int64(results))
return 0, errNoFetchesPending
}
delete(pendPool, id)
if !incomplete {
delete(pendPool, id)
}
reqTimer.UpdateSince(request.Time)
resInMeter.Mark(int64(results))
@ -677,7 +679,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
// If no data items were retrieved, mark them as unavailable for the origin peer
if results == 0 {
for _, header := range request.Headers {
request.Peer.MarkLacking(header.Hash())
request.Peer.MarkLacking(header.Hash()) //todo?
}
}
// Assemble each of the results with their headers and retrieved data parts
@ -687,7 +689,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
i int
hashes []common.Hash
)
for _, header := range request.Headers {
for _, header := range request.Headers[from:] {
// Short circuit assembly if no more fetch results are found
if i >= results {
break
@ -701,7 +703,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
i++
}
for _, header := range request.Headers[:i] {
for _, header := range request.Headers[from : from+i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
reconstruct(accepted, res)
} else {
@ -718,8 +720,15 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
resDropMeter.Mark(int64(results - accepted))
// Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
//todo
if incomplete {
for _, header := range request.Headers[from+accepted : from+results] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
} else {
for _, header := range request.Headers[from+accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
// Wake up Results
if accepted > 0 {

View file

@ -32,32 +32,45 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
type blockConfig struct {
txPeriod int
txCount int
}
var emptyBlock = blockConfig{txPeriod: 0, txCount: 0}
var defaultBlock = blockConfig{txPeriod: 2, txCount: 1}
// makeChain creates a chain of n blocks starting at and including parent.
// The returned hash chain is ordered head->parent.
// If empty is false, every second block (i%2==0) contains one transaction.
// If config.txCount > 0, every config.txPeriod-th block contains config.txCount transactions.
// No uncles are added.
func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
func makeChain(n int, seed byte, parent *types.Block, config blockConfig) ([]*types.Block, []types.Receipts) {
blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed})
// Add one tx to every second block
if !empty && i%2 == 0 {
signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp())
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
if err != nil {
panic(err)
// Add transactions according to config
if config.txCount > 0 && i%config.txPeriod == 0 {
for range config.txCount {
signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp())
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
if err != nil {
panic(err)
}
block.AddTx(tx)
}
block.AddTx(tx)
}
})
return blocks, receipts
}
type chainData struct {
blocks []*types.Block
offset int
blocks []*types.Block
receipts []types.Receipts
offset int
}
var chain *chainData
@ -66,11 +79,11 @@ var emptyChain *chainData
func init() {
// Create a chain of blocks to import
targetBlocks := 128
blocks, _ := makeChain(targetBlocks, 0, testGenesis, false)
chain = &chainData{blocks, 0}
blocks, receipts := makeChain(targetBlocks, 0, testGenesis, defaultBlock)
chain = &chainData{blocks, receipts, 0}
blocks, _ = makeChain(targetBlocks, 0, testGenesis, true)
emptyChain = &chainData{blocks, 0}
blocks, receipts = makeChain(targetBlocks, 0, testGenesis, emptyBlock)
emptyChain = &chainData{blocks, receipts, 0}
}
func (chain *chainData) headers() []*types.Header {
@ -261,13 +274,149 @@ func TestEmptyBlocks(t *testing.T) {
}
}
// TestPartialReceiptDelivery checks two points below
// 1. Receipts failed validation should be re requested to the other peers
// 2. Partial delivery should not be expired
func TestPartialReceiptDelivery(t *testing.T) {
blocks, receipts := makeChain(64, 0, testGenesis, blockConfig{txPeriod: 1, txCount: 5})
chain := chainData{blocks: blocks, receipts: receipts, offset: 0}
numBlock := len(chain.blocks)
q := newQueue(10, 10)
if !q.Idle() {
t.Errorf("new queue should be idle")
}
q.Prepare(1, SnapSync)
if res := q.Results(false); len(res) != 0 {
t.Fatal("new queue should have 0 results")
}
// Schedule a batch of headers
headers := chain.headers()
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
q.Schedule(headers, hashes, 1)
peer := dummyPeer("peer-1")
req, _, _ := q.ReserveReceipts(peer, numBlock)
t.Logf("request: length %d", len(req.Headers))
// 1. Deliver partial receipt: should not clear the remaining receipts from pending list
firstCutoff := len(req.Headers) / 3
receiptRLP, rcHashes := getPartialReceiptsDelivery(0, firstCutoff, receipts)
accepted, err := q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, 0)
if err != nil || accepted != firstCutoff {
t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted)
}
if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers) {
t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers))
}
for i := range firstCutoff {
headerNumber := req.Headers[i].Number.Uint64()
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
if err != nil {
t.Fatalf("fetch result get failed: err %v", err)
}
if res == nil {
t.Fatalf("fetch result is nil: header number %d", headerNumber)
}
if !res.Done(receiptType) {
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
}
}
if flight := q.InFlightReceipts(); !flight {
t.Fatalf("there should be in flight receipts")
}
// 2. Deliver partial receipt with invalid receipt: should clear invalid receipt from pending list
secondCutoff := firstCutoff + len(req.Headers)/3
receiptRLP, rcHashes = getPartialReceiptsDelivery(firstCutoff, secondCutoff, receipts)
// one invalid receipt
rcHashes[len(rcHashes)-1] = common.Hash{}
accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, firstCutoff)
if accepted != len(rcHashes)-1 {
t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1)
}
if err == nil {
t.Fatalf("delivery should fail")
}
// invalid receipt should returned back to the pending pool
if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers)+1 {
t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers))
}
for i := range len(rcHashes) - 1 {
headerNumber := req.Headers[firstCutoff+i].Number.Uint64()
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
if err != nil {
t.Fatalf("fetch result get failed: err %v", err)
}
if res == nil {
t.Fatalf("fetch result is nil: header number %d", headerNumber)
}
if !res.Done(receiptType) {
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
}
}
// 3. Deliver partial receipt to complete request
thirdCutoff := len(req.Headers)
receiptRLP, rcHashes = getPartialReceiptsDelivery(secondCutoff, thirdCutoff, receipts)
accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, false, secondCutoff)
if accepted != len(rcHashes) {
t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1)
}
if err != nil || accepted != thirdCutoff-secondCutoff {
t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted)
}
for i := range len(rcHashes) {
headerNumber := req.Headers[secondCutoff+i].Number.Uint64()
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
if err != nil {
t.Fatalf("fetch result get failed: err %v", err)
}
if res == nil {
t.Fatalf("fetch result is nil: header number %d", headerNumber)
}
if !res.Done(receiptType) {
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
}
}
if q.InFlightReceipts() {
t.Fatal("there shouldn't be reamining in flight receipts")
}
}
func getPartialReceiptsDelivery(from int, to int, receipts []types.Receipts) ([]rlp.RawValue, []common.Hash) {
if from < 0 {
from = 0
}
if to > len(receipts) {
to = len(receipts)
}
hasher := trie.NewStackTrie(nil)
rcHashes := make([]common.Hash, to-from)
for i, rc := range receipts[from:to] {
rcHashes[i] = types.DeriveSha(rc, hasher)
}
return types.EncodeBlockReceiptLists(receipts[from:to]), rcHashes
}
// XTestDelivery does some more extensive testing of events that happen,
// blocks that become known and peers that make reservations and deliveries.
// disabled since it's not really a unit-test, but can be executed to test
// some more advanced scenarios
func XTestDelivery(t *testing.T) {
// the outside network, holding blocks
blo, rec := makeChain(128, 0, testGenesis, false)
blo, rec := makeChain(128, 0, testGenesis, defaultBlock)
world := newNetwork()
world.receipts = rec
world.chain = blo
@ -368,7 +517,7 @@ func XTestDelivery(t *testing.T) {
for i, receipt := range rcs {
hashes[i] = types.DeriveSha(receipt, hasher)
}
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes)
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes, false, 0)
if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
}
@ -444,7 +593,7 @@ func (n *network) progress(numBlocks int) {
n.lock.Lock()
defer n.lock.Unlock()
//fmt.Printf("progressing...\n")
newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], emptyBlock)
n.chain = append(n.chain, newBlocks...)
n.receipts = append(n.receipts, newR...)
n.cond.Broadcast()

View file

@ -53,6 +53,9 @@ type Request struct {
Peer string // Demultiplexer if cross-peer requests are batched together
Sent time.Time // Timestamp when the request was sent
reRequest bool
previous uint64 // id of previous index (to find sink)
}
// Close aborts an in-flight request. Although there's no way to notify the
@ -105,6 +108,9 @@ type Response struct {
Meta interface{} // Metadata generated locally on the receiver thread
Time time.Duration // Time it took for the request to be served
Done chan error // Channel to signal message handling to the reader
From int
Partial bool
}
// response is a wrapper around a remote Response that has an error channel to
@ -201,10 +207,17 @@ func (p *Peer) dispatcher() {
reqOp.fail <- err
if err == nil {
// do not overwrite if it is re-request
if _, ok := pending[req.id]; !ok {
pending[req.id] = req
// reuse sink if it is re-request
if req.reRequest {
if _, ok := pending[req.previous]; ok {
req.sink = pending[req.previous].sink
} else {
reqOp.fail <- fmt.Errorf("Cannot find previous request index")
continue
}
delete(pending, req.previous)
}
pending[req.id] = req
}
case cancelOp := <-p.reqCancel:
@ -248,7 +261,10 @@ func (p *Peer) dispatcher() {
resOp.fail <- nil
// Stop tracking the request, the response dispatcher will deliver
delete(pending, res.id)
// For partial response, pending should be removed after re-request
if res.Partial {
delete(pending, res.id)
}
}
case <-p.term:

View file

@ -49,6 +49,8 @@ const (
// containing 200+ transactions nowadays, the practical limit will always
// be softResponseLimit.
maxReceiptsServe = 1024
maxPacketSize = 10 * 1024 * 1024
)
// Handler is a callback to invoke from an outside runner after the boilerplate
@ -195,6 +197,20 @@ var eth69 = map[uint64]msgHandler{
BlockRangeUpdateMsg: handleBlockRangeUpdate,
}
var eth70 = map[uint64]msgHandler{
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts70,
ReceiptsMsg: handleReceipts70,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
BlockRangeUpdateMsg: handleBlockRangeUpdate,
}
// handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error {
@ -209,11 +225,14 @@ func handleMessage(backend Backend, peer *Peer) error {
defer msg.Discard()
var handlers map[uint64]msgHandler
if peer.version == ETH68 {
switch peer.version {
case ETH68:
handlers = eth68
} else if peer.version == ETH69 {
case ETH69:
handlers = eth69
} else {
case ETH70:
handlers = eth70
default:
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
}

View file

@ -538,7 +538,7 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}
// Send the hash request and verify the response
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket{
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket69{
RequestId: 123,
GetReceiptsRequest: hashes,
})
@ -550,6 +550,100 @@ func testGetBlockReceipts(t *testing.T, protocol uint) {
}
}
func TestGetBlockPartialReceipts(t *testing.T) { testGetBlockPartialReceipts(t, ETH70) }
func testGetBlockPartialReceipts(t *testing.T, protocol int) {
// First generate chain and overwrite receipts
generator := func(_ int, block *core.BlockGen) {
for j := 0; j < 5; j++ {
tx, err := types.SignTx(
types.NewTransaction(block.TxNonce(testAddr), testAddr, big.NewInt(1000), params.TxGas, block.BaseFee(), nil),
types.LatestSignerForChainID(params.TestChainConfig.ChainID),
testKey,
)
if err != nil {
t.Fatalf("failed to sign tx: %v", err)
}
block.AddTx(tx)
}
}
backend := newTestBackendWithGenerator(4, true, false, generator)
defer backend.close()
blockCutoff := 2
receiptCutoff := 4
// Replace receipts in DB with larger receipts
targetBlock := backend.chain.GetBlockByNumber(uint64(blockCutoff))
receipts := backend.chain.GetReceiptsByHash(targetBlock.Hash())
receiptSize := params.MaxTxGas / params.LogDataGas // ~2MiB per receipt
for i := range receipts {
payload := make([]byte, receiptSize)
for j := range payload {
payload[j] = byte(i + j)
}
receipts[i].Logs = []*types.Log{
{
Address: common.BytesToAddress([]byte{byte(i + 1)}),
Data: payload,
},
}
}
rawdb.WriteReceipts(backend.db, targetBlock.Hash(), targetBlock.NumberU64(), receipts)
peer, _ := newTestPeer("peer", uint(protocol), backend)
defer peer.close()
var (
hashes []common.Hash
partialReceipt []*ReceiptList69
)
for i := uint64(0); i <= backend.chain.CurrentBlock().Number.Uint64(); i++ {
block := backend.chain.GetBlockByNumber(i)
hashes = append(hashes, block.Hash())
}
for i := 0; i <= blockCutoff; i++ {
block := backend.chain.GetBlockByNumber(uint64(i))
trs := backend.chain.GetReceiptsByHash(block.Hash())
limit := len(trs)
if i == blockCutoff {
limit = receiptCutoff
}
partialReceipt = append(partialReceipt, NewReceiptList69(trs[:limit]))
}
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{
RequestId: 123,
GetReceiptsRequest: hashes,
FirstBlockReceiptIndex: 0,
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{
RequestId: 123,
List: partialReceipt,
LastBlockIncomplete: true,
}); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
// Simulate re-request
partialReceipt = []*ReceiptList69{NewReceiptList69(receipts[receiptCutoff:])}
p2p.Send(peer.app, GetReceiptsMsg, &GetReceiptsPacket70{
RequestId: 123,
GetReceiptsRequest: []common.Hash{hashes[blockCutoff]},
FirstBlockReceiptIndex: uint64(receiptCutoff),
})
if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, &ReceiptsPacket70{
RequestId: 123,
List: partialReceipt,
LastBlockIncomplete: false,
}); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
}
type decoder struct {
msg []byte
}
@ -612,10 +706,10 @@ func setup() (*testBackend, *testPeer) {
}
func FuzzEthProtocolHandlers(f *testing.F) {
handlers := eth69
handlers := eth70
backend, peer := setup()
f.Fuzz(func(t *testing.T, code byte, msg []byte) {
handler := handlers[uint64(code)%protocolLengths[ETH69]]
handler := handlers[uint64(code)%protocolLengths[ETH70]]
if handler == nil {
return
}

View file

@ -17,6 +17,7 @@
package eth
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -250,22 +251,31 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
func handleGetReceipts68(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
var query GetReceiptsPacket69
if err := msg.Decode(&query); err != nil {
return err
}
response := ServiceGetReceiptsQuery68(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
return peer.ReplyReceiptsRLP69(query.RequestId, response)
}
func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
var query GetReceiptsPacket69
if err := msg.Decode(&query); err != nil {
return err
}
response := serviceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
return peer.ReplyReceiptsRLP69(query.RequestId, response)
}
func handleGetReceipts70(backend Backend, msg Decoder, peer *Peer) error {
var query GetReceiptsPacket70
if err := msg.Decode(&query); err != nil {
return err
}
response, lastBlockIncomplete := serviceGetReceiptsQuery70(backend.Chain(), query.GetReceiptsRequest, query.FirstBlockReceiptIndex)
return peer.ReplyReceiptsRLP70(query.RequestId, response, lastBlockIncomplete)
}
// ServiceGetReceiptsQuery68 assembles the response to a receipt query. It is
@ -342,6 +352,86 @@ func serviceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest)
return receipts
}
// serviceGetReceiptsQuery70 assembles the response to a receipt query.
// If the size of receipts is larger than 10MB, it would cut it and flag lastBlockIncomplete
// It omits up to firstBlockReceiptIndex receipt from the first receipt list
func serviceGetReceiptsQuery70(chain *core.BlockChain, query GetReceiptsRequest, firstBlockReceiptIndex uint64) ([]rlp.RawValue, bool) {
var (
bytes int
receipts []rlp.RawValue
lastBlockIncomplete bool
)
for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
}
results := chain.GetReceiptsRLP(hash)
if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
} else {
body := chain.GetBodyRLP(hash)
if body == nil {
continue
}
var err error
results, err = blockReceiptsToNetwork69(results, body)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}
}
// todo: buffer
if firstBlockReceiptIndex > 0 && lookups == 0 {
results, lastBlockIncomplete = trimReceiptsRLP(results, int(firstBlockReceiptIndex))
} else if bytes+len(results) > maxPacketSize {
results, lastBlockIncomplete = trimReceiptsRLP(results, 0)
}
receipts = append(receipts, results)
bytes += len(results)
}
return receipts, lastBlockIncomplete
}
// trimReceiptsRLP trims raw value from `from` index until it exceeds limit
func trimReceiptsRLP(receiptsRLP rlp.RawValue, from int) (rlp.RawValue, bool) {
var (
out bytes.Buffer
buffer = rlp.NewEncoderBuffer(&out)
iter, _ = rlp.NewListIterator(receiptsRLP)
index int
bytes int
overflow bool
)
list := buffer.List()
for iter.Next() {
if index < from {
index++
continue
}
receipt := iter.Value()
if bytes+len(receipt) > maxPacketSize {
overflow = true
break
}
buffer.Write(receipt)
bytes += len(receipt)
index++
}
buffer.ListEnd(list)
buffer.Flush()
return out.Bytes(), overflow
}
func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
return errors.New("block announcements disallowed") // We dropped support for non-merge networks
}
@ -437,14 +527,13 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
return err
}
if res.LastBlockIncomplete {
return handlePartialReceipts(peer, res)
from, err := peer.ValidateReceipt(res)
if err != nil {
return err
}
if buf, ok := peer.receiptBuffer[res.RequestId]; ok {
res.List = append(buf, res.List...)
delete(peer.receiptBuffer, res.RequestId)
delete(peer.requestedReceipts, res.RequestId)
if res.LastBlockIncomplete {
peer.HandlePartialReceipts(res.RequestId)
}
// Assign buffers shared between list elements
@ -471,37 +560,12 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
id: res.RequestId,
code: ReceiptsMsg,
Res: &enc,
From: from,
Partial: res.LastBlockIncomplete,
}, metadata)
}
func handlePartialReceipts(peer *Peer, res *ReceiptsPacket70) error {
id := res.RequestId
peer.receiptBuffer[id] = append(peer.receiptBuffer[id], res.List...)
last := res.List[len(res.List)-1]
if !validatePartialReceipt(last) {
return fmt.Errorf("Receipts: validation error, should drop the peer")
}
req := &Request{
id: id,
sink: nil,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket{
RequestId: id,
GetReceiptsRequest: peer.requestedReceipts[id][len(res.List)-1:],
},
}
return peer.dispatchRequest(req)
}
// TODO: position?
func validatePartialReceipt(receipt *ReceiptList69) bool {
return true
}
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them

View file

@ -37,6 +37,8 @@ const (
// network IDs, difficulties, head and genesis blocks.
func (p *Peer) Handshake(networkID uint64, chain forkid.Blockchain, rangeMsg BlockRangeUpdatePacket) error {
switch p.version {
case ETH70:
return p.handshake69(networkID, chain, rangeMsg)
case ETH69:
return p.handshake69(networkID, chain, rangeMsg)
case ETH68:

View file

@ -17,6 +17,7 @@
package eth
import (
"fmt"
"math/rand"
"sync/atomic"
@ -24,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
@ -41,6 +43,12 @@ const (
maxQueuedTxAnns = 4096
)
type partialReceipt struct {
idx int // position in original request
list *ReceiptList69 // list of partially collected receipts
size uint64 // log size of list
}
// Peer is a collection of relevant information we have about a `eth` peer.
type Peer struct {
id string // Unique ID for the peer, cached
@ -59,8 +67,8 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
requestedReceipts map[uint64][]common.Hash
receiptBuffer map[uint64][]*ReceiptList69
requestedReceipts map[uint64][]common.Hash // requestId -> requested receipts map (can be removed if one peer cannot have more than one request in flight)
receiptBuffer map[uint64]*partialReceipt // requestId -> receiptlist map
term chan struct{} // Termination channel to stop the broadcasters
}
@ -81,7 +89,7 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe
resDispatch: make(chan *response),
txpool: txpool,
requestedReceipts: make(map[uint64][]common.Hash),
receiptBuffer: make(map[uint64][]*ReceiptList69),
receiptBuffer: make(map[uint64]*partialReceipt),
term: make(chan struct{}),
}
// Start up all the broadcasters
@ -213,13 +221,22 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
}
// ReplyReceiptsRLP is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket{
func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket69{
RequestId: id,
ReceiptsRLPResponse: receipts,
})
}
// ReplyReceiptsRLP is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts []rlp.RawValue, lastBlockIncomplete bool) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket70{
RequestId: id,
ReceiptsRLPResponse: receipts,
LastBlockIncomplete: lastBlockIncomplete,
})
}
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) {
@ -333,7 +350,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket{
data: &GetReceiptsPacket69{
RequestId: id,
GetReceiptsRequest: hashes,
},
@ -346,6 +363,100 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
return req, nil
}
// handlePartialReceipts re-request partial receipts
func (p *Peer) HandlePartialReceipts(previousId uint64) error {
split := p.receiptBuffer[previousId].idx
id := rand.Uint64()
req := &Request{
id: id,
sink: nil,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket70{
RequestId: id,
GetReceiptsRequest: p.requestedReceipts[previousId][split:],
FirstBlockReceiptIndex: uint64(len(p.receiptBuffer[previousId].list.items)),
},
reRequest: true,
previous: previousId,
}
return p.dispatchRequest(req)
}
// validateReceipt validate and check completion of partial request
// This function also modifies packet (trim partial response or append previously collected receipts)
func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) {
from := 0
requestId := packet.RequestId
if len(packet.List) == 0 {
return 0, fmt.Errorf("receipt list size 0")
}
// process first block
// : partially collected before and completed by this response
firstReceipt := packet.List[0]
if firstReceipt == nil {
return 0, fmt.Errorf("nil first receipt")
}
if _, ok := p.receiptBuffer[requestId]; !ok {
// complete packet (hash validation will be performed later)
firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...)
from = p.receiptBuffer[requestId].idx
delete(p.receiptBuffer, requestId)
}
// process last block
if packet.LastBlockIncomplete {
lastReceipts := packet.List[len(packet.List)-1]
if lastReceipts == nil {
return 0, fmt.Errorf("nil partial receipt")
}
var previousTxs int
var previousLog uint64
if buffer, ok := p.receiptBuffer[requestId]; ok {
previousTxs = len(buffer.list.items)
previousLog = buffer.size
}
// 1. Verify the total number of tx delivered
if uint64(previousTxs+len(lastReceipts.items)) > lastReceipts.items[0].GasUsed/21_000 {
// should be dropped, don't clear the buffer
return 0, fmt.Errorf("total number of tx exceeded limit")
}
// 2. Verify the size of each receipt against the gas limit of the corresponding transaction
for _, rc := range lastReceipts.items {
if uint64(len(rc.Logs)) > params.MaxTxGas/params.LogDataGas {
return 0, fmt.Errorf("total size of receipt exceeded limit")
}
previousLog += uint64(len(rc.Logs))
}
// 3. Verify the total download receipt size is no longer than allowed by the block gas limit
if previousLog > params.MaxGasLimit/params.LogDataGas {
return 0, fmt.Errorf("total download receipt size exceeded the limit")
}
// Update buffer & trim packet
if buffer, ok := p.receiptBuffer[requestId]; ok {
buffer.idx = buffer.idx + len(packet.List) - 1
buffer.list.items = append(buffer.list.items, lastReceipts.items...)
buffer.size = previousLog
} else {
p.receiptBuffer[requestId] = &partialReceipt{
idx: len(packet.List) - 1,
list: lastReceipts,
size: previousLog,
}
}
packet.List = packet.List[:len(packet.List)-1]
}
return from, nil
}
// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of transactions", "count", len(hashes))

View file

@ -32,6 +32,7 @@ import (
const (
ETH68 = 68
ETH69 = 69
ETH70 = 70
)
// ProtocolName is the official short name of the `eth` protocol used during
@ -40,11 +41,11 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH69, ETH68}
var ProtocolVersions = []uint{ETH69, ETH68, ETH70}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 18}
var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 18, ETH70: 18}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
@ -259,11 +260,18 @@ func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Heade
type GetReceiptsRequest []common.Hash
// GetReceiptsPacket represents a block receipts query with request ID wrapping.
type GetReceiptsPacket struct {
type GetReceiptsPacket69 struct {
RequestId uint64
GetReceiptsRequest
}
// GetReceiptsPacket represents a block receipts query with request ID wrapping.
type GetReceiptsPacket70 struct {
RequestId uint64
GetReceiptsRequest
FirstBlockReceiptIndex uint64
}
// ReceiptsResponse is the network packet for block receipts distribution.
type ReceiptsResponse []types.Receipts
@ -292,11 +300,17 @@ type ReceiptsPacket70 struct {
type ReceiptsRLPResponse []rlp.RawValue
// ReceiptsRLPPacket is ReceiptsRLPResponse with request ID wrapping.
type ReceiptsRLPPacket struct {
type ReceiptsRLPPacket69 struct {
RequestId uint64
ReceiptsRLPResponse
}
type ReceiptsRLPPacket70 struct {
RequestId uint64
ReceiptsRLPResponse
LastBlockIncomplete bool
}
// NewPooledTransactionHashesPacket represents a transaction announcement packet on eth/68 and newer.
type NewPooledTransactionHashesPacket struct {
Types []byte

View file

@ -84,7 +84,7 @@ func TestEmptyMessages(t *testing.T) {
BlockBodiesPacket{1111, nil},
BlockBodiesRLPPacket{1111, nil},
// Receipts
GetReceiptsPacket{1111, nil},
GetReceiptsPacket69{1111, nil},
// Transactions
GetPooledTransactionsPacket{1111, nil},
PooledTransactionsPacket{1111, nil},
@ -97,7 +97,7 @@ func TestEmptyMessages(t *testing.T) {
BlockBodiesPacket{1111, BlockBodiesResponse([]*BlockBody{})},
BlockBodiesRLPPacket{1111, BlockBodiesRLPResponse([]rlp.RawValue{})},
// Receipts
GetReceiptsPacket{1111, GetReceiptsRequest([]common.Hash{})},
GetReceiptsPacket69{1111, GetReceiptsRequest([]common.Hash{})},
ReceiptsPacket[*ReceiptList68]{1111, []*ReceiptList68{}},
ReceiptsPacket[*ReceiptList69]{1111, []*ReceiptList69{}},
// Transactions
@ -227,7 +227,7 @@ func TestMessages(t *testing.T) {
common.FromHex("f902dc820457f902d6f902d3f8d2f867088504a817c8088302e2489435353535353535353535353535353535353535358202008025a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12a064b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10f867098504a817c809830334509435353535353535353535353535353535353535358202d98025a052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afba052f8f61201b2b11a78d6e866abc9c3db2ae8631fa656bfe5cb53668255367afbf901fcf901f9a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000940000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000b90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008208ae820d0582115c8215b3821a0a827788a00000000000000000000000000000000000000000000000000000000000000000880000000000000000"),
},
{
GetReceiptsPacket{1111, GetReceiptsRequest(hashes)},
GetReceiptsPacket69{1111, GetReceiptsRequest(hashes)},
common.FromHex("f847820457f842a000000000000000000000000000000000000000000000000000000000deadc0dea000000000000000000000000000000000000000000000000000000000feedbeef"),
},
{
@ -236,7 +236,7 @@ func TestMessages(t *testing.T) {
},
{
// Identical to the eth/68 encoding above.
ReceiptsRLPPacket{1111, ReceiptsRLPResponse([]rlp.RawValue{receiptsRlp})},
ReceiptsRLPPacket69{1111, ReceiptsRLPResponse([]rlp.RawValue{receiptsRlp})},
common.FromHex("f902e6820457f902e0f902ddf901688082014db9010000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000014000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000004000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000f85ff85d940000000000000000000000000000000000000011f842a0000000000000000000000000000000000000000000000000000000000000deada0000000000000000000000000000000000000000000000000000000000000beef830100ffb9016f01f9016b018201bcb9010000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000800000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000040000000001000000000000000000000000000000000000000000000000040000000000000000000000000004000000000000000000000000000000000000000000000000000000008000400000000000000000000000000000000000000000000000000000000000000000000000000000040f862f860940000000000000000000000000000000000000022f842a00000000000000000000000000000000000000000000000000000000000005668a0000000000000000000000000000000000000000000000000000000000000977386020f0f0f0608"),
},
{

View file

@ -9,5 +9,5 @@ import (
type Test struct {
A eth1.MinerAPI
B eth2.GetReceiptsPacket
B eth2.GetReceiptsPacket69
}

View file

@ -41,7 +41,7 @@ func (obj *Test) DecodeRLP(dec *rlp.Stream) error {
}
_tmp0.A = _tmp1
// B:
var _tmp2 eth1.GetReceiptsPacket
var _tmp2 eth1.GetReceiptsPacket69
{
if _, err := dec.List(); err != nil {
return err