eth/downloader: more context in errors (#21067)

This commit is contained in:
Daniel Liu 2025-02-24 17:49:13 +08:00
parent 3ee371454f
commit 2955a988ca
3 changed files with 52 additions and 30 deletions

View file

@ -315,13 +315,28 @@ func (d *Downloader) UnregisterPeer(id string) error {
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
err := d.synchronise(id, head, td, mode)
switch err {
case nil:
case errBusy, errCanceled:
switch err {
case nil, errBusy, errCanceled:
return err
}
if errors.Is(err, errInvalidChain) {
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
} else {
d.dropPeer(id)
}
return err
}
switch err {
case errTimeout, errBadPeer, errStallingPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor, errInvalidChain:
errInvalidAncestor:
log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
if d.dropPeer == nil {
// The dropPeer method is nil when `--copydb` is used for a local copy.
@ -696,7 +711,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header)
expectNumber := from + int64(i)*int64((skip+1))
if number := header.Number.Int64(); number != expectNumber {
p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
return 0, errInvalidChain
return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
}
}
// Check if a common ancestor was found
@ -908,7 +923,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64)
filled, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
p.log.Debug("Skeleton chain invalid", "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
headers = filled[proced:]
from += uint64(proced)
@ -1121,13 +1136,13 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
// Deliver the received chunk of data and check chain validity
accepted, err := deliver(packet)
if err == errInvalidChain {
if errors.Is(err, errInvalidChain) {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if err != errStaleDelivery {
if !errors.Is(err, errStaleDelivery) {
setIdle(peer, accepted)
}
// Issue a log to the user to see what's going on
@ -1388,7 +1403,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er
rollback = append(rollback, chunk[:n]...)
}
log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, store newly found uncertain headers
rollback = append(rollback, unknown...)
@ -1515,7 +1530,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
// of the blocks delivered from the downloader, and the indexing will be off.
log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
}
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
if d.handleProposedBlock != nil {
header := blocks[len(blocks)-1].Header()
@ -1666,7 +1681,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state
}
if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
return nil
}

View file

@ -318,27 +318,32 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int {
func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (int, error) {
dl.lock.Lock()
defer dl.lock.Unlock()
// Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors
if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok {
return 0, errors.New("unknown parent")
return 0, errors.New("InsertHeaderChain: unknown parent at first position")
}
var hashes []common.Hash
for i := 1; i < len(headers); i++ {
hash := headers[i-1].Hash()
if headers[i].ParentHash != headers[i-1].Hash() {
return i, errors.New("unknown parent")
return i, fmt.Errorf("non-contiguous import at position %d", i)
}
hashes = append(hashes, hash)
}
hashes = append(hashes, headers[len(headers)-1].Hash())
// Do a full insert if pre-checks passed
for i, header := range headers {
if _, ok := dl.ownHeaders[header.Hash()]; ok {
hash := hashes[i]
if _, ok := dl.ownHeaders[hash]; ok {
continue
}
if _, ok := dl.ownHeaders[header.ParentHash]; !ok {
return i, errors.New("unknown parent")
// This _should_ be impossible, due to precheck and induction
return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i)
}
dl.ownHashes = append(dl.ownHashes, header.Hash())
dl.ownHeaders[header.Hash()] = header
dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
dl.ownHashes = append(dl.ownHashes, hash)
dl.ownHeaders[hash] = header
dl.ownChainTd[hash] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty)
}
return len(headers), nil
}
@ -350,9 +355,9 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (int, error) {
for i, block := range blocks {
if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok {
return i, errors.New("unknown parent")
return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks))
} else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil {
return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err)
return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err)
}
if _, ok := dl.ownHeaders[block.Hash()]; !ok {
dl.ownHashes = append(dl.ownHashes, block.Hash())
@ -376,7 +381,7 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ
return i, errors.New("unknown owner")
}
if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok {
return i, errors.New("unknown parent")
return i, errors.New("InsertReceiptChain: unknown parent")
}
dl.ownBlocks[blocks[i].Hash()] = blocks[i]
dl.ownReceipts[blocks[i].Hash()] = receipts[i]

View file

@ -510,7 +510,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
log.Error("index allocation went beyond available resultCache space", "index", index, "len.resultCache", len(q.resultCache), "blockNum", header.Number.Int64(), "resultOffset", q.resultOffset)
return nil, false, errInvalidChain
return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain)
}
if q.resultCache[index] == nil {
components := 1
@ -865,14 +865,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
q.active.Signal()
}
// If none of the data was good, it's a stale delivery
switch {
case failure == nil || failure == errInvalidChain:
return accepted, failure
case useful:
return accepted, fmt.Errorf("partial failure: %v", failure)
default:
return accepted, errStaleDelivery
if failure == nil {
return accepted, nil
}
if errors.Is(failure, errInvalidChain) {
return accepted, failure
}
if useful {
return accepted, fmt.Errorf("partial failure: %v", failure)
}
return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery)
}
// Prepare configures the result cache to allow accepting and caching inbound