From e2164cc78c690367e2ca0b24cae982c3ab44bb4e Mon Sep 17 00:00:00 2001 From: Jonny Rhea <5555162+jrhea@users.noreply.github.com> Date: Mon, 15 Jun 2026 03:09:41 -0500 Subject: [PATCH] eth/downloader, eth/protocols/snap: freeze pivot once state is downloaded (#35155) --- core/rawdb/accessors_snapshot.go | 42 ------ core/rawdb/database.go | 2 - core/rawdb/schema.go | 9 -- eth/downloader/beaconsync.go | 6 +- eth/downloader/downloader.go | 28 +++- eth/protocols/snap/bal_apply.go | 6 +- eth/protocols/snap/bal_apply_test.go | 2 +- eth/protocols/snap/handler.go | 6 +- eth/protocols/snap/progress_test.go | 8 +- eth/protocols/snap/protocol.go | 9 +- eth/protocols/snap/syncer.go | 9 ++ eth/protocols/snap/syncv2.go | 192 +++++++++++++++++++-------- eth/protocols/snap/syncv2_test.go | 181 +++++++++++++++++-------- triedb/generate.go | 37 +----- triedb/generate_test.go | 91 ------------- 15 files changed, 317 insertions(+), 311 deletions(-) diff --git a/core/rawdb/accessors_snapshot.go b/core/rawdb/accessors_snapshot.go index 4573e08321..8872b00fc2 100644 --- a/core/rawdb/accessors_snapshot.go +++ b/core/rawdb/accessors_snapshot.go @@ -209,48 +209,6 @@ func WriteSnapshotSyncStatus(db ethdb.KeyValueWriter, status []byte) { } } -// ReadGenerateTriePartitionDone returns the raw subtree root blob for a -// partition that has previously completed. -func ReadGenerateTriePartitionDone(db ethdb.KeyValueReader, partition byte) ([]byte, bool) { - data, err := db.Get(generateTriePartitionDoneKey(partition)) - if err != nil { - return nil, false - } - if len(data) == 0 { - return nil, false - } - switch data[0] { - case 0x00: - // Partition is done and it is empty. - return nil, true - case 0x01: - // Partition is done and the blob follows. - return data[1:], true - default: - return nil, false - } -} - -// WriteGenerateTriePartitionDone records a completed partition. -func WriteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte, blob []byte) { - var value []byte - if blob == nil { - value = []byte{0x00} - } else { - value = append([]byte{0x01}, blob...) - } - if err := db.Put(generateTriePartitionDoneKey(partition), value); err != nil { - log.Crit("Failed to store generate-trie done marker", "err", err) - } -} - -// DeleteGenerateTriePartitionDone removes a partition's done marker. -func DeleteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte) { - if err := db.Delete(generateTriePartitionDoneKey(partition)); err != nil { - log.Crit("Failed to remove generate-trie done marker", "err", err) - } -} - // DeleteSnapshotSyncStatus removes the serialized sync status from the database. func DeleteSnapshotSyncStatus(db ethdb.KeyValueWriter) { if err := db.Delete(snapshotSyncStatusKey); err != nil { diff --git a/core/rawdb/database.go b/core/rawdb/database.go index bf19f41c44..8063bc6419 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -563,8 +563,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { } // Metadata keys - case bytes.HasPrefix(key, generateTriePartitionDonePrefix) && len(key) == len(generateTriePartitionDonePrefix)+1: - metadata.add(size) case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }): metadata.add(size) diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index da5f9608bf..54c76143b4 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -104,10 +104,6 @@ var ( // snapSyncStatusFlagKey flags that status of snap sync. snapSyncStatusFlagKey = []byte("SnapSyncStatus") - // generateTriePartitionDonePrefix stores the subtree root hash of each - // triedb.GenerateTrie partition once it finishes. - generateTriePartitionDonePrefix = []byte("gtd") // generateTriePartitionDonePrefix + partition byte -> subtree root hash - // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td (deprecated) @@ -469,8 +465,3 @@ func trienodeHistoryIndexBlockKey(addressHash common.Hash, path []byte, blockID func transitionStateKey(hash common.Hash) []byte { return append(VerkleTransitionStatePrefix, hash.Bytes()...) } - -// generateTriePartitionDoneKey = generateTriePartitionDonePrefix + partition (single byte). -func generateTriePartitionDoneKey(partition byte) []byte { - return append(generateTriePartitionDonePrefix, partition) -} diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 914e1dfada..23daafa8f6 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -297,9 +297,11 @@ func (d *Downloader) fetchHeaders(from uint64) error { return err } // If the pivot became stale (older than 2*64-8 (bit of wiggle room)), - // move it ahead to HEAD-64 + // move it ahead to HEAD-64. + // + // The state syncer is consulted first before the pivot movement. d.pivotLock.Lock() - if d.pivotHeader != nil { + if d.pivotHeader != nil && d.snapSyncer.FrozenPivot() == nil { if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 { // Retrieve the next pivot header, either from skeleton chain // or the filled chain diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e0a4ec6b6d..af4c4bb1f6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -496,6 +496,18 @@ func (d *Downloader) syncToHead() (err error) { if mode == ethconfig.SnapSync && pivot == nil { pivot = d.blockchain.CurrentBlock() } + // If the snap syncer froze its pivot in a previous cycle, resume against + // the frozen header instead of a fresh one. + if mode == ethconfig.SnapSync && pivot != nil { + if frozen := d.snapSyncer.FrozenPivot(); frozen != nil { + if rawdb.ReadCanonicalHash(d.stateDB, frozen.Number.Uint64()) == frozen.Hash() { + log.Info("Resuming snap sync against frozen pivot", "number", frozen.Number, "hash", frozen.Hash()) + pivot = frozen + } else { + log.Warn("Frozen pivot is no longer canonical", "number", frozen.Number, "hash", frozen.Hash()) + } + } + } height := latest.Number.Uint64() // In beacon mode, use the skeleton chain for the ancestor lookup @@ -921,7 +933,9 @@ func (d *Downloader) processSnapSyncContent() error { // the results in the meantime. // // Note, there's no issue with memory piling up since after 64 blocks the - // pivot will forcefully move so these accumulators will be dropped. + // pivot will forcefully move so these accumulators will be dropped. The + // exception is snap/2 trie generation, where the pivot is frozen on + // purpose and results accumulate until the generation finishes. var ( oldPivot *fetchResult // Locked in pivot block, might change eventually oldTail []*fetchResult // Downloaded content after the pivot @@ -978,11 +992,15 @@ func (d *Downloader) processSnapSyncContent() error { return err } if P != nil { - // If new pivot block found, cancel old state retrieval and restart + // If new pivot block found, cancel old state retrieval and restart. if oldPivot != P { - sync.Cancel() - sync = d.syncState(P.Header) - go closeOnErr(sync) + // Skip the restart if the running sync already targets the + // pivot's root (e.g, no pivot block movement yet). + if sync.pivot.Root != P.Header.Root { + sync.Cancel() + sync = d.syncState(P.Header) + go closeOnErr(sync) + } oldPivot = P } // Wait for completion, occasionally checking for pivot staleness diff --git a/eth/protocols/snap/bal_apply.go b/eth/protocols/snap/bal_apply.go index 3e565662e7..d67015361b 100644 --- a/eth/protocols/snap/bal_apply.go +++ b/eth/protocols/snap/bal_apply.go @@ -93,7 +93,7 @@ func (s *syncerV2) isStorageFetched(accountHash, storageHash common.Hash) bool { // applyAccessList applies a single block's access list diffs to the flat state // in the database. For each account, it applies the post-block values (highest // TxIdx entry) for balance, nonce, code, and storage. The storageRoot field is -// intentionally left stale. It will be recomputed during the trie rebuild. +// intentionally left stale. It will be recomputed during the trie generation. func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) error { // Iterate over all accounts in the access list for _, access := range *b { @@ -113,7 +113,7 @@ func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) er rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash) } else { // Store the slot in the same encoding the snapshot and the - // trie rebuild use: RLP of the minimal big-endian value + // trie generation use: RLP of the minimal big-endian value // (leading zeros trimmed), matching core/state's snapshot // writes. blob, _ := rlp.EncodeToBytes(value.Bytes()) @@ -176,7 +176,7 @@ func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) er case isEmpty && !isNew: // Existing account got fully drained (e.g., pre-funded // address that gets deployed to with init code that - // self-destructs). Delete the entry so the trie rebuild + // self-destructs). Delete the entry so the trie generation // doesn't pick it up as an empty leaf. rawdb.DeleteAccountSnapshot(batch, accountHash) default: diff --git a/eth/protocols/snap/bal_apply_test.go b/eth/protocols/snap/bal_apply_test.go index a9e7f789a5..e5c9188b1b 100644 --- a/eth/protocols/snap/bal_apply_test.go +++ b/eth/protocols/snap/bal_apply_test.go @@ -157,7 +157,7 @@ func TestAccessListApplication(t *testing.T) { // Verify storage updated. Slots are stored in the canonical snapshot // encoding (RLP of the value with leading zeros trimmed), the same form - // the download path writes and the trie rebuild consumes. + // the download path writes and the trie generation consumes. storageVal := rawdb.ReadStorageSnapshot(db, accountHash, slotHash) wantStorage, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(common.HexToHash("0x02").Bytes())) if !bytes.Equal(storageVal, wantStorage) { diff --git a/eth/protocols/snap/handler.go b/eth/protocols/snap/handler.go index 7adff5dc0f..ff5564ca65 100644 --- a/eth/protocols/snap/handler.go +++ b/eth/protocols/snap/handler.go @@ -84,8 +84,10 @@ type Backend interface { // otherwise only the default (snap/1) versions are offered on the wire. func MakeProtocols(backend Backend, snapV2 bool) []p2p.Protocol { versions := ProtocolVersions - if snapV2 { - versions = append([]uint{SNAP2}, versions...) + if !snapV2 { + // snap/2 is not safe to advertise unconditionally yet, so it is gated + // behind a feature flag. + versions = []uint{SNAP1} } protocols := make([]p2p.Protocol, len(versions)) for i, version := range versions { diff --git a/eth/protocols/snap/progress_test.go b/eth/protocols/snap/progress_test.go index 21ec36c2b5..25b304038a 100644 --- a/eth/protocols/snap/progress_test.go +++ b/eth/protocols/snap/progress_test.go @@ -186,8 +186,8 @@ func TestSyncProgressV1Discarded(t *testing.T) { syncer := newSyncerV2(db, rawdb.HashScheme) syncer.loadSyncStatus() - if syncer.previousPivot != nil { - t.Fatalf("expected previousPivot nil after discarding old format, got %+v", syncer.previousPivot) + if syncer.pivot != nil { + t.Fatalf("expected pivot nil after discarding old format, got %+v", syncer.pivot) } if len(syncer.tasks) != accountConcurrency { t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks)) @@ -258,8 +258,8 @@ func TestSyncProgressCorruptPayload(t *testing.T) { syncer := newSyncerV2(db, rawdb.HashScheme) syncer.loadSyncStatus() - if syncer.previousPivot != nil { - t.Fatalf("expected previousPivot nil after corrupt payload, got %+v", syncer.previousPivot) + if syncer.pivot != nil { + t.Fatalf("expected pivot nil after corrupt payload, got %+v", syncer.pivot) } if len(syncer.tasks) != accountConcurrency { t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks)) diff --git a/eth/protocols/snap/protocol.go b/eth/protocols/snap/protocol.go index e14ca1283d..d46607ff22 100644 --- a/eth/protocols/snap/protocol.go +++ b/eth/protocols/snap/protocol.go @@ -35,11 +35,10 @@ const ( // devp2p capability negotiation. const ProtocolName = "snap" -// ProtocolVersions are the supported versions of the `snap` protocol advertised -// by default (first is primary). snap/2 is not safe to advertise unconditionally -// yet, so it is gated behind a feature flag and appended in MakeProtocols rather -// than listed here. -var ProtocolVersions = []uint{SNAP1} +// ProtocolVersions are all the `snap` protocol versions this node implements +// (first is primary). What's actually advertised on the wire is decided by +// MakeProtocols, which gates snap/2 behind a feature flag. +var ProtocolVersions = []uint{SNAP2, SNAP1} // protocolLengths are the number of implemented messages corresponding to // different protocol versions. snap/2 adds GetAccessLists/AccessLists (0x08/0x09). diff --git a/eth/protocols/snap/syncer.go b/eth/protocols/snap/syncer.go index 0c3ea5caf2..5d1aa67ea7 100644 --- a/eth/protocols/snap/syncer.go +++ b/eth/protocols/snap/syncer.go @@ -58,6 +58,10 @@ type Syncer interface { OnTrieNodes(peer SyncPeerV2, id uint64, trienodes [][]byte) error OnAccessLists(peer SyncPeerV2, id uint64, lists rlp.RawList[rlp.RawValue]) error + // FrozenPivot returns the pivot header the syncer is bound to, or nil if + // the pivot may still be chosen and moved freely. + FrozenPivot() *types.Header + // Version is the snap protocol version this syncer implements. Version() uint } @@ -122,6 +126,11 @@ func (syncerV1Adapter) OnAccessLists(SyncPeerV2, uint64, rlp.RawList[rlp.RawValu // Version is SNAP1 func (syncerV1Adapter) Version() uint { return SNAP1 } +// FrozenPivot is always nil for snap/1: the sync target must keep tracking +// the chain head, ensuring the state is available in the network, so the +// pivot is never frozen. +func (syncerV1Adapter) FrozenPivot() *types.Header { return nil } + // syncerV2Adapter adapts the snap/2 *syncerV2 to Syncer. Its peer-facing methods // already take SyncPeerV2 and its Sync already takes a header, so only Progress // (different return type) and OnTrieNodes (absent) need wrapping. diff --git a/eth/protocols/snap/syncv2.go b/eth/protocols/snap/syncv2.go index 70f78e3ec8..ac84008697 100644 --- a/eth/protocols/snap/syncv2.go +++ b/eth/protocols/snap/syncv2.go @@ -26,6 +26,7 @@ import ( "math/rand" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -261,13 +262,34 @@ type storageTaskV2 struct { done bool // Flag whether the task can be removed } +// syncPhase tracks how far a snap/2 sync has progressed for the journaled +// pivot. The phases are strictly ordered: each one implies all previous +// ones have finished. +type syncPhase uint8 + +const ( + // phaseDownload covers the flat state (account, storage, bytecode) + // download. The requests target the pivot root, which remote peers + // only serve while it is recent, so the pivot must keep tracking the + // chain head (see FrozenPivot). + phaseDownload syncPhase = iota + + // phaseGenerate covers the local trie generation after the download + // has completed. It targets the exact pivot root it was started with, + // so pivot updates are refused from here on. + phaseGenerate + + // phaseComplete means the sync ran to completion for the pivot. + phaseComplete +) + // syncProgressV2 is a database entry to allow suspending and resuming a snapshot state // sync. Opposed to full and fast sync, there is no way to restart a suspended // snap sync without prior knowledge of the suspension point. type syncProgressV2 struct { - Pivot *types.Header // Pivot header being synced (for pivot move and reorg detection) - Tasks []*accountTaskV2 // The suspended account tasks (contract tasks within) - Complete bool // True once sync ran to completion for Pivot + Pivot *types.Header // Pivot header being synced (for pivot move and reorg detection) + Tasks []*accountTaskV2 // The suspended account tasks (contract tasks within) + Phase syncPhase // Phase is how far the sync has progressed for Pivot // Status report during syncing phase AccountSynced uint64 // Number of accounts downloaded @@ -313,7 +335,7 @@ type SyncPeerV2 interface { // syncerV2 is an Ethereum account and storage trie syncer based on the snap // protocol. It downloads all accounts, storage slots, and bytecodes from // remote peers as flat state, applies BAL diffs on pivot moves, -// and triggers a final trie rebuild once flat state is consistent. +// and triggers a final trie generation once flat state is consistent. // // Every network request has a variety of failure events: // - The peer disconnects after task assignment, failing to send the request @@ -322,14 +344,12 @@ type SyncPeerV2 interface { // - The peer delivers a stale response after a previous timeout // - The peer delivers a refusal to serve the requested state type syncerV2 struct { - db ethdb.Database // Database to store the trie nodes into (and dedup) - scheme string // Node scheme used in node database - - pivot *types.Header // Current pivot header being synced (lock needed) - previousPivot *types.Header // Pivot from previous sync run (for pivot move detection) - complete bool // Whether the persisted progress was a completed sync - tasks []*accountTaskV2 // Current account task set being synced - update chan struct{} // Notification channel for possible sync progression + db ethdb.Database // Database to store the trie nodes into (and dedup) + scheme string // Node scheme used in node database + pivot *types.Header // Current pivot header being synced (lock needed) + phase atomic.Uint32 // Current syncPhase; atomic so phase transitions are visible across goroutines + tasks []*accountTaskV2 // Current account task set being synced + update chan struct{} // Notification channel for possible sync progression peers map[string]SyncPeerV2 // Currently active peers to download from peerJoin *event.Feed // Event feed to react to peers joining @@ -370,7 +390,7 @@ type syncerV2 struct { // newSyncerV2 creates a new snapshot syncer to download the Ethereum state over the // snap protocol. func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 { - return &syncerV2{ + s := &syncerV2{ db: db, scheme: scheme, @@ -393,6 +413,24 @@ func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 { extProgress: new(syncProgressV2), } + if raw := rawdb.ReadSnapshotSyncStatus(db); len(raw) > 0 && raw[0] == syncProgressVersion { + var progress syncProgressV2 + if err := json.Unmarshal(raw[1:], &progress); err == nil { + s.pivot = progress.Pivot + s.setPhase(progress.Phase) + } + } + return s +} + +// getPhase returns the current sync phase. +func (s *syncerV2) getPhase() syncPhase { + return syncPhase(s.phase.Load()) +} + +// setPhase moves the sync to the given phase. +func (s *syncerV2) setPhase(phase syncPhase) { + s.phase.Store(uint32(phase)) } // Register injects a new data source into the syncer's peerset. @@ -452,19 +490,17 @@ func (s *syncerV2) Unregister(id string) error { // Sync starts (or resumes a previous) sync cycle to iterate over a state trie // with the given pivot header and reconstruct the nodes based on the snapshot // leaves. -func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error { - if pivot == nil { +func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error { + if target == nil { return errors.New("snap sync: pivot header is nil") } s.lock.Lock() - s.pivot = pivot - s.previousPivot = nil // loadSyncStatus overwrites when resuming from persisted progress s.statelessPeers = make(map[string]struct{}) s.lock.Unlock() if s.startTime.IsZero() { s.startTime = time.Now() } - root := pivot.Root + root := target.Root // Retrieve the previous sync status from DB. If there's no persisted // status, sync is either fresh or already complete. @@ -473,20 +509,24 @@ func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error { // isPivotChanged is true when we have prior progress against a different // pivot. That means we need to roll forward via catchUp, or wipe and // restart if the prior pivot was reorged out. - isPivotChanged := s.previousPivot != nil && s.previousPivot.Hash() != s.pivot.Hash() + s.lock.RLock() + prevPivot := s.pivot + s.lock.RUnlock() + isPivotChanged := prevPivot != nil && prevPivot.Hash() != target.Hash() // Skip if we've already finished syncing this pivot. - if !isPivotChanged && s.complete { + if !isPivotChanged && s.getPhase() == phaseComplete { log.Info("Snap sync already complete for this pivot", "root", root) return nil } - // We're committing to running this sync. Clear the complete flag so a - // mid-run save (on cancel or error) doesn't persist a stale Complete=true - // status from a prior pivot. - s.lock.Lock() - s.complete = false - s.lock.Unlock() + // We're committing to running this sync. Demote a completed phase so a + // mid-run save (on cancel or error) doesn't persist a stale complete + // status from a prior pivot. The download remains done, only the trie + // generation must be redone against the new pivot. + if s.getPhase() == phaseComplete { + s.setPhase(phaseGenerate) + } defer func() { // Whether sync completed or not, disregard any future packets @@ -515,24 +555,25 @@ func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error { // progress is still usable. If yes, roll forward via BAL catch-up. If not, // wipe everything and restart fresh. if isPivotChanged { - if isPivotReorged(s.db, s.previousPivot, s.pivot) { - log.Warn("Persisted progress unusable, restarting snap sync from scratch", - "number", s.previousPivot.Number, "oldHash", s.previousPivot.Hash()) + if isPivotReorged(s.db, prevPivot, target) { + log.Warn("Restarting snap sync from scratch", "oldnumber", prevPivot.Number, "oldHash", prevPivot.Hash()) s.resetSyncState() - } else if err := s.catchUp(cancel); err != nil { - return err + } else { + // A canonical pivot move past a frozen pivot should be impossible: + // the downloader both refuses moves (FrozenPivot) and resumes new + // cycles against the frozen header itself. Reaching this branch + // frozen indicates a bug on the downloader side; roll the flat + // state forward defensively and regenerate. + if s.getPhase() >= phaseGenerate { + log.Warn("Frozen pivot moved unexpectedly, rolling forward", "frozen", prevPivot.Number, "new", target.Number) + } + if err := s.catchUp(target, cancel); err != nil { + return err + } } } - - // Pin previousPivot to the current pivot before downloadState runs. - // This is what saveSyncStatus persists. If the download is interrupted - // and the next Sync gets a different pivot, this is how isPivotReorged - // recognizes the partial flat state belongs to the old pivot. Without - // it, isPivotReorged sees nil, skips the reorg branch, and downloadState - // would resume from the persisted task markers but mix the old pivot's - // already-downloaded accounts with the new pivot's data. s.lock.Lock() - s.previousPivot = s.pivot + s.pivot = target s.lock.Unlock() log.Info("Starting state download", "root", root) @@ -541,21 +582,47 @@ func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error { } log.Info("State download complete", "root", root) + // Entering the generation phase makes the downloader stop moving the + // pivot (see FrozenPivot) until the pivot block is committed. The phase + // is persisted right away so the freeze also holds across a restart, + // before the generation has had a chance to finish. + if s.getPhase() < phaseGenerate { + s.setPhase(phaseGenerate) + s.saveSyncStatus() + } + log.Info("Starting trie generation", "root", root) + batch := s.db.NewBatch() + s.resetTrienodes(batch) + if err := batch.Write(); err != nil { + return err + } if _, err := triedb.GenerateTrie(s.db, s.scheme, root, cancel); err != nil { return err } log.Info("Trie generation complete", "root", root) - // Mark sync complete. The deferred saveSyncStatus persists this with - // Complete=true so a follow-up Sync call for the same pivot can skip - // the work entirely. - s.lock.Lock() - s.complete = true - s.lock.Unlock() + // Mark sync complete. The deferred saveSyncStatus persists this so a + // follow-up Sync call for the same pivot can skip the work entirely. + s.setPhase(phaseComplete) return nil } +// FrozenPivot returns the pivot header the sync is bound to, or nil while +// the pivot may still move freely. The pivot freezes once the state +// download completes. The remaining work (trie generation) and the pivot +// commit is purely local and targets the exact pivot root the download +// finished with, so from that point on the downloader must neither move the +// pivot nor start a new cycle against a different one. +func (s *syncerV2) FrozenPivot() *types.Header { + if s.getPhase() < phaseGenerate { + return nil + } + s.lock.RLock() + defer s.lock.RUnlock() + return s.pivot +} + // download runs the bulk flat-state download. It fetches // account ranges, storage slots, and bytecodes, writing flat state to disk. func (s *syncerV2) downloadState(cancel chan struct{}) error { @@ -660,10 +727,10 @@ func isPivotReorged(db ethdb.Database, prev, curr *types.Header) bool { // catchUp runs the BAL catch-up. When the pivot has moved, it fetches BALs // for the gap blocks, verifies them against block headers, and applies the // diffs to roll flat state forward. -func (s *syncerV2) catchUp(cancel chan struct{}) error { +func (s *syncerV2) catchUp(target *types.Header, cancel chan struct{}) error { s.lock.RLock() - from := s.previousPivot.Number.Uint64() + 1 - to := s.pivot.Number.Uint64() + from := s.pivot.Number.Uint64() + 1 + to := target.Number.Uint64() s.lock.RUnlock() log.Info("Starting BAL catch-up", "from", from, "to", to, "blocks", to-from+1) @@ -720,7 +787,7 @@ func (s *syncerV2) catchUp(cancel chan struct{}) error { // Persist incremental progress so a crash mid-catchUp can resume // from the next unapplied block. s.lock.Lock() - s.previousPivot = headers[hash] + s.pivot = headers[hash] s.lock.Unlock() s.saveSyncStatusWithDB(batch) @@ -952,8 +1019,8 @@ func (s *syncerV2) loadSyncStatus() { } task.StorageCompleted = nil } - s.previousPivot = progress.Pivot - s.complete = progress.Complete + s.pivot = progress.Pivot + s.setPhase(progress.Phase) s.accountSynced = progress.AccountSynced s.accountBytes = progress.AccountBytes s.bytecodeSynced = progress.BytecodeSynced @@ -1005,6 +1072,16 @@ func deleteRange(batch ethdb.Batch, prefix []byte) { } } +// resetTrienodes wipes all persisted trienodes if the path scheme is used. +// It's a defensive operation, ensuring all the leftover trie nodes are cleared +// before the new generation cycle. +func (s *syncerV2) resetTrienodes(batch ethdb.Batch) { + if s.scheme == rawdb.PathScheme { + deleteRange(batch, rawdb.TrieNodeAccountPrefix) + deleteRange(batch, rawdb.TrieNodeStoragePrefix) + } +} + // resetSyncState wipes all persisted snap-sync data (sync status, account // and storage snapshots) and re-initializes in-memory state with a fresh // chunking of the account hash range. @@ -1013,14 +1090,15 @@ func (s *syncerV2) resetSyncState() { rawdb.DeleteSnapshotSyncStatus(batch) deleteRange(batch, rawdb.SnapshotAccountPrefix) deleteRange(batch, rawdb.SnapshotStoragePrefix) + s.resetTrienodes(batch) batch.Write() s.lock.Lock() defer s.lock.Unlock() s.tasks = nil - s.previousPivot = nil - s.complete = false + s.pivot = nil + s.setPhase(phaseDownload) s.accountSynced, s.accountBytes = 0, 0 s.bytecodeSynced, s.bytecodeBytes = 0, 0 s.storageSynced, s.storageBytes = 0, 0 @@ -1069,9 +1147,9 @@ func (s *syncerV2) saveSyncStatusWithDB(db ethdb.KeyValueWriter) { } // Store the actual progress markers. progress := &syncProgressV2{ - Pivot: s.previousPivot, + Pivot: s.pivot, Tasks: s.tasks, - Complete: s.complete, + Phase: s.getPhase(), AccountSynced: s.accountSynced, AccountBytes: s.accountBytes, BytecodeSynced: s.bytecodeSynced, @@ -2028,7 +2106,7 @@ func (s *syncerV2) forwardAccountTask(task *accountTaskV2) { // Persist the received account segments. These flat state maybe // outdated during the sync, but it can be fixed later during the - // trie rebuild. + // trie generation. oldAccountBytes := s.accountBytes batch := ethdb.HookedBatch{ diff --git a/eth/protocols/snap/syncv2_test.go b/eth/protocols/snap/syncv2_test.go index d3c1b61800..aed7aac5f2 100644 --- a/eth/protocols/snap/syncv2_test.go +++ b/eth/protocols/snap/syncv2_test.go @@ -547,6 +547,65 @@ func testSyncV2(t *testing.T, scheme string) { verifyAdoptedSyncedState(scheme, syncer.db, sourceAccountTrie.Hash(), elems, t) } +// TestSyncV2FrozenPivot checks the pivot freeze signal around the sync +// lifecycle. The pivot is unfrozen while flat state is downloading, frozen +// once the download completes, stays frozen after the sync returns so the +// downloader resumes against it until the pivot block is committed, and +// unfreezes again after a state reset. +func TestSyncV2FrozenPivot(t *testing.T) { + t.Parallel() + testSyncV2FrozenPivot(t, rawdb.HashScheme) + testSyncV2FrozenPivot(t, rawdb.PathScheme) +} + +func testSyncV2FrozenPivot(t *testing.T, scheme string) { + var ( + once sync.Once + cancel = make(chan struct{}) + term = func() { once.Do(func() { close(cancel) }) } + ) + nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme) + + source := newTestPeerV2("source", t, term) + source.accountTrie = sourceAccountTrie.Copy() + source.accountValues = elems + + syncer := setupSyncerV2(nodeScheme, source) + pivot := mkPivot(0, sourceAccountTrie.Hash()) + + // The handler runs while account ranges are still being served, so it + // can observe the signal mid download. + source.accountRequestV2Handler = func(p *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error { + if syncer.FrozenPivot() != nil { + t.Error("pivot frozen during flat state download") + } + return defaultAccountRequestHandlerV2(p, requestId, root, origin, limit, cap) + } + if syncer.FrozenPivot() != nil { + t.Fatal("pivot frozen before sync started") + } + if err := syncer.Sync(pivot, cancel); err != nil { + t.Fatalf("sync failed: %v", err) + } + if frozen := syncer.FrozenPivot(); frozen == nil || frozen.Hash() != pivot.Hash() { + t.Fatal("pivot not frozen at the synced header after download completed") + } + // A restart must not lose the freeze: a fresh syncer instance on the same + // database derives it from the persisted journal, before any Sync call. + restarted := newSyncerV2(syncer.db, nodeScheme) + if frozen := restarted.FrozenPivot(); frozen == nil || frozen.Hash() != pivot.Hash() { + t.Fatal("pivot freeze lost after restart") + } + syncer.resetSyncState() + if syncer.FrozenPivot() != nil { + t.Fatal("pivot still frozen after state reset") + } + // The reset deletes the journal, so a restarted instance is unfrozen too. + if restarted := newSyncerV2(syncer.db, nodeScheme); restarted.FrozenPivot() != nil { + t.Fatal("pivot still frozen after restart following a state reset") + } +} + // verifyAdoptedSyncedState exercises the snap/2 completion contract end-to-end: // after a real sync, opening a fresh triedb and calling AdoptSyncedState must // (a) succeed and (b) leave flat-state reads serving immediately, with no @@ -1326,9 +1385,9 @@ func TestIsPivotReorged(t *testing.T) { // canonical header at block 100 has a different hash. Sync is then called with // a new pivot at the same height. // -// If isPivotReorged works, loadSyncStatus restores previousPivot, the check -// flags it as reorged, resetSyncState clears previousPivot, catchUp is -// skipped, and the fresh download proceeds to completion. +// If isPivotReorged works, loadSyncStatus restores the persisted pivot, the +// check flags it as reorged, resetSyncState clears it, catchUp is skipped, +// and the fresh download proceeds to completion. // // If detection doesn't fire, the pivot-move check would call catchUp with // from = 101 and to = 100 — the inverted-range guard surfaces that as an @@ -1347,10 +1406,8 @@ func TestSyncDetectsPivotReorged(t *testing.T) { // and non-zero counter so the reset path has something to clean up. orphanPivot := mkPivot(100, common.HexToHash("0xdead")) seed := newSyncerV2(db, nodeScheme) - // previousPivot reflects where flat state matches and it is what - // saveSyncStatus persists. Set it to simulate a prior sync reaching - // orphanPivot. - seed.previousPivot = orphanPivot + // pivot reflects where flat state matches and it is what saveSyncStatus + // persists. Set it to simulate a prior sync reaching orphanPivot. seed.pivot = orphanPivot seed.accountSynced = 42 seed.tasks = []*accountTaskV2{{ @@ -1391,14 +1448,14 @@ func TestSyncDetectsPivotReorged(t *testing.T) { if err := syncer.Sync(newPivot, cancel); err != nil { t.Fatalf("sync failed (reorg detection likely broken): %v", err) } - // After successful completion, status should be marked Complete=true + // After successful completion, status should reach the complete phase // against the new (canonical) pivot. loader := newSyncerV2(db, nodeScheme) loader.loadSyncStatus() - if !loader.complete { - t.Fatal("sync status should be marked Complete=true after successful completion") + if loader.getPhase() != phaseComplete { + t.Fatal("sync status should reach the complete phase after successful completion") } - if loader.previousPivot == nil || loader.previousPivot.Hash() != newPivot.Hash() { + if loader.pivot == nil || loader.pivot.Hash() != newPivot.Hash() { t.Fatalf("expected persisted pivot to match new pivot") } if data := rawdb.ReadAccountSnapshot(db, orphanAccountHash); len(data) != 0 { @@ -1445,9 +1502,8 @@ func testInterruptedDownloadRecovery(t *testing.T, scheme string) { syncer1.Register(src1) src1.remote = syncer1 pivot := mkPivot(0, root) - syncer1.pivot = pivot - syncer1.previousPivot = pivot // Sync sets this before downloadState syncer1.loadSyncStatus() + syncer1.pivot = pivot // Sync pins this before downloadState syncer1.downloadState(cancel1) // Save progress @@ -1483,9 +1539,8 @@ func testInterruptedDownloadRecovery(t *testing.T, scheme string) { syncer2.Register(src2) src2.remote = syncer2 pivot2 := mkPivot(0, root) - syncer2.pivot = pivot2 - syncer2.previousPivot = pivot2 // Sync sets this before downloadState syncer2.loadSyncStatus() + syncer2.pivot = pivot2 // Sync pins this before downloadState if err := syncer2.downloadState(cancel2); err != nil { t.Fatalf("resumed download failed: %v", err) } @@ -1499,10 +1554,10 @@ func testInterruptedDownloadRecovery(t *testing.T, scheme string) { } // TestSyncPersistsPivotDuringDownload verifies that after a fresh Sync is -// interrupted mid-download, the persisted previousPivot equals the current -// pivot (not nil). Without this, a follow-up Sync at a different pivot -// would not see that the partial flat state belongs to the old pivot, and -// would mix old-pivot accounts with new-pivot data. +// interrupted mid-download, the persisted pivot equals the current pivot +// (not nil). Without this, a follow-up Sync at a different pivot would not +// see that the partial flat state belongs to the old pivot, and would mix +// old-pivot accounts with new-pivot data. func TestSyncPersistsPivotDuringDownload(t *testing.T) { t.Parallel() nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, rawdb.HashScheme) @@ -1532,15 +1587,15 @@ func TestSyncPersistsPivotDuringDownload(t *testing.T) { // Sync should be interrupted by the cancel after a couple of responses. _ = syncer.Sync(pivot, cancel) - // Persisted previousPivot must equal the pivot, so a follow-up Sync at a - // different pivot can recognize the partial flat state belongs to this one. + // Persisted pivot must equal the pivot, so a follow-up Sync at a different + // pivot can recognize the partial flat state belongs to this one. loader := newSyncerV2(db, nodeScheme) loader.loadSyncStatus() - if loader.previousPivot == nil { - t.Fatal("expected persisted previousPivot to be set after interrupted download, got nil") + if loader.pivot == nil { + t.Fatal("expected persisted pivot to be set after interrupted download, got nil") } - if loader.previousPivot.Hash() != pivot.Hash() { - t.Errorf("persisted previousPivot mismatch: got %v, want %v", loader.previousPivot.Hash(), pivot.Hash()) + if loader.pivot.Hash() != pivot.Hash() { + t.Errorf("persisted pivot mismatch: got %v, want %v", loader.pivot.Hash(), pivot.Hash()) } } @@ -1702,7 +1757,7 @@ func testPivotMovement(t *testing.T, scheme string, pivotMoves int) { } // TestCatchUpPersistsIncrementally verifies that catchUp updates and persists -// previousPivot after each successfully applied BAL. If a later block in the +// the pivot after each successfully applied BAL. If a later block in the // gap fails to apply, the persisted state reflects the last successful block, // so a follow-up Sync can resume from there rather than reapplying everything. func TestCatchUpPersistsIncrementally(t *testing.T) { @@ -1776,7 +1831,7 @@ func testCatchUpPersistsIncrementally(t *testing.T, scheme string) { blocks[i] = balBlock{header: header, bal: buf.Bytes()} } - // First sync: complete sync to A so persisted state has previousPivot=A, + // First sync: complete sync to A so persisted state has pivot=A, // flat state covers all accounts. { var ( @@ -1826,22 +1881,22 @@ func testCatchUpPersistsIncrementally(t *testing.T, scheme string) { t.Fatal("expected Sync to fail when applyAccessList hits corrupt flat state") } - // Persisted previousPivot should now reflect the last successfully applied + // Persisted pivot should now reflect the last successfully applied // block (A+2). Without per-iteration saves, it would still be at A. loader := newSyncerV2(db, nodeScheme) loader.loadSyncStatus() - if loader.previousPivot == nil { - t.Fatal("expected persisted previousPivot to be set after partial catchUp") + if loader.pivot == nil { + t.Fatal("expected persisted pivot to be set after partial catchUp") } wantHash := blocks[1].header.Hash() - if loader.previousPivot.Hash() != wantHash { - t.Errorf("persisted previousPivot mismatch after partial catchUp: got %v, want %v (block A+2)", - loader.previousPivot.Hash(), wantHash) + if loader.pivot.Hash() != wantHash { + t.Errorf("persisted pivot mismatch after partial catchUp: got %v, want %v (block A+2)", + loader.pivot.Hash(), wantHash) } } // TestSyncStatusMarkedCompleteAfterCompletion verifies that after a full sync -// completes, the persisted sync status has Complete=true. This lets a +// completes, the persisted sync status reaches the complete phase. This lets a // subsequent Sync call distinguish "already done" from "fresh node" and skip. func TestSyncStatusMarkedCompleteAfterCompletion(t *testing.T) { t.Parallel() @@ -1870,13 +1925,13 @@ func testSyncStatusMarkedCompleteAfterCompletion(t *testing.T, scheme string) { } // After successful sync, persisted status should be present with - // Complete=true and the pivot we synced to. + // the complete phase and the pivot we synced to. loader := newSyncerV2(syncer.db, nodeScheme) loader.loadSyncStatus() - if !loader.complete { - t.Fatal("expected persisted status to have Complete=true after successful sync") + if loader.getPhase() != phaseComplete { + t.Fatal("expected persisted status to reach the complete phase after successful sync") } - if loader.previousPivot == nil || loader.previousPivot.Hash() != pivot.Hash() { + if loader.pivot == nil || loader.pivot.Hash() != pivot.Hash() { t.Fatalf("expected persisted pivot to match synced pivot") } } @@ -1906,7 +1961,7 @@ func TestSyncSkipsIfAlreadyComplete(t *testing.T) { t.Fatalf("first sync failed: %v", err) } - // Wipe the flat state. The persisted status (with Complete=true) stays. + // Wipe the flat state. The persisted status (in the complete phase) stays. if err := syncer.db.DeleteRange(rawdb.SnapshotAccountPrefix, []byte{rawdb.SnapshotAccountPrefix[0] + 1}); err != nil { t.Fatalf("failed to wipe account snapshot: %v", err) } @@ -1922,17 +1977,17 @@ func TestSyncSkipsIfAlreadyComplete(t *testing.T) { } } -// TestInterruptedRebuildRecovery verifies that if sync is interrupted after -// download completes but before trie rebuild finishes, the next Sync() call -// re-runs the download (which completes immediately) and rebuild. -func TestInterruptedRebuildRecovery(t *testing.T) { +// TestInterruptedGenerationRecovery verifies that if sync is interrupted after +// download completes but before trie generation finishes, the next Sync() call +// re-runs the download (which completes immediately) and generation. +func TestInterruptedGenerationRecovery(t *testing.T) { t.Parallel() nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, rawdb.HashScheme) root := sourceAccountTrie.Hash() // First run: complete download, save status, simulate interruption - // before rebuild by calling downloadState() directly + // before generation by calling downloadState() directly var ( once1 sync.Once cancel1 = make(chan struct{}) @@ -1946,9 +2001,8 @@ func TestInterruptedRebuildRecovery(t *testing.T) { syncer1.Register(src1) src1.remote = syncer1 pivot := mkPivot(0, root) - syncer1.pivot = pivot - syncer1.previousPivot = pivot // Sync sets this before downloadState syncer1.loadSyncStatus() + syncer1.pivot = pivot // Sync pins this before downloadState if err := syncer1.downloadState(cancel1); err != nil { t.Fatalf("download failed: %v", err) @@ -1960,11 +2014,11 @@ func TestInterruptedRebuildRecovery(t *testing.T) { syncer1.cleanAccountTasks() syncer1.saveSyncStatus() - // Status should exist (rebuild hasn't run yet) + // Status should exist (generation hasn't run yet) if rawdb.ReadSnapshotSyncStatus(db) == nil { t.Fatal("sync status should exist after download") } - // Second run: full Sync should detect tasks are done, run rebuild + // Second run: full Sync should detect tasks are done, run generation var ( once2 sync.Once cancel2 = make(chan struct{}) @@ -1980,11 +2034,16 @@ func TestInterruptedRebuildRecovery(t *testing.T) { if err := syncer2.Sync(mkPivot(0, root), cancel2); err != nil { t.Fatalf("resumed sync failed: %v", err) } - // After rebuild completes, status should be marked Complete=true. + // The resumed run re-arms the pivot freeze once its no-op download + // completes, the downloader relies on it until the pivot block commits. + if syncer2.FrozenPivot() == nil { + t.Fatal("pivot not frozen after resumed sync") + } + // After generation completes, status should reach the complete phase. loader := newSyncerV2(db, nodeScheme) loader.loadSyncStatus() - if !loader.complete { - t.Fatal("sync status should be marked Complete=true after rebuild completes") + if loader.getPhase() != phaseComplete { + t.Fatal("sync status should reach the complete phase after generation completes") } } @@ -2462,7 +2521,7 @@ func TestCatchUpRetriesOnBadBAL(t *testing.T) { // makeStorageTrieFromSlots builds a storage trie for owner from raw slot // key->value pairs, using the exact on-disk encoding the flat snapshot and the -// trie rebuild expect: each leaf is keyed by keccak256(slotKey) and its value is +// trie generation expect: each leaf is keyed by keccak256(slotKey) and its value is // rlp(TrimLeftZeroes(value)). Zero-valued slots are skipped (an unset slot has // no leaf). It returns the storage root, the dirty node set, and the sorted // snapshot leaves (which a test peer serves verbatim). @@ -2529,12 +2588,12 @@ func makeStateWithStorageContract(scheme string, plain []*kv, contractAddr commo // slot, an overwrite of an existing slot, a write of zero (deletion), and a // multi-tx write where the post-block value wins. // -// It fully syncs pivot A (flat-state download + trie rebuild), then moves the +// It fully syncs pivot A (flat-state download + trie generation), then moves the // pivot to A+1. The move triggers catchUp, which fetches the A+1 BAL, applies -// the storage diffs to the flat state, and rebuilds the trie. The rebuild +// the storage diffs to the flat state, and generates the trie. The generation // verifies the recomputed root against the pivot's expected post-catch-up root, // so a successful Sync proves the storage mutations were applied in the exact -// encoding the trie rebuild consumes. verifyTrie re-walks the result as an +// encoding the trie generation consumes. verifyTrie re-walks the result as an // independent confirmation. func TestCatchUpAppliesStorageBALs(t *testing.T) { t.Parallel() @@ -2620,7 +2679,7 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) { // Sync, so the follow-up Sync's reorg check sees A as still-canonical and // runs catchUp instead of resetting. The A+1 header carries the BAL hash // (verified during catch-up) and the expected post-catch-up state root - // (verified by the trie rebuild). + // (verified by the trie generation). db := rawdb.NewMemoryDatabase() numA := uint64(128) emptyH := common.Hash{} @@ -2644,7 +2703,7 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) { rawdb.WriteHeader(db, hdrB) rawdb.WriteCanonicalHash(db, hdrB.Hash(), numA+1) - // Sync 1: full flat-state download + trie rebuild against pivot A. + // Sync 1: full flat-state download + trie generation against pivot A. { var ( once sync.Once @@ -2665,7 +2724,7 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) { } close(done) } - // Sanity: the rebuilt trie for pivot A is complete and matches rootA. This + // Sanity: the generated trie for pivot A is complete and matches rootA. This // also confirms the test fixture itself is internally consistent. verifyTrie(scheme, db, rootA, t) @@ -2690,6 +2749,12 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) { if err := syncer.Sync(hdrB, cancel); err != nil { t.Fatalf("pivot A+1 catch-up sync failed: %v", err) } + // The freeze must re-arm on a pivot-moved cycle too, the downloader + // relies on it from download completion until commit, and it must + // point at the new pivot the catch-up rolled forward to. + if frozen := syncer.FrozenPivot(); frozen == nil || frozen.Hash() != hdrB.Hash() { + t.Fatal("pivot not frozen at the new header after catch-up sync") + } close(done) } diff --git a/triedb/generate.go b/triedb/generate.go index b6906f4c63..8d5a128aa1 100644 --- a/triedb/generate.go +++ b/triedb/generate.go @@ -337,7 +337,7 @@ func hashRanges(total int) [][2]common.Hash { return ranges } -// GenerateTrie rebuilds all tries (storage + account) from flat snapshot +// GenerateTrie builds all tries (storage + account) from flat snapshot // data in the database. The account hash space is partitioned into 16 // slices aligned with the first-nibble branching of the MPT root. Each // partition is processed by its own goroutine, which walks its slice, @@ -346,10 +346,8 @@ func hashRanges(total int) [][2]common.Hash { // trie. Once every partition has produced its subtree root, the top-level // branch is assembled and its hash verified against the expected root. // -// Resume: on entry, any partition that has a "done" marker from a -// previous run is skipped. Its subtree blob is read from the marker -// and handed to assembleRoot directly. On a mid-run crash, only the -// in-flight partition(s) are redone. +// Generation is all or nothing: an interrupted run leaves no resume +// state and the next run builds every partition from scratch. func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) (GenerateStats, error) { var ( start = time.Now() @@ -366,9 +364,8 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c go tickProgress(progressDone, start, &scanned, &updated, &progress) defer close(progressDone) - // For each partition, either skip (prior done marker found) or run - // it. Prior runs can leave the partition's raw root blob in the done - // marker. We recover it here so assembleRoot has everything it needs. + // Run every partition concurrently, each producing the subtree root + // blob that assembleRoot needs. var ( ranges = hashRanges(numPartitions) eg, ctx = errgroup.WithContext(context.Background()) @@ -376,11 +373,6 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c for i, r := range ranges { partition := byte(i) rangeStart, rangeEnd := r[0], r[1] - if blob, ok := rawdb.ReadGenerateTriePartitionDone(db, partition); ok { - partitionBlobs[partition] = blob - progress[partition].Store(partitionFinished) - continue - } eg.Go(func() error { start := time.Now() blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated, &deleted, &progress[partition]) @@ -391,11 +383,6 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c progress[partition].Store(partitionFinished) partitionBlobs[partition] = blob - - // Record completion only after the partition's batch has - // flushed inside generatePartition, so this marker appears - // on disk only when every write the partition did is durable. - rawdb.WriteGenerateTriePartitionDone(db, partition, blob) return nil }) } @@ -405,9 +392,8 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c return GenerateStats{}, err } - // Assemble the top-level root from the partition blobs, verify it - // matches the expected root, and clear all partition markers on - // success. + // Assemble the top-level root from the partition blobs and verify it + // matches the expected root. got, err := assembleRoot(db, scheme, partitionBlobs) if err != nil { return GenerateStats{}, fmt.Errorf("assemble root: %w", err) @@ -415,15 +401,6 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c if got != root { return GenerateStats{}, fmt.Errorf("state root mismatch: got %x, want %x", got, root) } - - // Clear the partition progress marker, ending the generation process. - batch := db.NewBatch() - for i := range numPartitions { - rawdb.DeleteGenerateTriePartitionDone(batch, byte(i)) - } - if err := batch.Write(); err != nil { - return GenerateStats{}, fmt.Errorf("clear partition markers: %w", err) - } log.Info("Generated state trie", "scanned", scanned.Load(), "updated", updated.Load(), "dangling-slots", deleted.Load(), "elapsed", common.PrettyDuration(time.Since(start))) return GenerateStats{ Scanned: scanned.Load(), diff --git a/triedb/generate_test.go b/triedb/generate_test.go index 2a7d98bfaf..bbbf6e14bc 100644 --- a/triedb/generate_test.go +++ b/triedb/generate_test.go @@ -370,97 +370,6 @@ func TestGenerateTrieOrphanStorage(t *testing.T) { } } -// TestGenerateTriePartialResume proves that the resume path actually -// fires when a partition's done marker is present. -func TestGenerateTriePartialResume(t *testing.T) { - // Build the account set. Empty storage keeps the test focused on the - // account-trie resume path. - const n = 200 - accounts := make([]testAccount, 0, n) - for i := 0; i < n; i++ { - addr := common.BytesToAddress([]byte{byte(i >> 8), byte(i)}) - hash := crypto.Keccak256Hash(addr[:]) - accounts = append(accounts, testAccount{ - hash: hash, - account: types.StateAccount{ - Nonce: uint64(i), - Balance: uint256.NewInt(uint64(i + 1)), - Root: types.EmptyRootHash, - CodeHash: types.EmptyCodeHash.Bytes(), - }, - }) - } - expectedRoot := buildExpectedRoot(t, accounts) - - for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} { - t.Run(scheme, func(t *testing.T) { - db := rawdb.NewMemoryDatabase() - - // Step 1: write the account snapshots for this run. - for _, a := range accounts { - rawdb.WriteAccountSnapshot(db, a.hash, types.SlimAccountRLP(a.account)) - } - - // Step 2: run every partition once to populate trie nodes on disk - // and capture each partition's raw root blob. - var ( - scanned atomic.Int64 - updated atomic.Int64 - deleted atomic.Int64 - ) - ranges := hashRanges(numPartitions) - blobs := make([][]byte, numPartitions) - for i, r := range ranges { - var pos atomic.Uint64 - blob, err := generatePartition(context.Background(), nil, db, scheme, byte(i), r[0], r[1], &scanned, &updated, &deleted, &pos) - if err != nil { - t.Fatalf("pre-run partition %d: %v", i, err) - } - blobs[i] = blob - } - - // Step 3: pre-seed done markers for even partitions only. - for i := 0; i < numPartitions; i++ { - if i%2 == 0 { - rawdb.WriteGenerateTriePartitionDone(db, byte(i), blobs[i]) - } - } - - // Step 4: delete flat-state account snapshots for every account that - // lives in an even partition. After this, rerunning generatePartition for - // an even partition would find no accounts and produce a nil blob, - // so a correct final root requires the resume path. - numDeleted := 0 - for _, a := range accounts { - if (a.hash[0]>>4)%2 == 0 { - rawdb.DeleteAccountSnapshot(db, a.hash) - numDeleted++ - } - } - if numDeleted == 0 { - t.Fatal("test setup failure: no accounts fell in even partitions") - } - - // Step 5: run GenerateTrie. Success implies resume actually consulted - // the markers. Without it, even partitions would yield nil blobs and - // the root check inside GenerateTrie would fail. - if _, err := GenerateTrie(db, scheme, expectedRoot, nil); err != nil { - t.Fatalf("partial-resume GenerateTrie failed: %v", err) - } - - // All markers cleared on success. - for i := 0; i < numPartitions; i++ { - if _, ok := rawdb.ReadGenerateTriePartitionDone(db, byte(i)); ok { - t.Errorf("partition %d marker not cleared after successful resume", i) - } - } - if scheme == rawdb.PathScheme { - assertCanonicalNodes(t, db, accounts) - } - }) - } -} - // TestHashRanges checks that hashRanges fully and contiguously covers the // 256-bit hash space, with the last range absorbing the rounding remainder. func TestHashRanges(t *testing.T) {