From a94f0a8ac87ec56aaf00f29ef282f1c2d56c1cc6 Mon Sep 17 00:00:00 2001 From: healthykim Date: Fri, 17 Apr 2026 11:10:37 +0200 Subject: [PATCH] eth/downloader: handle skipped response --- eth/downloader/downloader.go | 1 + eth/downloader/queue.go | 62 +++++++++++++++++++++--------------- 2 files changed, 38 insertions(+), 25 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1de0933842..427be8fb04 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -65,6 +65,7 @@ var ( errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidBody = errors.New("retrieved block body is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid") + errSkippedResponse = errors.New("retrieved response is skipped by peer") errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)") errCanceled = errors.New("syncing canceled (requested)") diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c0cb9b174a..4bd4fdac8a 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -562,11 +562,15 @@ func (q *queue) DeliverBodies(id string, hashes eth.BlockBodyHashes, bodies []et q.lock.Lock() defer q.lock.Unlock() - var txLists [][]*types.Transaction - var uncleLists [][]*types.Header - var withdrawalLists [][]*types.Withdrawal + txLists := make([][]*types.Transaction, len(bodies)) + uncleLists := make([][]*types.Header, len(bodies)) + withdrawalLists := make([][]*types.Withdrawal, len(bodies)) validate := func(index int, header *types.Header) error { + // Detect skipped response: the peer returned an empty body for a non-empty block + if !header.EmptyBody() && hashes.TransactionRoots[index] == types.EmptyTxsHash && hashes.UncleHashes[index] == types.EmptyUncleHash { + return errSkippedResponse + } if hashes.TransactionRoots[index] != header.TxHash { return errInvalidBody } @@ -592,20 +596,18 @@ func (q *queue) DeliverBodies(id string, hashes eth.BlockBodyHashes, bodies []et if err != nil { return fmt.Errorf("%w: bad transactions: %v", errInvalidBody, err) } - txLists = append(txLists, txs) + txLists[index] = txs uncles, err := bodies[index].Uncles.Items() if err != nil { return fmt.Errorf("%w: bad uncles: %v", errInvalidBody, err) } - uncleLists = append(uncleLists, uncles) + uncleLists[index] = uncles if bodies[index].Withdrawals != nil { withdrawals, err := bodies[index].Withdrawals.Items() if err != nil { return fmt.Errorf("%w: bad withdrawals: %v", errInvalidBody, err) } - withdrawalLists = append(withdrawalLists, withdrawals) - } else { - withdrawalLists = append(withdrawalLists, nil) + withdrawalLists[index] = withdrawals } return nil } @@ -616,9 +618,8 @@ func (q *queue) DeliverBodies(id string, hashes eth.BlockBodyHashes, bodies []et result.Withdrawals = withdrawalLists[index] result.SetBodyDone() } - nresults := len(hashes.TransactionRoots) return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, - bodyReqTimer, bodyInMeter, bodyDropMeter, nresults, validate, reconstruct) + bodyReqTimer, bodyInMeter, bodyDropMeter, len(bodies), validate, reconstruct) } // DeliverReceipts injects a receipt retrieval response into the results queue. @@ -629,6 +630,9 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi defer q.lock.Unlock() validate := func(index int, header *types.Header) error { + if header.ReceiptHash != types.EmptyReceiptsHash && receiptListHashes[index] == types.EmptyReceiptsHash { + return errSkippedResponse + } if receiptListHashes[index] != header.ReceiptHash { return errInvalidReceipt } @@ -671,28 +675,40 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, } // Assemble each of the results with their headers and retrieved data parts var ( + valid []int accepted int failure error - i int - hashes []common.Hash + cutoff = len(request.Headers) ) - for _, header := range request.Headers { + for i, header := range request.Headers { // Short circuit assembly if no more fetch results are found if i >= results { - break + taskQueue.Push(header, -int64(header.Number.Uint64())) + continue } // Validate the fields if err := validate(i, header); err != nil { - failure = err - break + if err != errSkippedResponse { + failure = err + cutoff = i + break + } + // Return skipped hashes to the queue + request.Peer.MarkLacking(header.Hash()) + taskQueue.Push(header, -int64(header.Number.Uint64())) + continue } - hashes = append(hashes, header.Hash()) - i++ + valid = append(valid, i) + } + // Return headers after the validation failure point to the queue + for _, header := range request.Headers[cutoff:] { + taskQueue.Push(header, -int64(header.Number.Uint64())) } - for _, header := range request.Headers[:i] { + for _, index := range valid { + header := request.Headers[index] if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { - reconstruct(accepted, res) + reconstruct(index, res) } else { // else: between here and above, some other peer filled this result, // or it was indeed a no-op. This should not happen, but if it does it's @@ -701,15 +717,11 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, failure = errStaleDelivery } // Clean up a successful fetch - delete(taskPool, hashes[accepted]) + delete(taskPool, header.Hash()) accepted++ } resDropMeter.Mark(int64(results - accepted)) - // Return all failed or missing fetches to the queue - for _, header := range request.Headers[accepted:] { - taskQueue.Push(header, -int64(header.Number.Uint64())) - } // Wake up Results if accepted > 0 { q.active.Signal()