eth/downloader: add pivot freeze, second state sync, and backfiller guards

Freeze the pivot header for partial state nodes to ensure stable state
sync progress:
- Suppress pivot movement in fetchHeaders() (beaconsync.go)
- Suppress pivot movement in processSnapSyncContent() (downloader.go)
- Reuse existing pivot across sync cycle restarts in syncToHead()

After initial snap sync completes, bridge the gap from pivot to HEAD:
- Import post-pivot blocks with receipts (no execution needed since
  untracked contracts have empty storage tries)
- Run second state sync to download HEAD state root
- Add AdvancePartialHead to update currentBlock without re-execution

Guard the backfiller for partial state mode:
- suspend() skips Cancel() during active snap sync to prevent
  constant cancel/restart cycles from beacon head updates
- resume() skips new sync cycles after partial sync completes
This commit is contained in:
CPerezz 2026-02-08 00:49:02 +01:00
parent 2a1747c07e
commit bcb2a1bcd5
No known key found for this signature in database
GPG key ID: 62045F34B97177DD
3 changed files with 162 additions and 26 deletions

View file

@ -1361,6 +1361,37 @@ func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
return nil
}
// AdvancePartialHead updates currentBlock to the given block hash without
// re-executing blocks. It is used by partial state mode after receipt-importing
// post-pivot blocks and re-syncing state at the new root.
//
// Unlike SnapSyncComplete, this does NOT rebuild snapshots (already done
// during the initial pivot commit), but DOES re-enable the trie DB for the
// new root (required for path-based trie to recognize the synced state).
func (bc *BlockChain) AdvancePartialHead(hash common.Hash) error {
block := bc.GetBlockByHash(hash)
if block == nil {
return fmt.Errorf("non existent block [%x..]", hash[:4])
}
root := block.Root()
// Enable the trie database for the new root (required for path-based trie)
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Enable(root); err != nil {
return err
}
}
if !bc.HasState(root) {
return fmt.Errorf("non existent state [%x..]", root[:4])
}
bc.currentBlock.Store(block.Header())
headBlockGauge.Update(int64(block.NumberU64()))
log.Info("Advanced partial state head", "number", block.Number(), "hash", hash)
return nil
}
// Reset purges the entire blockchain, restoring it to its genesis state.
func (bc *BlockChain) Reset() error {
return bc.ResetWithGenesisBlock(bc.genesisBlock)

View file

@ -72,6 +72,20 @@ func (b *beaconBackfiller) suspend() *types.Header {
// read this channel multiple times, it gets closed on startup.
<-started
// For partial state nodes during snap sync, don't cancel the sync on every
// beacon head update. The state sync needs uninterrupted time to complete,
// otherwise the constant cancel/restart cycle prevents progress.
// We skip cancellation when:
// 1. We're in partial state mode (partialFilter is set)
// 2. We're in snap sync mode OR the second state sync (pivot→HEAD) is running
// 3. State sync is actively running (synchronising is true)
if b.downloader.partialFilter != nil &&
(b.downloader.getMode() == ethconfig.SnapSync || b.downloader.partialHeadSyncing.Load()) &&
b.downloader.synchronising.Load() {
log.Debug("Backfiller suspend: partial state snap sync in progress, skipping cancel")
return b.downloader.blockchain.CurrentSnapBlock()
}
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()
@ -83,6 +97,15 @@ func (b *beaconBackfiller) suspend() *types.Header {
// resume starts the downloader threads for backfilling state and chain data.
func (b *beaconBackfiller) resume() {
// For partial state nodes, don't start new sync cycles after the initial
// snap sync completes. The partialSyncComplete flag is set after
// AdvancePartialHead succeeds, indicating new blocks should come via
// Engine API with BAL instead of sync.
if b.downloader.partialFilter != nil && b.downloader.partialSyncComplete.Load() {
log.Debug("Backfiller resume: partial state sync complete, skipping new cycle")
return
}
b.lock.Lock()
if b.filling {
// If a previous filling cycle is still running, just ignore this start
@ -306,32 +329,40 @@ func (d *Downloader) fetchHeaders(from uint64) error {
d.pivotLock.Lock()
if d.pivotHeader != nil {
if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 {
// Retrieve the next pivot header, either from skeleton chain
// or the filled chain
number := head.Number.Uint64() - uint64(fsMinFullBlocks)
// For partial state nodes, don't move the pivot. The state sync
// needs uninterrupted time to complete with a stable root. The
// second sync (pivot→HEAD) will handle the state gap afterward.
if d.partialFilter != nil {
log.Debug("Partial state: suppressing pivot move in fetchHeaders",
"current", d.pivotHeader.Number, "head", head.Number)
} else {
// Retrieve the next pivot header, either from skeleton chain
// or the filled chain
number := head.Number.Uint64() - uint64(fsMinFullBlocks)
log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number)
if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil {
if number < tail.Number.Uint64() {
dist := tail.Number.Uint64() - number
if len(localHeaders) >= int(dist) {
d.pivotHeader = localHeaders[dist-1]
log.Warn("Retrieved pivot header from local", "number", d.pivotHeader.Number, "hash", d.pivotHeader.Hash(), "latest", head.Number, "oldest", tail.Number)
log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number)
if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil {
if number < tail.Number.Uint64() {
dist := tail.Number.Uint64() - number
if len(localHeaders) >= int(dist) {
d.pivotHeader = localHeaders[dist-1]
log.Warn("Retrieved pivot header from local", "number", d.pivotHeader.Number, "hash", d.pivotHeader.Hash(), "latest", head.Number, "oldest", tail.Number)
}
}
}
// Print an error log and return directly in case the pivot header
// is still not found. It means the skeleton chain is not linked
// correctly with local chain.
if d.pivotHeader == nil {
log.Error("Pivot header is not found", "number", number)
d.pivotLock.Unlock()
return errNoPivotHeader
}
// Write out the pivot into the database so a rollback beyond
// it will reenable snap sync and update the state root that
// the state syncer will be downloading
rawdb.WriteLastPivotNumber(d.stateDB, d.pivotHeader.Number.Uint64())
}
// Print an error log and return directly in case the pivot header
// is still not found. It means the skeleton chain is not linked
// correctly with local chain.
if d.pivotHeader == nil {
log.Error("Pivot header is not found", "number", number)
d.pivotLock.Unlock()
return errNoPivotHeader
}
// Write out the pivot into the database so a rollback beyond
// it will reenable snap sync and update the state root that
// the state syncer will be downloading
rawdb.WriteLastPivotNumber(d.stateDB, d.pivotHeader.Number.Uint64())
}
}
d.pivotLock.Unlock()

View file

@ -130,6 +130,7 @@ type Downloader struct {
chainCutoffNumber uint64
chainCutoffHash common.Hash
chainRetention uint64 // Bodies/receipts retention window in blocks from HEAD (0 = keep all)
partialFilter partial.ContractFilter // If set, partial state mode is active (skip storage for untracked contracts)
// Channels
headerProcCh chan *headerTask // Channel to feed the header processor new tasks
@ -149,6 +150,16 @@ type Downloader struct {
cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited.
// partialHeadSyncing is set during the second state sync (pivot→HEAD)
// for partial state nodes. When true, beaconBackfiller.suspend() should
// not call Cancel(), allowing the sync to complete naturally.
partialHeadSyncing atomic.Bool
// partialSyncComplete is set after the initial partial sync completes
// successfully (after AdvancePartialHead succeeds). When true, new sync
// cycles should be skipped - new blocks come via Engine API with BAL.
partialSyncComplete atomic.Bool
quitCh chan struct{} // Quit channel to signal termination
quitLock sync.Mutex // Lock to prevent double closes
@ -228,6 +239,11 @@ type BlockChain interface {
// HistoryPruningCutoff returns the configured history pruning point.
// Block bodies along with the receipts will be skipped for synchronization.
HistoryPruningCutoff() (uint64, common.Hash)
// AdvancePartialHead updates currentBlock to the given block hash without
// re-executing blocks. Used by partial state mode after receipt-importing
// post-pivot blocks and re-syncing state at the new root.
AdvancePartialHead(common.Hash) error
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
@ -243,6 +259,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch
chainCutoffNumber: cutoffNumber,
chainCutoffHash: cutoffHash,
chainRetention: chainRetention,
partialFilter: partialFilter,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
@ -618,7 +635,12 @@ func (d *Downloader) syncToHead() (err error) {
}
if mode == ethconfig.SnapSync {
d.pivotLock.Lock()
d.pivotHeader = pivot
if d.partialFilter != nil && d.pivotHeader != nil {
log.Debug("Partial state: reusing existing pivot across sync restart",
"pivot", d.pivotHeader.Number.Uint64(), "new_would_be", pivot.Number.Uint64())
} else {
d.pivotHeader = pivot
}
d.pivotLock.Unlock()
fetchers = append(fetchers, func() error { return d.processSnapSyncContent() })
@ -950,6 +972,45 @@ func (d *Downloader) processSnapSyncContent() error {
if len(results) == 0 {
// If pivot sync is done, stop
if d.committed.Load() {
// Partial state: bridge the gap from pivot state to HEAD state.
// After receipt-importing afterP blocks, the state trie exists at
// the pivot root but NOT at HEAD's root. Future BAL-based block
// processing needs the parent state at HEAD's root, so we run a
// second state sync to download it (no execution involved).
if d.partialFilter != nil {
snapHead := d.blockchain.CurrentSnapBlock()
currentHead := d.blockchain.CurrentBlock()
if snapHead.Hash() != currentHead.Hash() {
log.Info("Partial state: syncing state to HEAD",
"pivot", currentHead.Number, "head", snapHead.Number)
// Set flag to prevent beaconBackfiller.suspend() from
// cancelling us during this critical second state sync.
d.partialHeadSyncing.Store(true)
sync.Cancel()
sync = d.syncState(snapHead.Root)
go closeOnErr(sync)
err := sync.Wait()
d.partialHeadSyncing.Store(false)
if err != nil {
// TODO: Consider explicit retry logic or state cleanup here.
// Currently relies on self-healing: next sync cycle detects
// snapHead != currentHead and retries second state sync.
log.Error("Partial state second sync failed, will retry", "pivot", currentHead.Number, "head", snapHead.Number, "err", err)
return err
}
if err := d.blockchain.AdvancePartialHead(snapHead.Hash()); err != nil {
return err
}
// Mark partial sync as complete - new blocks via Engine API only
d.partialSyncComplete.Store(true)
log.Info("Partial state initial sync complete")
}
}
d.reportSnapSyncProgress(true)
return sync.Cancel()
}
@ -1014,9 +1075,22 @@ func (d *Downloader) processSnapSyncContent() error {
continue
}
}
// Fast sync done, pivot commit done, full import
if err := d.importBlockResults(afterP); err != nil {
return err
// Fast sync done, pivot commit done, import remaining blocks.
if d.partialFilter != nil {
// Partial state mode ONLY: import afterP with receipts (no execution).
// Untracked contracts have empty storage tries, so full execution
// would fail. State will be brought to HEAD via a second state sync
// at the processSnapSyncContent exit path.
if len(afterP) > 0 {
if err := d.commitSnapSyncData(afterP, sync); err != nil {
return err
}
}
} else {
// Normal (full node) mode: execute afterP blocks to advance state.
if err := d.importBlockResults(afterP); err != nil {
return err
}
}
}
}