diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 9280d455fb..6d5d159631 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -96,6 +96,7 @@ func (dl *downloadTester) newPeer(id string, version uint, blocks []*types.Block id: id, chain: newTestBlockchain(blocks), withholdBodies: make(map[common.Hash]struct{}), + dropped: make(chan error, 1), } dl.peers[id] = peer @@ -121,8 +122,11 @@ func (dl *downloadTester) dropPeer(id string) { type downloadTesterPeer struct { dl *downloadTester withholdBodies map[common.Hash]struct{} + corruptBodies bool // if set, the peer serves incorrect blocks id string chain *core.BlockChain + + dropped chan error // signaled when res.Done receives an error } func unmarshalRlpHeaders(rlpdata []rlp.RawValue) []*types.Header { @@ -236,6 +240,11 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et txsHashes[i] = hash uncleHashes[i] = types.CalcUncleHash(body.Uncles) } + if dlp.corruptBodies { + for i := range txsHashes { + txsHashes[i] = common.Hash{0xff} + } + } req := ð.Request{ Peer: dlp.id, } @@ -248,10 +257,16 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et WithdrawalRoots: withdrawalHashes, }, Time: 1, - Done: make(chan error, 1), // Ignore the returned status + Done: make(chan error), } go func() { sink <- res + if err := <-res.Done; err != nil { + select { + case dlp.dropped <- err: + default: + } + } }() return req, nil } @@ -704,3 +719,21 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("Failed to sync chain in three seconds") } } + +func TestInvalidBodyPeerDrop(t *testing.T) { + tester := newTester(t, FullSync) + defer tester.terminate() + + chain := testChainBase.shorten(blockCacheMaxItems - 15) + peer := tester.newPeer("corrupt", eth.ETH69, chain.blocks[1:]) + peer.corruptBodies = true + + if err := tester.downloader.BeaconSync(chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil { + t.Fatalf("failed to beacon-sync chain: %v", err) + } + select { + case <-peer.dropped: + case <-time.After(1 * time.Minute): + t.Fatal("peer was not dropped") + } +} diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 9d8cd114c1..51bf3404bd 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -323,25 +323,32 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { delete(pending, res.Req.Peer) delete(stales, res.Req.Peer) - // Signal the dispatcher that the round trip is done. We'll drop the - // peer if the data turns out to be junk. - res.Done <- nil - res.Req.Close() - // If the peer was previously banned and failed to deliver its pack // in a reasonable time frame, ignore its message. - if peer := d.peers.Peer(res.Req.Peer); peer != nil { - // Deliver the received chunk of data and check chain validity - accepted, err := queue.deliver(peer, res) - if errors.Is(err, errInvalidChain) { - return err - } - // Unless a peer delivered something completely else than requested (usually - // caused by a timed out request which came through in the end), set it to - // idle. If the delivery's stale, the peer should have already been idled. - if !errors.Is(err, errStaleDelivery) { - queue.updateCapacity(peer, accepted, res.Time) - } + peer := d.peers.Peer(res.Req.Peer) + if peer == nil { + res.Done <- nil + res.Req.Close() + continue + } + // Deliver the received chunk of data and check chain validity + accepted, err := queue.deliver(peer, res) + // Unless a peer delivered something completely else than requested (usually + // caused by a timed out request which came through in the end), set it to + // idle. If the delivery's stale, the peer should have already been idled. + if !errors.Is(err, errStaleDelivery) { + queue.updateCapacity(peer, accepted, res.Time) + } + res.Done <- validityErrorOfRequest(err) + res.Req.Close() + + if errors.Is(err, errInvalidChain) { + // errInvalidChain is the signal that processing of items failed internally, + // even though the items were validly encoded. + // + // This can be due to invalid blocks, or a database error. + // The sync cycle should be aborted for such errors, so we return it here. + return err } case cont := <-queue.waker(): @@ -352,3 +359,11 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { } } } + +// validityErrorOfRequest returns err if it is related to block validity, and nil otherwise. +func validityErrorOfRequest(err error) error { + if errors.Is(err, errInvalidBody) || errors.Is(err, errInvalidReceipt) { + return err + } + return nil +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c0cb9b174a..dd17b7f1ed 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -671,10 +671,10 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, } // Assemble each of the results with their headers and retrieved data parts var ( - accepted int - failure error - i int - hashes []common.Hash + accepted int + failure error + i int + foundStale bool ) for _, header := range request.Headers { // Short circuit assembly if no more fetch results are found @@ -686,42 +686,41 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, failure = err break } - hashes = append(hashes, header.Hash()) i++ } for _, header := range request.Headers[:i] { if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { reconstruct(accepted, res) + accepted++ } else { - // else: between here and above, some other peer filled this result, + // Between here and above, some other peer filled this result, // or it was indeed a no-op. This should not happen, but if it does it's // not something to panic about log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err) - failure = errStaleDelivery + foundStale = true } // Clean up a successful fetch - delete(taskPool, hashes[accepted]) - accepted++ + delete(taskPool, header.Hash()) } resDropMeter.Mark(int64(results - accepted)) // Return all failed or missing fetches to the queue - for _, header := range request.Headers[accepted:] { + for _, header := range request.Headers[i:] { taskQueue.Push(header, -int64(header.Number.Uint64())) } // Wake up Results if accepted > 0 { q.active.Signal() } - if failure == nil { - return accepted, nil + if failure != nil { + return accepted, failure } // If none of the data was good, it's a stale delivery - if accepted > 0 { - return accepted, fmt.Errorf("partial failure: %v", failure) + if foundStale { + return accepted, errStaleDelivery } - return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery) + return accepted, nil } // Prepare configures the result cache to allow accepting and caching inbound