From bcb2a1bcd5ec128e6406db8c6246c2e4bbd7c630 Mon Sep 17 00:00:00 2001 From: CPerezz Date: Sun, 8 Feb 2026 00:49:02 +0100 Subject: [PATCH] eth/downloader: add pivot freeze, second state sync, and backfiller guards Freeze the pivot header for partial state nodes to ensure stable state sync progress: - Suppress pivot movement in fetchHeaders() (beaconsync.go) - Suppress pivot movement in processSnapSyncContent() (downloader.go) - Reuse existing pivot across sync cycle restarts in syncToHead() After initial snap sync completes, bridge the gap from pivot to HEAD: - Import post-pivot blocks with receipts (no execution needed since untracked contracts have empty storage tries) - Run second state sync to download HEAD state root - Add AdvancePartialHead to update currentBlock without re-execution Guard the backfiller for partial state mode: - suspend() skips Cancel() during active snap sync to prevent constant cancel/restart cycles from beacon head updates - resume() skips new sync cycles after partial sync completes --- core/blockchain.go | 31 ++++++++++++++ eth/downloader/beaconsync.go | 75 +++++++++++++++++++++++---------- eth/downloader/downloader.go | 82 ++++++++++++++++++++++++++++++++++-- 3 files changed, 162 insertions(+), 26 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8b576d2be1..5b574b9a33 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1361,6 +1361,37 @@ func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error { return nil } +// AdvancePartialHead updates currentBlock to the given block hash without +// re-executing blocks. It is used by partial state mode after receipt-importing +// post-pivot blocks and re-syncing state at the new root. +// +// Unlike SnapSyncComplete, this does NOT rebuild snapshots (already done +// during the initial pivot commit), but DOES re-enable the trie DB for the +// new root (required for path-based trie to recognize the synced state). +func (bc *BlockChain) AdvancePartialHead(hash common.Hash) error { + block := bc.GetBlockByHash(hash) + if block == nil { + return fmt.Errorf("non existent block [%x..]", hash[:4]) + } + root := block.Root() + + // Enable the trie database for the new root (required for path-based trie) + if bc.triedb.Scheme() == rawdb.PathScheme { + if err := bc.triedb.Enable(root); err != nil { + return err + } + } + + if !bc.HasState(root) { + return fmt.Errorf("non existent state [%x..]", root[:4]) + } + bc.currentBlock.Store(block.Header()) + headBlockGauge.Update(int64(block.NumberU64())) + + log.Info("Advanced partial state head", "number", block.Number(), "hash", hash) + return nil +} + // Reset purges the entire blockchain, restoring it to its genesis state. func (bc *BlockChain) Reset() error { return bc.ResetWithGenesisBlock(bc.genesisBlock) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 56d096dd03..aeff0826cb 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -72,6 +72,20 @@ func (b *beaconBackfiller) suspend() *types.Header { // read this channel multiple times, it gets closed on startup. <-started + // For partial state nodes during snap sync, don't cancel the sync on every + // beacon head update. The state sync needs uninterrupted time to complete, + // otherwise the constant cancel/restart cycle prevents progress. + // We skip cancellation when: + // 1. We're in partial state mode (partialFilter is set) + // 2. We're in snap sync mode OR the second state sync (pivot→HEAD) is running + // 3. State sync is actively running (synchronising is true) + if b.downloader.partialFilter != nil && + (b.downloader.getMode() == ethconfig.SnapSync || b.downloader.partialHeadSyncing.Load()) && + b.downloader.synchronising.Load() { + log.Debug("Backfiller suspend: partial state snap sync in progress, skipping cancel") + return b.downloader.blockchain.CurrentSnapBlock() + } + // Now that we're sure the downloader successfully started up, we can cancel // it safely without running the risk of data races. b.downloader.Cancel() @@ -83,6 +97,15 @@ func (b *beaconBackfiller) suspend() *types.Header { // resume starts the downloader threads for backfilling state and chain data. func (b *beaconBackfiller) resume() { + // For partial state nodes, don't start new sync cycles after the initial + // snap sync completes. The partialSyncComplete flag is set after + // AdvancePartialHead succeeds, indicating new blocks should come via + // Engine API with BAL instead of sync. + if b.downloader.partialFilter != nil && b.downloader.partialSyncComplete.Load() { + log.Debug("Backfiller resume: partial state sync complete, skipping new cycle") + return + } + b.lock.Lock() if b.filling { // If a previous filling cycle is still running, just ignore this start @@ -306,32 +329,40 @@ func (d *Downloader) fetchHeaders(from uint64) error { d.pivotLock.Lock() if d.pivotHeader != nil { if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 { - // Retrieve the next pivot header, either from skeleton chain - // or the filled chain - number := head.Number.Uint64() - uint64(fsMinFullBlocks) + // For partial state nodes, don't move the pivot. The state sync + // needs uninterrupted time to complete with a stable root. The + // second sync (pivot→HEAD) will handle the state gap afterward. + if d.partialFilter != nil { + log.Debug("Partial state: suppressing pivot move in fetchHeaders", + "current", d.pivotHeader.Number, "head", head.Number) + } else { + // Retrieve the next pivot header, either from skeleton chain + // or the filled chain + number := head.Number.Uint64() - uint64(fsMinFullBlocks) - log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number) - if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil { - if number < tail.Number.Uint64() { - dist := tail.Number.Uint64() - number - if len(localHeaders) >= int(dist) { - d.pivotHeader = localHeaders[dist-1] - log.Warn("Retrieved pivot header from local", "number", d.pivotHeader.Number, "hash", d.pivotHeader.Hash(), "latest", head.Number, "oldest", tail.Number) + log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number) + if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil { + if number < tail.Number.Uint64() { + dist := tail.Number.Uint64() - number + if len(localHeaders) >= int(dist) { + d.pivotHeader = localHeaders[dist-1] + log.Warn("Retrieved pivot header from local", "number", d.pivotHeader.Number, "hash", d.pivotHeader.Hash(), "latest", head.Number, "oldest", tail.Number) + } } } + // Print an error log and return directly in case the pivot header + // is still not found. It means the skeleton chain is not linked + // correctly with local chain. + if d.pivotHeader == nil { + log.Error("Pivot header is not found", "number", number) + d.pivotLock.Unlock() + return errNoPivotHeader + } + // Write out the pivot into the database so a rollback beyond + // it will reenable snap sync and update the state root that + // the state syncer will be downloading + rawdb.WriteLastPivotNumber(d.stateDB, d.pivotHeader.Number.Uint64()) } - // Print an error log and return directly in case the pivot header - // is still not found. It means the skeleton chain is not linked - // correctly with local chain. - if d.pivotHeader == nil { - log.Error("Pivot header is not found", "number", number) - d.pivotLock.Unlock() - return errNoPivotHeader - } - // Write out the pivot into the database so a rollback beyond - // it will reenable snap sync and update the state root that - // the state syncer will be downloading - rawdb.WriteLastPivotNumber(d.stateDB, d.pivotHeader.Number.Uint64()) } } d.pivotLock.Unlock() diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 3bb32893e2..0a7d8d89a1 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -130,6 +130,7 @@ type Downloader struct { chainCutoffNumber uint64 chainCutoffHash common.Hash chainRetention uint64 // Bodies/receipts retention window in blocks from HEAD (0 = keep all) + partialFilter partial.ContractFilter // If set, partial state mode is active (skip storage for untracked contracts) // Channels headerProcCh chan *headerTask // Channel to feed the header processor new tasks @@ -149,6 +150,16 @@ type Downloader struct { cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited. + // partialHeadSyncing is set during the second state sync (pivot→HEAD) + // for partial state nodes. When true, beaconBackfiller.suspend() should + // not call Cancel(), allowing the sync to complete naturally. + partialHeadSyncing atomic.Bool + + // partialSyncComplete is set after the initial partial sync completes + // successfully (after AdvancePartialHead succeeds). When true, new sync + // cycles should be skipped - new blocks come via Engine API with BAL. + partialSyncComplete atomic.Bool + quitCh chan struct{} // Quit channel to signal termination quitLock sync.Mutex // Lock to prevent double closes @@ -228,6 +239,11 @@ type BlockChain interface { // HistoryPruningCutoff returns the configured history pruning point. // Block bodies along with the receipts will be skipped for synchronization. HistoryPruningCutoff() (uint64, common.Hash) + + // AdvancePartialHead updates currentBlock to the given block hash without + // re-executing blocks. Used by partial state mode after receipt-importing + // post-pivot blocks and re-syncing state at the new root. + AdvancePartialHead(common.Hash) error } // New creates a new downloader to fetch hashes and blocks from remote peers. @@ -243,6 +259,7 @@ func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, ch chainCutoffNumber: cutoffNumber, chainCutoffHash: cutoffHash, chainRetention: chainRetention, + partialFilter: partialFilter, dropPeer: dropPeer, headerProcCh: make(chan *headerTask, 1), quitCh: make(chan struct{}), @@ -618,7 +635,12 @@ func (d *Downloader) syncToHead() (err error) { } if mode == ethconfig.SnapSync { d.pivotLock.Lock() - d.pivotHeader = pivot + if d.partialFilter != nil && d.pivotHeader != nil { + log.Debug("Partial state: reusing existing pivot across sync restart", + "pivot", d.pivotHeader.Number.Uint64(), "new_would_be", pivot.Number.Uint64()) + } else { + d.pivotHeader = pivot + } d.pivotLock.Unlock() fetchers = append(fetchers, func() error { return d.processSnapSyncContent() }) @@ -950,6 +972,45 @@ func (d *Downloader) processSnapSyncContent() error { if len(results) == 0 { // If pivot sync is done, stop if d.committed.Load() { + // Partial state: bridge the gap from pivot state to HEAD state. + // After receipt-importing afterP blocks, the state trie exists at + // the pivot root but NOT at HEAD's root. Future BAL-based block + // processing needs the parent state at HEAD's root, so we run a + // second state sync to download it (no execution involved). + if d.partialFilter != nil { + snapHead := d.blockchain.CurrentSnapBlock() + currentHead := d.blockchain.CurrentBlock() + + if snapHead.Hash() != currentHead.Hash() { + log.Info("Partial state: syncing state to HEAD", + "pivot", currentHead.Number, "head", snapHead.Number) + + // Set flag to prevent beaconBackfiller.suspend() from + // cancelling us during this critical second state sync. + d.partialHeadSyncing.Store(true) + + sync.Cancel() + sync = d.syncState(snapHead.Root) + go closeOnErr(sync) + + err := sync.Wait() + d.partialHeadSyncing.Store(false) + + if err != nil { + // TODO: Consider explicit retry logic or state cleanup here. + // Currently relies on self-healing: next sync cycle detects + // snapHead != currentHead and retries second state sync. + log.Error("Partial state second sync failed, will retry", "pivot", currentHead.Number, "head", snapHead.Number, "err", err) + return err + } + if err := d.blockchain.AdvancePartialHead(snapHead.Hash()); err != nil { + return err + } + // Mark partial sync as complete - new blocks via Engine API only + d.partialSyncComplete.Store(true) + log.Info("Partial state initial sync complete") + } + } d.reportSnapSyncProgress(true) return sync.Cancel() } @@ -1014,9 +1075,22 @@ func (d *Downloader) processSnapSyncContent() error { continue } } - // Fast sync done, pivot commit done, full import - if err := d.importBlockResults(afterP); err != nil { - return err + // Fast sync done, pivot commit done, import remaining blocks. + if d.partialFilter != nil { + // Partial state mode ONLY: import afterP with receipts (no execution). + // Untracked contracts have empty storage tries, so full execution + // would fail. State will be brought to HEAD via a second state sync + // at the processSnapSyncContent exit path. + if len(afterP) > 0 { + if err := d.commitSnapSyncData(afterP, sync); err != nil { + return err + } + } + } else { + // Normal (full node) mode: execute afterP blocks to advance state. + if err := d.importBlockResults(afterP); err != nil { + return err + } } } }