From a62ec06a1dd3bda4e336d2a952b3df163b05272a Mon Sep 17 00:00:00 2001 From: healthykim Date: Sun, 23 Nov 2025 17:59:03 -0500 Subject: [PATCH] fix: remove partial sink --- eth/downloader/fetchers_concurrent.go | 21 +- .../fetchers_concurrent_receipts.go | 2 +- eth/downloader/queue.go | 26 +-- eth/downloader/queue_test.go | 183 ++---------------- eth/protocols/eth/dispatcher.go | 27 +-- eth/protocols/eth/handlers.go | 11 +- eth/protocols/eth/peer.go | 109 +++++------ eth/protocols/eth/peer_test.go | 15 +- 8 files changed, 94 insertions(+), 300 deletions(-) diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 0662200aef..9d8cd114c1 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -307,15 +307,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { // reschedule the timeout timer. index, live := ordering[res.Req] if live { - req := timeouts.Remove(index) - delete(ordering, res.Req) - - if res.Partial { - ttl := d.peers.rates.TargetTimeout() - ordering[req] = timeouts.Size() - timeouts.Push(req, -time.Now().Add(ttl).UnixNano()) - } - + timeouts.Remove(index) if index == 0 { if !timeout.Stop() { <-timeout.C @@ -325,17 +317,16 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { timeout.Reset(time.Until(time.Unix(0, -exp))) } } + delete(ordering, res.Req) } - if !res.Partial { - // Delete the pending request (if it still exists) and mark the peer idle - delete(pending, res.Req.Peer) - delete(stales, res.Req.Peer) + // Delete the pending request (if it still exists) and mark the peer idle + delete(pending, res.Req.Peer) + delete(stales, res.Req.Peer) - res.Req.Close() - } // Signal the dispatcher that the round trip is done. We'll drop the // peer if the data turns out to be junk. res.Done <- nil + res.Req.Close() // If the peer was previously banned and failed to deliver its pack // in a reasonable time frame, ignore its message. diff --git a/eth/downloader/fetchers_concurrent_receipts.go b/eth/downloader/fetchers_concurrent_receipts.go index 42ee5cf8ce..dbea30e881 100644 --- a/eth/downloader/fetchers_concurrent_receipts.go +++ b/eth/downloader/fetchers_concurrent_receipts.go @@ -91,7 +91,7 @@ func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int, receipts := *packet.Res.(*eth.ReceiptsRLPResponse) hashes := packet.Meta.([]common.Hash) // {receipt hashes} - accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, packet.Partial, packet.From) + accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes) switch { case err == nil && len(receipts) == 0: peer.log.Trace("Requested receipts delivered") diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 4d8810b28b..9fe169d5f7 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -629,13 +629,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH result.SetBodyDone() } return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, - bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct, false, 0) + bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct) } // DeliverReceipts injects a receipt retrieval response into the results queue. // The method returns the number of transaction receipts accepted from the delivery // and also wakes any threads waiting for data delivery. -func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash, incomplete bool, from int) (int, error) { +func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -650,7 +650,7 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi result.SetReceiptsDone() } return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, - receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct, incomplete, from) + receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct) } // deliver injects a data retrieval response into the results queue. @@ -662,16 +662,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter, results int, validate func(index int, header *types.Header) error, - reconstruct func(index int, result *fetchResult), incomplete bool, from int) (int, error) { + reconstruct func(index int, result *fetchResult)) (int, error) { // Short circuit if the data was never requested request := pendPool[id] if request == nil { resDropMeter.Mark(int64(results)) return 0, errNoFetchesPending } - if !incomplete { - delete(pendPool, id) - } + delete(pendPool, id) reqTimer.UpdateSince(request.Time) resInMeter.Mark(int64(results)) @@ -689,7 +687,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, i int hashes []common.Hash ) - for _, header := range request.Headers[from:] { + for _, header := range request.Headers { // Short circuit assembly if no more fetch results are found if i >= results { break @@ -703,7 +701,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, i++ } - for _, header := range request.Headers[from : from+i] { + for _, header := range request.Headers[:i] { if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { reconstruct(accepted, res) } else { @@ -720,14 +718,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, resDropMeter.Mark(int64(results - accepted)) // Return all failed or missing fetches to the queue - if incomplete { - for _, header := range request.Headers[from+accepted : from+results] { - taskQueue.Push(header, -int64(header.Number.Uint64())) - } - } else { - for _, header := range request.Headers[from+accepted:] { - taskQueue.Push(header, -int64(header.Number.Uint64())) - } + for _, header := range request.Headers[accepted:] { + taskQueue.Push(header, -int64(header.Number.Uint64())) } // Wake up Results if accepted > 0 { diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index 0e973bc84c..ca71a769de 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -32,45 +32,32 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) -type blockConfig struct { - txPeriod int - txCount int -} - -var emptyBlock = blockConfig{txPeriod: 0, txCount: 0} -var defaultBlock = blockConfig{txPeriod: 2, txCount: 1} - // makeChain creates a chain of n blocks starting at and including parent. // The returned hash chain is ordered head->parent. // If empty is false, every second block (i%2==0) contains one transaction. -// If config.txCount > 0, every config.txPeriod-th block contains config.txCount transactions. // No uncles are added. -func makeChain(n int, seed byte, parent *types.Block, config blockConfig) ([]*types.Block, []types.Receipts) { +func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) { blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) { block.SetCoinbase(common.Address{seed}) - // Add transactions according to config - if config.txCount > 0 && i%config.txPeriod == 0 { - for range config.txCount { - signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp()) - tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey) - if err != nil { - panic(err) - } - block.AddTx(tx) + // Add one tx to every second block + if !empty && i%2 == 0 { + signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp()) + tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey) + if err != nil { + panic(err) } + block.AddTx(tx) } }) return blocks, receipts } type chainData struct { - blocks []*types.Block - receipts []types.Receipts - offset int + blocks []*types.Block + offset int } var chain *chainData @@ -79,11 +66,11 @@ var emptyChain *chainData func init() { // Create a chain of blocks to import targetBlocks := 128 - blocks, receipts := makeChain(targetBlocks, 0, testGenesis, defaultBlock) - chain = &chainData{blocks, receipts, 0} + blocks, _ := makeChain(targetBlocks, 0, testGenesis, false) + chain = &chainData{blocks, 0} - blocks, receipts = makeChain(targetBlocks, 0, testGenesis, emptyBlock) - emptyChain = &chainData{blocks, receipts, 0} + blocks, _ = makeChain(targetBlocks, 0, testGenesis, true) + emptyChain = &chainData{blocks, 0} } func (chain *chainData) headers() []*types.Header { @@ -274,149 +261,13 @@ func TestEmptyBlocks(t *testing.T) { } } -// TestPartialReceiptDelivery checks two points: -// 1. Receipts that fail validation should be re-requested from other peers. -// 2. Partial delivery should not expire. -func TestPartialReceiptDelivery(t *testing.T) { - blocks, receipts := makeChain(64, 0, testGenesis, blockConfig{txPeriod: 1, txCount: 5}) - chain := chainData{blocks: blocks, receipts: receipts, offset: 0} - - numBlock := len(chain.blocks) - - q := newQueue(10, 10) - if !q.Idle() { - t.Errorf("new queue should be idle") - } - q.Prepare(1, SnapSync) - if res := q.Results(false); len(res) != 0 { - t.Fatal("new queue should have 0 results") - } - - // Schedule a batch of headers - headers := chain.headers() - hashes := make([]common.Hash, len(headers)) - for i, header := range headers { - hashes[i] = header.Hash() - } - q.Schedule(headers, hashes, 1) - - peer := dummyPeer("peer-1") - req, _, _ := q.ReserveReceipts(peer, numBlock) - - t.Logf("request: length %d", len(req.Headers)) - - // 1. Deliver a partial receipt: this must not clear the remaining receipts from the pending list - firstCutoff := len(req.Headers) / 3 - receiptRLP, rcHashes := getPartialReceiptsDelivery(0, firstCutoff, receipts) - accepted, err := q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, 0) - if err != nil || accepted != firstCutoff { - t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted) - } - - if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers) { - t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers)) - } - for i := range firstCutoff { - headerNumber := req.Headers[i].Number.Uint64() - res, _, _, _, err := q.resultCache.getFetchResult(headerNumber) - if err != nil { - t.Fatalf("fetch result get failed: err %v", err) - } - if res == nil { - t.Fatalf("fetch result is nil: header number %d", headerNumber) - } - if !res.Done(receiptType) { - t.Fatalf("wrong result, block %d receipt not done", headerNumber) - } - } - if flight := q.InFlightReceipts(); !flight { - t.Fatalf("there should be in flight receipts") - } - - // 2. Deliver a partial receipt containing an invalid entry: the invalid receipt should be removed from the pending list - secondCutoff := firstCutoff + len(req.Headers)/3 - receiptRLP, rcHashes = getPartialReceiptsDelivery(firstCutoff, secondCutoff, receipts) - // one invalid receipt - rcHashes[len(rcHashes)-1] = common.Hash{} - accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, firstCutoff) - if accepted != len(rcHashes)-1 { - t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1) - } - if err == nil { - t.Fatalf("delivery should fail") - } - - // The invalid receipt should be returned to the pending pool - if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers)+1 { - t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers)) - } - for i := range len(rcHashes) - 1 { - headerNumber := req.Headers[firstCutoff+i].Number.Uint64() - res, _, _, _, err := q.resultCache.getFetchResult(headerNumber) - if err != nil { - t.Fatalf("fetch result get failed: err %v", err) - } - if res == nil { - t.Fatalf("fetch result is nil: header number %d", headerNumber) - } - if !res.Done(receiptType) { - t.Fatalf("wrong result, block %d receipt not done", headerNumber) - } - } - - // 3. Deliver the remaining receipts to complete the request - thirdCutoff := len(req.Headers) - receiptRLP, rcHashes = getPartialReceiptsDelivery(secondCutoff, thirdCutoff, receipts) - accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, false, secondCutoff) - if accepted != len(rcHashes) { - t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1) - } - if err != nil || accepted != thirdCutoff-secondCutoff { - t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted) - } - - for i := range len(rcHashes) { - headerNumber := req.Headers[secondCutoff+i].Number.Uint64() - res, _, _, _, err := q.resultCache.getFetchResult(headerNumber) - if err != nil { - t.Fatalf("fetch result get failed: err %v", err) - } - if res == nil { - t.Fatalf("fetch result is nil: header number %d", headerNumber) - } - if !res.Done(receiptType) { - t.Fatalf("wrong result, block %d receipt not done", headerNumber) - } - } - if q.InFlightReceipts() { - t.Fatal("there shouldn't be any remaning in-flight receipts") - } -} - -func getPartialReceiptsDelivery(from int, to int, receipts []types.Receipts) ([]rlp.RawValue, []common.Hash) { - if from < 0 { - from = 0 - } - if to > len(receipts) { - to = len(receipts) - } - - hasher := trie.NewStackTrie(nil) - rcHashes := make([]common.Hash, to-from) - for i, rc := range receipts[from:to] { - rcHashes[i] = types.DeriveSha(rc, hasher) - } - - return types.EncodeBlockReceiptLists(receipts[from:to]), rcHashes -} - // XTestDelivery does some more extensive testing of events that happen, // blocks that become known and peers that make reservations and deliveries. // disabled since it's not really a unit-test, but can be executed to test // some more advanced scenarios func XTestDelivery(t *testing.T) { // the outside network, holding blocks - blo, rec := makeChain(128, 0, testGenesis, defaultBlock) + blo, rec := makeChain(128, 0, testGenesis, false) world := newNetwork() world.receipts = rec world.chain = blo @@ -517,7 +368,7 @@ func XTestDelivery(t *testing.T) { for i, receipt := range rcs { hashes[i] = types.DeriveSha(receipt, hasher) } - _, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes, false, 0) + _, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes) if err != nil { fmt.Printf("delivered %d receipts %v\n", len(rcs), err) } @@ -593,7 +444,7 @@ func (n *network) progress(numBlocks int) { n.lock.Lock() defer n.lock.Unlock() //fmt.Printf("progressing...\n") - newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], emptyBlock) + newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false) n.chain = append(n.chain, newBlocks...) n.receipts = append(n.receipts, newR...) n.cond.Broadcast() diff --git a/eth/protocols/eth/dispatcher.go b/eth/protocols/eth/dispatcher.go index 76546cdf67..8cc41e0904 100644 --- a/eth/protocols/eth/dispatcher.go +++ b/eth/protocols/eth/dispatcher.go @@ -54,8 +54,7 @@ type Request struct { Peer string // Demultiplexer if cross-peer requests are batched together Sent time.Time // Timestamp when the request was sent - reRequest bool - previous uint64 // id of previous index (to find sink) + continued bool } // Close aborts an in-flight request. Although there's no way to notify the @@ -108,9 +107,6 @@ type Response struct { Meta interface{} // Metadata generated locally on the receiver thread Time time.Duration // Time it took for the request to be served Done chan error // Channel to signal message handling to the reader - - From int - Partial bool } // response is a wrapper around a remote Response that has an error channel to @@ -207,17 +203,10 @@ func (p *Peer) dispatcher() { reqOp.fail <- err if err == nil { - // reuse sink if it is re-request - if req.reRequest { - if _, ok := pending[req.previous]; ok { - req.sink = pending[req.previous].sink - } else { - reqOp.fail <- fmt.Errorf("Cannot find previous request index") - continue - } - delete(pending, req.previous) + // do not overwrite if it is re-request + if _, ok := pending[req.id]; !ok { + pending[req.id] = req } - pending[req.id] = req } case cancelOp := <-p.reqCancel: @@ -231,7 +220,7 @@ func (p *Peer) dispatcher() { // Stop tracking the request delete(pending, cancelOp.id) - // Not sure if the request is about the receipt, but removing it anyway + // Not sure if the request is about the receipt, but remove it anyway delete(p.receiptBuffer, cancelOp.id) delete(p.requestedReceipts, cancelOp.id) @@ -264,12 +253,6 @@ func (p *Peer) dispatcher() { // it can wait for a handler response and dispatch the data. res.Time = res.recv.Sub(res.Req.Sent) resOp.fail <- nil - - // Stop tracking the request, the response dispatcher will deliver - // For partial response, pending should be removed after re-request - if res.Partial { - delete(pending, res.id) - } } case <-p.term: diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 55423be64e..1393c63572 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -527,16 +527,14 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { return err } - from, err := peer.ReconstructReceiptsPacket(res) - if err != nil { + if err := peer.BufferReceiptsPacket(res); err != nil { return err } - if res.LastBlockIncomplete { - err := peer.RequestPartialReceipts(res.RequestId) - if err != nil { + if err := peer.RequestPartialReceipts(res.RequestId); err != nil { return err } + return nil } // Assign buffers shared between list elements @@ -563,9 +561,6 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { id: res.RequestId, code: ReceiptsMsg, Res: &enc, - - From: from, - Partial: res.LastBlockIncomplete, }, metadata) } diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 4bb36511d8..324089f106 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -44,9 +44,8 @@ const ( ) type partialReceipt struct { - idx int // position in original request - list *ReceiptList69 // list of partially collected receipts - size uint64 // log size of list + list []*ReceiptList69 // list of partially collected receipts + lastLogSize uint64 // log size of last receipt list } // Peer is a collection of relevant information we have about a `eth` peer. @@ -379,9 +378,13 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ } // HandlePartialReceipts re-request partial receipts -func (p *Peer) RequestPartialReceipts(previousId uint64) error { - split := p.receiptBuffer[previousId].idx - id := rand.Uint64() +func (p *Peer) RequestPartialReceipts(id uint64) error { + if _, ok := p.receiptBuffer[id]; !ok { + return fmt.Errorf("No partial receipt retreival in progress with id %d", id) + } + + lastBlock := len(p.receiptBuffer[id].list) - 1 + lastReceipt := len(p.receiptBuffer[id].list[lastBlock].items) req := &Request{ id: id, @@ -390,90 +393,76 @@ func (p *Peer) RequestPartialReceipts(previousId uint64) error { want: ReceiptsMsg, data: &GetReceiptsPacket70{ RequestId: id, - GetReceiptsRequest: p.requestedReceipts[previousId][split:], - FirstBlockReceiptIndex: uint64(len(p.receiptBuffer[previousId].list.items)), + GetReceiptsRequest: p.requestedReceipts[id][lastBlock:], + FirstBlockReceiptIndex: uint64(lastReceipt), }, - reRequest: true, - previous: previousId, + continued: true, } - p.receiptBuffer[id] = p.receiptBuffer[previousId] - p.requestedReceipts[id] = p.requestedReceipts[previousId] - - delete(p.receiptBuffer, previousId) - delete(p.requestedReceipts, previousId) - return p.dispatchRequest(req) } -// ReconstructReceiptsPacket validates a receipt packet and checks whether a partial request is complete. -// It also mutates the packet in place, trimming the partial response or appending previously collected receipts. -func (p *Peer) ReconstructReceiptsPacket(packet *ReceiptsPacket70) (int, error) { - from := 0 +// BufferReceiptsPacket validates a receipt packet and buffer the incomplete packet. +// If the request is completed, it appends previously collected receipts. +func (p *Peer) BufferReceiptsPacket(packet *ReceiptsPacket70) error { requestId := packet.RequestId if len(packet.List) == 0 { - return 0, nil + return nil } - // Process the first block - // If the request was partially collected earlier, append the buffered data so this response completes it. - firstReceipt := packet.List[0] - if firstReceipt == nil { - return 0, fmt.Errorf("nil first receipt") - } - if _, ok := p.receiptBuffer[requestId]; ok { - // complete packet (hash validation will be performed later) - firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...) - from = p.receiptBuffer[requestId].idx - delete(p.receiptBuffer, requestId) - if !packet.LastBlockIncomplete { - delete(p.requestedReceipts, requestId) - } - } - - // Trim and buffer the last block when the response is incomplete. + // Buffer the last block when the response is incomplete. if packet.LastBlockIncomplete { - lastReceipts := packet.List[len(packet.List)-1] - - logSize, err := p.validateLastBlockReceipt(lastReceipts, packet.RequestId) + logSize, err := p.validateLastBlockReceipt(packet.List, requestId) if err != nil { - return 0, err + return err } // Update the buffered data and trim the packet to exclude the incomplete block. if buffer, ok := p.receiptBuffer[requestId]; ok { - buffer.idx = buffer.idx + len(packet.List) - 1 - buffer.list.items = append(buffer.list.items, lastReceipts.items...) - buffer.size = buffer.size + logSize + buffer.list = append(buffer.list, packet.List...) + buffer.lastLogSize = logSize } else { p.receiptBuffer[requestId] = &partialReceipt{ - idx: len(packet.List) - 1, - list: lastReceipts, - size: logSize, + list: packet.List, + lastLogSize: logSize, } } - packet.List = packet.List[:len(packet.List)-1] + return nil } - return from, nil + // If the request is completed, append previously collected receipts + // to the packet and remove the buffered receipts. + if buffer, ok := p.receiptBuffer[requestId]; ok { + packet.List = append(buffer.list, packet.List...) + delete(p.receiptBuffer, requestId) + delete(p.requestedReceipts, requestId) + } + + return nil } -// validateLastBlockReceipt validates receipts and return log size of last block receipt -func (p *Peer) validateLastBlockReceipt(lastReceipts *ReceiptList69, id uint64) (uint64, error) { - if lastReceipts == nil { - return 0, fmt.Errorf("nil partial receipt") - } +// validateLastBlockReceipt validates receipts and return log size of last block receipt. +// This function is called only when the `lastBlockincomplete == true`. +// Note that the last receipt response (which completes receiptLists of a pending block) is not verified here. +// Those response doesn't need hueristics below since they can be verified by its trie root. +func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64) (uint64, error) { + lastReceipts := receiptLists[len(receiptLists)-1] + // If the receipt is in the middle of retreival, use the buffered data. + // e.g. [[receipt1], [receipt1, receipt2], incomplete = true] + // [[receipt3, receipt4], incomplete = true] <<-- + // [[receipt5], [receipt1], incomplete = false] + // This case happens only if len(receiptLists) == 1 && incomplete == true && buffered before. var previousTxs int var previousLog uint64 var log uint64 - if buffer, ok := p.receiptBuffer[id]; ok { - previousTxs = len(buffer.list.items) - previousLog = buffer.size + if buffer, ok := p.receiptBuffer[id]; ok && len(receiptLists) == 1 { + previousTxs = len(buffer.list[len(buffer.list)-1].items) + previousLog = buffer.lastLogSize } // 1. Verify that the total number of transactions delivered is under the limit. - if uint64(previousTxs+len(lastReceipts.items)) > lastReceipts.items[0].GasUsed/21_000 { + if uint64(previousTxs+len(lastReceipts.items)) > params.MaxGasLimit/21_000 { // should be dropped, don't clear the buffer return 0, fmt.Errorf("total number of tx exceeded limit") } @@ -490,7 +479,7 @@ func (p *Peer) validateLastBlockReceipt(lastReceipts *ReceiptList69, id uint64) return 0, fmt.Errorf("total download receipt size exceeded the limit") } - return log, nil + return previousLog + log, nil } // RequestTxs fetches a batch of transactions from a remote node. diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index b3ca35a343..9e259585aa 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -154,7 +154,7 @@ func TestPartialReceipt(t *testing.T) { }, }, } - if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { + if err := peer.BufferReceiptsPacket(delivery); err != nil { t.Fatalf("first ReconstructReceiptsPacket failed: %v", err) } @@ -169,19 +169,12 @@ func TestPartialReceipt(t *testing.T) { t.Fatalf("timeout waiting for re-request packet") } - if _, ok := peer.receiptBuffer[req.id]; ok { - t.Fatalf("receiptBuffer has stale request id") - } - if _, ok := peer.requestedReceipts[req.id]; ok { - t.Fatalf("requestedReceipts has stale request id") - } - buffer, ok := peer.receiptBuffer[rereq.RequestId] if !ok { t.Fatalf("receiptBuffer should buffer incomplete receipts") } - if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list.items)) { - t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list.items)) + if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list[len(buffer.list)-1].items)) { + t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list[len(buffer.list)-1].items)) } if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok { t.Fatalf("requestedReceipts should buffer receipt hashes") @@ -208,7 +201,7 @@ func TestPartialReceipt(t *testing.T) { }, }, } - if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { + if err := peer.BufferReceiptsPacket(delivery); err != nil { t.Fatalf("second ReconstructReceiptsPacket failed: %v", err) } if _, ok := peer.receiptBuffer[rereq.RequestId]; ok {