eth/downloader: handle skipped response

This commit is contained in:
healthykim 2026-04-17 11:10:37 +02:00
parent 92cef0a740
commit a94f0a8ac8
2 changed files with 38 additions and 25 deletions

View file

@ -65,6 +65,7 @@ var (
errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBody = errors.New("retrieved block body is invalid") errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid")
errSkippedResponse = errors.New("retrieved response is skipped by peer")
errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)") errCanceled = errors.New("syncing canceled (requested)")

View file

@ -562,11 +562,15 @@ func (q *queue) DeliverBodies(id string, hashes eth.BlockBodyHashes, bodies []et
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
var txLists [][]*types.Transaction txLists := make([][]*types.Transaction, len(bodies))
var uncleLists [][]*types.Header uncleLists := make([][]*types.Header, len(bodies))
var withdrawalLists [][]*types.Withdrawal withdrawalLists := make([][]*types.Withdrawal, len(bodies))
validate := func(index int, header *types.Header) error { validate := func(index int, header *types.Header) error {
// Detect skipped response: the peer returned an empty body for a non-empty block
if !header.EmptyBody() && hashes.TransactionRoots[index] == types.EmptyTxsHash && hashes.UncleHashes[index] == types.EmptyUncleHash {
return errSkippedResponse
}
if hashes.TransactionRoots[index] != header.TxHash { if hashes.TransactionRoots[index] != header.TxHash {
return errInvalidBody return errInvalidBody
} }
@ -592,20 +596,18 @@ func (q *queue) DeliverBodies(id string, hashes eth.BlockBodyHashes, bodies []et
if err != nil { if err != nil {
return fmt.Errorf("%w: bad transactions: %v", errInvalidBody, err) return fmt.Errorf("%w: bad transactions: %v", errInvalidBody, err)
} }
txLists = append(txLists, txs) txLists[index] = txs
uncles, err := bodies[index].Uncles.Items() uncles, err := bodies[index].Uncles.Items()
if err != nil { if err != nil {
return fmt.Errorf("%w: bad uncles: %v", errInvalidBody, err) return fmt.Errorf("%w: bad uncles: %v", errInvalidBody, err)
} }
uncleLists = append(uncleLists, uncles) uncleLists[index] = uncles
if bodies[index].Withdrawals != nil { if bodies[index].Withdrawals != nil {
withdrawals, err := bodies[index].Withdrawals.Items() withdrawals, err := bodies[index].Withdrawals.Items()
if err != nil { if err != nil {
return fmt.Errorf("%w: bad withdrawals: %v", errInvalidBody, err) return fmt.Errorf("%w: bad withdrawals: %v", errInvalidBody, err)
} }
withdrawalLists = append(withdrawalLists, withdrawals) withdrawalLists[index] = withdrawals
} else {
withdrawalLists = append(withdrawalLists, nil)
} }
return nil return nil
} }
@ -616,9 +618,8 @@ func (q *queue) DeliverBodies(id string, hashes eth.BlockBodyHashes, bodies []et
result.Withdrawals = withdrawalLists[index] result.Withdrawals = withdrawalLists[index]
result.SetBodyDone() result.SetBodyDone()
} }
nresults := len(hashes.TransactionRoots)
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
bodyReqTimer, bodyInMeter, bodyDropMeter, nresults, validate, reconstruct) bodyReqTimer, bodyInMeter, bodyDropMeter, len(bodies), validate, reconstruct)
} }
// DeliverReceipts injects a receipt retrieval response into the results queue. // DeliverReceipts injects a receipt retrieval response into the results queue.
@ -629,6 +630,9 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
defer q.lock.Unlock() defer q.lock.Unlock()
validate := func(index int, header *types.Header) error { validate := func(index int, header *types.Header) error {
if header.ReceiptHash != types.EmptyReceiptsHash && receiptListHashes[index] == types.EmptyReceiptsHash {
return errSkippedResponse
}
if receiptListHashes[index] != header.ReceiptHash { if receiptListHashes[index] != header.ReceiptHash {
return errInvalidReceipt return errInvalidReceipt
} }
@ -671,28 +675,40 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
} }
// Assemble each of the results with their headers and retrieved data parts // Assemble each of the results with their headers and retrieved data parts
var ( var (
valid []int
accepted int accepted int
failure error failure error
i int cutoff = len(request.Headers)
hashes []common.Hash
) )
for _, header := range request.Headers { for i, header := range request.Headers {
// Short circuit assembly if no more fetch results are found // Short circuit assembly if no more fetch results are found
if i >= results { if i >= results {
break taskQueue.Push(header, -int64(header.Number.Uint64()))
continue
} }
// Validate the fields // Validate the fields
if err := validate(i, header); err != nil { if err := validate(i, header); err != nil {
failure = err if err != errSkippedResponse {
break failure = err
cutoff = i
break
}
// Return skipped hashes to the queue
request.Peer.MarkLacking(header.Hash())
taskQueue.Push(header, -int64(header.Number.Uint64()))
continue
} }
hashes = append(hashes, header.Hash()) valid = append(valid, i)
i++ }
// Return headers after the validation failure point to the queue
for _, header := range request.Headers[cutoff:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
} }
for _, header := range request.Headers[:i] { for _, index := range valid {
header := request.Headers[index]
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
reconstruct(accepted, res) reconstruct(index, res)
} else { } else {
// else: between here and above, some other peer filled this result, // else: between here and above, some other peer filled this result,
// or it was indeed a no-op. This should not happen, but if it does it's // or it was indeed a no-op. This should not happen, but if it does it's
@ -701,15 +717,11 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
failure = errStaleDelivery failure = errStaleDelivery
} }
// Clean up a successful fetch // Clean up a successful fetch
delete(taskPool, hashes[accepted]) delete(taskPool, header.Hash())
accepted++ accepted++
} }
resDropMeter.Mark(int64(results - accepted)) resDropMeter.Mark(int64(results - accepted))
// Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Wake up Results // Wake up Results
if accepted > 0 { if accepted > 0 {
q.active.Signal() q.active.Signal()