eth/downloader: fix error shadowing on stale delivery

This commit is contained in:
healthykim 2026-04-16 21:54:27 +02:00
parent f63e9f3a80
commit f26cb11603
2 changed files with 15 additions and 16 deletions

View file

@ -333,7 +333,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
if peer := d.peers.Peer(res.Req.Peer); peer != nil { if peer := d.peers.Peer(res.Req.Peer); peer != nil {
// Deliver the received chunk of data and check chain validity // Deliver the received chunk of data and check chain validity
accepted, err := queue.deliver(peer, res) accepted, err := queue.deliver(peer, res)
if errors.Is(err, errInvalidChain) { if errors.Is(err, errInvalidChain) || errors.Is(err, errInvalidBody) || errors.Is(err, errInvalidReceipt) {
return err return err
} }
// Unless a peer delivered something completely else than requested (usually // Unless a peer delivered something completely else than requested (usually

View file

@ -671,10 +671,10 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
} }
// Assemble each of the results with their headers and retrieved data parts // Assemble each of the results with their headers and retrieved data parts
var ( var (
accepted int accepted int
failure error failure error
i int i int
hashes []common.Hash foundStale bool
) )
for _, header := range request.Headers { for _, header := range request.Headers {
// Short circuit assembly if no more fetch results are found // Short circuit assembly if no more fetch results are found
@ -686,42 +686,41 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
failure = err failure = err
break break
} }
hashes = append(hashes, header.Hash())
i++ i++
} }
for _, header := range request.Headers[:i] { for _, header := range request.Headers[:i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
reconstruct(accepted, res) reconstruct(accepted, res)
accepted++
} else { } else {
// else: between here and above, some other peer filled this result, // Between here and above, some other peer filled this result,
// or it was indeed a no-op. This should not happen, but if it does it's // or it was indeed a no-op. This should not happen, but if it does it's
// not something to panic about // not something to panic about
log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err) log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err)
failure = errStaleDelivery foundStale = true
} }
// Clean up a successful fetch // Clean up a successful fetch
delete(taskPool, hashes[accepted]) delete(taskPool, header.Hash())
accepted++
} }
resDropMeter.Mark(int64(results - accepted)) resDropMeter.Mark(int64(results - accepted))
// Return all failed or missing fetches to the queue // Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] { for _, header := range request.Headers[i:] {
taskQueue.Push(header, -int64(header.Number.Uint64())) taskQueue.Push(header, -int64(header.Number.Uint64()))
} }
// Wake up Results // Wake up Results
if accepted > 0 { if accepted > 0 {
q.active.Signal() q.active.Signal()
} }
if failure == nil { if failure != nil {
return accepted, nil return accepted, failure
} }
// If none of the data was good, it's a stale delivery // If none of the data was good, it's a stale delivery
if accepted > 0 { if foundStale {
return accepted, fmt.Errorf("partial failure: %v", failure) return accepted, errStaleDelivery
} }
return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery) return accepted, nil
} }
// Prepare configures the result cache to allow accepting and caching inbound // Prepare configures the result cache to allow accepting and caching inbound