eth/downloader: drop peers sending invalid bodies or receipts (#34745)

- Fixes an error shadowing issue in the deliver() function, where a
stale result from GetDeliverySlot caused the original failure to be
overwritten by errStaleDelivery.
- Adds errInvalidBody and errInvalidReceipt to the downloader error
checks to properly drop peers who sent invalid responses.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Bosul Mun 2026-04-30 17:55:26 +02:00 committed by GitHub
parent 01036bed83
commit 75a64ee341
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 80 additions and 33 deletions

View file

@ -96,6 +96,7 @@ func (dl *downloadTester) newPeer(id string, version uint, blocks []*types.Block
id: id,
chain: newTestBlockchain(blocks),
withholdBodies: make(map[common.Hash]struct{}),
dropped: make(chan error, 1),
}
dl.peers[id] = peer
@ -121,8 +122,11 @@ func (dl *downloadTester) dropPeer(id string) {
type downloadTesterPeer struct {
dl *downloadTester
withholdBodies map[common.Hash]struct{}
corruptBodies bool // if set, the peer serves incorrect blocks
id string
chain *core.BlockChain
dropped chan error // signaled when res.Done receives an error
}
func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header {
@ -236,6 +240,11 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
txsHashes[i] = hash
uncleHashes[i] = types.CalcUncleHash(body.Uncles)
}
if dlp.corruptBodies {
for i := range txsHashes {
txsHashes[i] = common.Hash{0xff}
}
}
req := &eth.Request{
Peer: dlp.id,
}
@ -248,10 +257,16 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
WithdrawalRoots: withdrawalHashes,
},
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
Done: make(chan error),
}
go func() {
sink <- res
if err := <-res.Done; err != nil {
select {
case dlp.dropped <- err:
default:
}
}
}()
return req, nil
}
@ -704,3 +719,21 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
t.Fatalf("Failed to sync chain in three seconds")
}
}
func TestInvalidBodyPeerDrop(t *testing.T) {
tester := newTester(t, FullSync)
defer tester.terminate()
chain := testChainBase.shorten(blockCacheMaxItems - 15)
peer := tester.newPeer("corrupt", eth.ETH69, chain.blocks[1:])
peer.corruptBodies = true
if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
t.Fatalf("failed to beacon-sync chain: %v", err)
}
select {
case <-peer.dropped:
case <-time.After(1 * time.Minute):
t.Fatal("peer was not dropped")
}
}

View file

@ -323,25 +323,32 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
delete(pending, res.Req.Peer)
delete(stales, res.Req.Peer)
// Signal the dispatcher that the round trip is done. We'll drop the
// peer if the data turns out to be junk.
res.Done <- nil
res.Req.Close()
// If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message.
if peer := d.peers.Peer(res.Req.Peer); peer != nil {
// Deliver the received chunk of data and check chain validity
accepted, err := queue.deliver(peer, res)
if errors.Is(err, errInvalidChain) {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if !errors.Is(err, errStaleDelivery) {
queue.updateCapacity(peer, accepted, res.Time)
}
peer := d.peers.Peer(res.Req.Peer)
if peer == nil {
res.Done <- nil
res.Req.Close()
continue
}
// Deliver the received chunk of data and check chain validity
accepted, err := queue.deliver(peer, res)
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if !errors.Is(err, errStaleDelivery) {
queue.updateCapacity(peer, accepted, res.Time)
}
res.Done <- validityErrorOfRequest(err)
res.Req.Close()
if errors.Is(err, errInvalidChain) {
// errInvalidChain is the signal that processing of items failed internally,
// even though the items were validly encoded.
//
// This can be due to invalid blocks, or a database error.
// The sync cycle should be aborted for such errors, so we return it here.
return err
}
case cont := <-queue.waker():
@ -352,3 +359,11 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
}
}
}
// validityErrorOfRequest returns err if it is related to block validity, and nil otherwise.
func validityErrorOfRequest(err error) error {
if errors.Is(err, errInvalidBody) || errors.Is(err, errInvalidReceipt) {
return err
}
return nil
}

View file

@ -671,10 +671,10 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
}
// Assemble each of the results with their headers and retrieved data parts
var (
accepted int
failure error
i int
hashes []common.Hash
accepted int
failure error
i int
foundStale bool
)
for _, header := range request.Headers {
// Short circuit assembly if no more fetch results are found
@ -686,42 +686,41 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
failure = err
break
}
hashes = append(hashes, header.Hash())
i++
}
for _, header := range request.Headers[:i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
reconstruct(accepted, res)
accepted++
} else {
// else: between here and above, some other peer filled this result,
// Between here and above, some other peer filled this result,
// or it was indeed a no-op. This should not happen, but if it does it's
// not something to panic about
log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err)
failure = errStaleDelivery
foundStale = true
}
// Clean up a successful fetch
delete(taskPool, hashes[accepted])
accepted++
delete(taskPool, header.Hash())
}
resDropMeter.Mark(int64(results - accepted))
// Return all failed or missing fetches to the queue
for _, header := range request.Headers[accepted:] {
for _, header := range request.Headers[i:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Wake up Results
if accepted > 0 {
q.active.Signal()
}
if failure == nil {
return accepted, nil
if failure != nil {
return accepted, failure
}
// If none of the data was good, it's a stale delivery
if accepted > 0 {
return accepted, fmt.Errorf("partial failure: %v", failure)
if foundStale {
return accepted, errStaleDelivery
}
return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
return accepted, nil
}
// Prepare configures the result cache to allow accepting and caching inbound