eth/downloader: fix stale beacon header deletion (#33481)

In this PR, two things have been fixed:

--- 

(a) truncate the stale beacon headers with latest snap block

Originally, b.filled is used as the indicator for deleting stale beacon headers. 
This field is set only after synchronization has been scheduled, under the 
assumption that the skeleton chain is already linked to the local chain.

However, the local chain can be mutated via `debug_setHead`, which may
cause `b.filled` to become outdated. For instance, `b.filled` may still refer to the 
head snap block of the last sync cycle, while after `debug_setHead` the head snap 
block has been rewound to 1.

As a result, Geth can enter an unintended loop: it repeatedly downloads
the missing beacon headers for the skeleton chain and attempts to schedule the 
actual synchronization, but in the final step, all recently fetched headers are removed 
by `cleanStales` due to the stale `b.filled` value.

This issue is addressed by always using the latest snap block as the indicator, 
without relying on any cached value. However, note that before the skeleton
chain is linked to the local chain, the latest snap block will always be below
skeleton.tail, and this condition should not be treated as an error.

--- 

(b) merge the subchains once the skeleton chain links to local chain

Once the skeleton chain links with local one, it will try to schedule the 
synchronization by fetching the missing blocks and import them then. 
It's possible that the last subchain already overlaps the previous subchain, 
resulting in two leftover subchains. As a result, an error log will be printed:
https://github.com/ethereum/go-ethereum/blob/master/eth/downloader/skeleton.go#L1074
This commit is contained in:
rjl493456442 2025-12-29 16:13:30 +08:00 committed by GitHub
parent 27b3a6087e
commit 4531bfebec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 104 additions and 61 deletions

View file

@ -36,7 +36,6 @@ type beaconBackfiller struct {
downloader *Downloader // Downloader to direct via this callback implementation
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
filled *types.Header // Last header filled by the last terminated sync loop
started chan struct{} // Notification channel whether the downloader inited
lock sync.Mutex // Mutex protecting the sync lock
}
@ -56,13 +55,15 @@ func (b *beaconBackfiller) suspend() *types.Header {
// If no filling is running, don't waste cycles
b.lock.Lock()
filling := b.filling
filled := b.filled
started := b.started
b.lock.Unlock()
if !filling {
// Sync cycle was inactive, retrieve and return the latest snap block
// as the filled header.
log.Debug("Backfiller was inactive")
return filled // Return the filled header on the previous sync completion
return b.downloader.blockchain.CurrentSnapBlock()
}
// A previous filling should be running, though it may happen that it hasn't
// yet started (being done on a new goroutine). Many concurrent beacon head
@ -77,7 +78,6 @@ func (b *beaconBackfiller) suspend() *types.Header {
log.Debug("Backfiller has been suspended")
// Sync cycle was just terminated, retrieve and return the last filled header.
// Can't use `filled` as that contains a stale value from before cancellation.
return b.downloader.blockchain.CurrentSnapBlock()
}
@ -92,7 +92,6 @@ func (b *beaconBackfiller) resume() {
return
}
b.filling = true
b.filled = nil
b.started = make(chan struct{})
b.lock.Unlock()
@ -103,7 +102,6 @@ func (b *beaconBackfiller) resume() {
defer func() {
b.lock.Lock()
b.filling = false
b.filled = b.downloader.blockchain.CurrentSnapBlock()
b.lock.Unlock()
}()
// If the downloader fails, report an error as in beacon chain mode there
@ -113,7 +111,7 @@ func (b *beaconBackfiller) resume() {
return
}
// Synchronization succeeded. Since this happens async, notify the outer
// context to disable snap syncing and enable transaction propagation.
// context to enable transaction propagation.
if b.success != nil {
b.success()
}
@ -188,6 +186,8 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
log.Error("Failed to retrieve beacon bounds", "err", err)
return 0, err
}
log.Debug("Searching beacon ancestor", "local", number, "beaconhead", beaconHead.Number, "beacontail", beaconTail.Number)
var linked bool
switch d.getMode() {
case ethconfig.FullSync:
@ -241,6 +241,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
}
start = check
}
log.Debug("Found beacon ancestor", "number", start)
return start, nil
}

View file

@ -248,7 +248,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success), chain)
go dl.stateFetcher()
return dl

View file

@ -207,6 +207,7 @@ type backfiller interface {
type skeleton struct {
db ethdb.Database // Database backing the skeleton
filler backfiller // Chain syncer suspended/resumed by head events
chain chainReader // Underlying block chain
peers *peerSet // Set of peers we can sync from
idles map[string]*peerConnection // Set of idle peers in the current sync cycle
@ -231,12 +232,19 @@ type skeleton struct {
syncStarting func() // callback triggered after a sync cycle is inited but before started
}
// chainReader wraps the blockchain methods required by the skeleton syncer
// to query the head of the local chain.
type chainReader interface {
	// CurrentSnapBlock retrieves the head snap block from the local chain.
	// It may return nil if no snap head is available.
	CurrentSnapBlock() *types.Header
}
// newSkeleton creates a new sync skeleton that tracks a potentially dangling
// header chain until it's linked into an existing set of blocks.
func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton {
func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller, chain chainReader) *skeleton {
sk := &skeleton{
db: db,
filler: filler,
chain: chain,
peers: peers,
drop: drop,
requests: make(map[uint64]*headerRequest),
@ -354,6 +362,29 @@ func (s *skeleton) Sync(head *types.Header, final *types.Header, force bool) err
}
}
// linked returns the flag indicating whether the skeleton has been linked with
// the local chain.
func (s *skeleton) linked(number uint64, hash common.Hash) bool {
linked := rawdb.HasHeader(s.db, hash, number) &&
rawdb.HasBody(s.db, hash, number) &&
rawdb.HasReceipts(s.db, hash, number)
// Ensure the skeleton chain links to the local chain below the chain head.
// This accounts for edge cases where leftover chain segments above the head
// may still link to the skeleton chain. In such cases, synchronization is
// likely to fail due to potentially missing segments in the middle.
//
// You can try to produce the edge case by these steps:
// - sync the chain
// - debug.setHead(`0x1`)
// - kill the geth process (the chain segment will be left with chain head rewound)
// - restart
if s.chain.CurrentSnapBlock() != nil {
linked = linked && s.chain.CurrentSnapBlock().Number.Uint64() >= number
}
return linked
}
// sync is the internal version of Sync that executes a single sync cycle, either
// until some termination condition is reached, or until the current cycle merges
// with a previously aborted run.
@ -378,10 +409,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// If the sync is already done, resume the backfiller. When the loop stops,
// terminate the backfiller too.
linked := len(s.progress.Subchains) == 1 &&
rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
linked := len(s.progress.Subchains) == 1 && s.linked(s.scratchHead, s.progress.Subchains[0].Next)
if linked {
s.filler.resume()
}
@ -497,12 +525,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// is still running, it will pick it up. If it already terminated,
// a new cycle needs to be spun up.
if linked {
linked = len(s.progress.Subchains) == 1 &&
rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
if linked {
if len(s.progress.Subchains) == 1 && s.linked(s.scratchHead, s.progress.Subchains[0].Next) {
// The skeleton chain has been extended and is still linked with the local
// chain, try to re-schedule the backfiller if it's already terminated.
s.filler.resume()
@ -946,6 +969,45 @@ func (s *skeleton) revertRequest(req *headerRequest) {
s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = ""
}
// mergeSubchains is invoked once certain beacon headers have been persisted
// locally and folds any overlapping subchains together. It reports whether the
// leading subchain was merged with a previous one (i.e. tracked progress
// skipped ahead), in which case the caller must discard its scratch headers.
func (s *skeleton) mergeSubchains() bool {
	// If the subchain extended into the next subchain, we need to handle
	// the overlap. Since there could be many overlaps, do this in a loop.
	var merged bool
	for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail {
		// Extract some stats from the second subchain (for logging only)
		head := s.progress.Subchains[1].Head
		tail := s.progress.Subchains[1].Tail
		next := s.progress.Subchains[1].Next

		// Since we just overwrote part of the next subchain, we need to trim
		// its head independent of matching or mismatching content
		if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail {
			// Fully overwritten, get rid of the subchain as a whole
			log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next)
			s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
			continue
		} else {
			// Partially overwritten, trim the head to the overwritten size
			log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next)
			s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1
		}
		// If the old subchain is an extension of the new one (the trimmed head
		// hashes to the leading subchain's expected parent), merge the two and
		// let the skeleton syncer restart (to clean internal state)
		if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next {
			log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next)
			s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail
			s.progress.Subchains[0].Next = s.progress.Subchains[1].Next
			s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
			merged = true
		}
	}
	return merged
}
func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers))
@ -1019,10 +1081,9 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// processing is done, so it's just one more "needless" check.
//
// The weird cascading checks are done to minimize the database reads.
linked = rawdb.HasHeader(s.db, header.ParentHash, header.Number.Uint64()-1) &&
rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1) &&
rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1)
linked = s.linked(header.Number.Uint64()-1, header.ParentHash)
if linked {
log.Debug("Primary subchain linked", "number", header.Number.Uint64()-1, "hash", header.ParentHash)
break
}
}
@ -1036,6 +1097,9 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// If the beacon chain was linked to the local chain, completely swap out
// all internal progress and abort header synchronization.
if linked {
// Merge all overlapped subchains beforehand
s.mergeSubchains()
// Linking into the local chain should also mean that there are no
// leftover subchains, but in the case of importing the blocks via
// the engine API, we will not push the subchains forward. This will
@ -1093,41 +1157,10 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
s.scratchHead -= uint64(consumed)
// If the subchain extended into the next subchain, we need to handle
// the overlap. Since there could be many overlaps (come on), do this
// in a loop.
for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail {
// Extract some stats from the second subchain
head := s.progress.Subchains[1].Head
tail := s.progress.Subchains[1].Tail
next := s.progress.Subchains[1].Next
// Since we just overwrote part of the next subchain, we need to trim
// its head independent of matching or mismatching content
if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail {
// Fully overwritten, get rid of the subchain as a whole
log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next)
s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
continue
} else {
// Partially overwritten, trim the head to the overwritten size
log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next)
s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1
}
// If the old subchain is an extension of the new one, merge the two
// and let the skeleton syncer restart (to clean internal state)
if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next {
log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next)
s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail
s.progress.Subchains[0].Next = s.progress.Subchains[1].Next
s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...)
merged = true
}
}
// If subchains were merged, all further available headers in the scratch
// space are invalid since we skipped ahead. Stop processing the scratch
// space to avoid dropping peers thinking they delivered invalid data.
merged = s.mergeSubchains()
if merged {
break
}
@ -1158,15 +1191,17 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
// due to the downloader backfilling past the tracked tail.
func (s *skeleton) cleanStales(filled *types.Header) error {
number := filled.Number.Uint64()
log.Trace("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
log.Debug("Cleaning stale beacon headers", "filled", number, "hash", filled.Hash())
// If the filled header is below the linked subchain, something's corrupted
// internally. Report and error and refuse to do anything.
// If the filled header is below the subchain, it means the skeleton is not
// linked with local chain yet, don't bother to do cleanup.
if number+1 < s.progress.Subchains[0].Tail {
return fmt.Errorf("filled header below beacon header tail: %d < %d", number, s.progress.Subchains[0].Tail)
log.Debug("filled header below beacon header tail", "filled", number, "tail", s.progress.Subchains[0].Tail)
return nil
}
// If nothing in subchain is filled, don't bother to do cleanup.
if number+1 == s.progress.Subchains[0].Tail {
log.Debug("Skeleton chain not yet consumed", "filled", number, "hash", filled.Hash(), "tail", s.progress.Subchains[0].Tail)
return nil
}
// If the latest fill was on a different subchain, it means the backfiller

View file

@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"sync/atomic"
"testing"
@ -71,6 +72,12 @@ func (hf *hookedBackfiller) resume() {
}
}
// fakeChainReader is a chainReader stub for tests: the head snap block it
// reports sits at the maximum representable height, so every skeleton
// position is treated as being below the local chain head.
type fakeChainReader struct{}

// CurrentSnapBlock returns a synthetic header pinned at math.MaxInt64.
func (fc *fakeChainReader) CurrentSnapBlock() *types.Header {
	head := types.Header{Number: new(big.Int).SetInt64(math.MaxInt64)}
	return &head
}
// skeletonTestPeer is a mock peer that can only serve header requests from a
// pre-perated header chain (which may be arbitrarily wrong for testing).
//
@ -369,7 +376,7 @@ func TestSkeletonSyncInit(t *testing.T) {
// Create a skeleton sync and run a cycle
wait := make(chan struct{})
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller(), &fakeChainReader{})
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, nil, true)
@ -472,7 +479,7 @@ func TestSkeletonSyncExtend(t *testing.T) {
// Create a skeleton sync and run a cycle
wait := make(chan struct{})
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller())
skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller(), &fakeChainReader{})
skeleton.syncStarting = func() { close(wait) }
skeleton.Sync(tt.head, nil, true)
@ -885,7 +892,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
}
}
// Create a skeleton sync and run a cycle
skeleton := newSkeleton(db, peerset, drop, filler)
skeleton := newSkeleton(db, peerset, drop, filler, &fakeChainReader{})
skeleton.Sync(tt.head, nil, true)
// Wait a bit (bleah) for the initial sync loop to go to idle. This might