eth/downloader, eth/protocols/snap: freeze pivot once state is downloaded (#35155)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

This commit is contained in:
Jonny Rhea 2026-06-15 03:09:41 -05:00 committed by GitHub
parent 23483010a4
commit e2164cc78c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 317 additions and 311 deletions

View file

@ -209,48 +209,6 @@ func WriteSnapshotSyncStatus(db ethdb.KeyValueWriter, status []byte) {
} }
} }
// ReadGenerateTriePartitionDone returns the raw subtree root blob for a
// partition that has previously completed.
func ReadGenerateTriePartitionDone(db ethdb.KeyValueReader, partition byte) ([]byte, bool) {
data, err := db.Get(generateTriePartitionDoneKey(partition))
if err != nil {
return nil, false
}
if len(data) == 0 {
return nil, false
}
switch data[0] {
case 0x00:
// Partition is done and it is empty.
return nil, true
case 0x01:
// Partition is done and the blob follows.
return data[1:], true
default:
return nil, false
}
}
// WriteGenerateTriePartitionDone records a completed partition.
func WriteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte, blob []byte) {
var value []byte
if blob == nil {
value = []byte{0x00}
} else {
value = append([]byte{0x01}, blob...)
}
if err := db.Put(generateTriePartitionDoneKey(partition), value); err != nil {
log.Crit("Failed to store generate-trie done marker", "err", err)
}
}
// DeleteGenerateTriePartitionDone removes a partition's done marker.
func DeleteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte) {
if err := db.Delete(generateTriePartitionDoneKey(partition)); err != nil {
log.Crit("Failed to remove generate-trie done marker", "err", err)
}
}
// DeleteSnapshotSyncStatus removes the serialized sync status from the database. // DeleteSnapshotSyncStatus removes the serialized sync status from the database.
func DeleteSnapshotSyncStatus(db ethdb.KeyValueWriter) { func DeleteSnapshotSyncStatus(db ethdb.KeyValueWriter) {
if err := db.Delete(snapshotSyncStatusKey); err != nil { if err := db.Delete(snapshotSyncStatusKey); err != nil {

View file

@ -563,8 +563,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
} }
// Metadata keys // Metadata keys
case bytes.HasPrefix(key, generateTriePartitionDonePrefix) && len(key) == len(generateTriePartitionDonePrefix)+1:
metadata.add(size)
case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }): case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }):
metadata.add(size) metadata.add(size)

View file

@ -104,10 +104,6 @@ var (
// snapSyncStatusFlagKey flags that status of snap sync. // snapSyncStatusFlagKey flags that status of snap sync.
snapSyncStatusFlagKey = []byte("SnapSyncStatus") snapSyncStatusFlagKey = []byte("SnapSyncStatus")
// generateTriePartitionDonePrefix stores the subtree root hash of each
// triedb.GenerateTrie partition once it finishes.
generateTriePartitionDonePrefix = []byte("gtd") // generateTriePartitionDonePrefix + partition byte -> subtree root hash
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td (deprecated) headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td (deprecated)
@ -469,8 +465,3 @@ func trienodeHistoryIndexBlockKey(addressHash common.Hash, path []byte, blockID
func transitionStateKey(hash common.Hash) []byte { func transitionStateKey(hash common.Hash) []byte {
return append(VerkleTransitionStatePrefix, hash.Bytes()...) return append(VerkleTransitionStatePrefix, hash.Bytes()...)
} }
// generateTriePartitionDoneKey = generateTriePartitionDonePrefix + partition (single byte).
func generateTriePartitionDoneKey(partition byte) []byte {
return append(generateTriePartitionDonePrefix, partition)
}

View file

@ -297,9 +297,11 @@ func (d *Downloader) fetchHeaders(from uint64) error {
return err return err
} }
// If the pivot became stale (older than 2*64-8 (bit of wiggle room)), // If the pivot became stale (older than 2*64-8 (bit of wiggle room)),
// move it ahead to HEAD-64 // move it ahead to HEAD-64.
//
// The state syncer is consulted first before the pivot movement.
d.pivotLock.Lock() d.pivotLock.Lock()
if d.pivotHeader != nil { if d.pivotHeader != nil && d.snapSyncer.FrozenPivot() == nil {
if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 { if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 {
// Retrieve the next pivot header, either from skeleton chain // Retrieve the next pivot header, either from skeleton chain
// or the filled chain // or the filled chain

View file

@ -496,6 +496,18 @@ func (d *Downloader) syncToHead() (err error) {
if mode == ethconfig.SnapSync && pivot == nil { if mode == ethconfig.SnapSync && pivot == nil {
pivot = d.blockchain.CurrentBlock() pivot = d.blockchain.CurrentBlock()
} }
// If the snap syncer froze its pivot in a previous cycle, resume against
// the frozen header instead of a fresh one.
if mode == ethconfig.SnapSync && pivot != nil {
if frozen := d.snapSyncer.FrozenPivot(); frozen != nil {
if rawdb.ReadCanonicalHash(d.stateDB, frozen.Number.Uint64()) == frozen.Hash() {
log.Info("Resuming snap sync against frozen pivot", "number", frozen.Number, "hash", frozen.Hash())
pivot = frozen
} else {
log.Warn("Frozen pivot is no longer canonical", "number", frozen.Number, "hash", frozen.Hash())
}
}
}
height := latest.Number.Uint64() height := latest.Number.Uint64()
// In beacon mode, use the skeleton chain for the ancestor lookup // In beacon mode, use the skeleton chain for the ancestor lookup
@ -921,7 +933,9 @@ func (d *Downloader) processSnapSyncContent() error {
// the results in the meantime. // the results in the meantime.
// //
// Note, there's no issue with memory piling up since after 64 blocks the // Note, there's no issue with memory piling up since after 64 blocks the
// pivot will forcefully move so these accumulators will be dropped. // pivot will forcefully move so these accumulators will be dropped. The
// exception is snap/2 trie generation, where the pivot is frozen on
// purpose and results accumulate until the generation finishes.
var ( var (
oldPivot *fetchResult // Locked in pivot block, might change eventually oldPivot *fetchResult // Locked in pivot block, might change eventually
oldTail []*fetchResult // Downloaded content after the pivot oldTail []*fetchResult // Downloaded content after the pivot
@ -978,11 +992,15 @@ func (d *Downloader) processSnapSyncContent() error {
return err return err
} }
if P != nil { if P != nil {
// If new pivot block found, cancel old state retrieval and restart // If new pivot block found, cancel old state retrieval and restart.
if oldPivot != P { if oldPivot != P {
sync.Cancel() // Skip the restart if the running sync already targets the
sync = d.syncState(P.Header) // pivot's root (e.g, no pivot block movement yet).
go closeOnErr(sync) if sync.pivot.Root != P.Header.Root {
sync.Cancel()
sync = d.syncState(P.Header)
go closeOnErr(sync)
}
oldPivot = P oldPivot = P
} }
// Wait for completion, occasionally checking for pivot staleness // Wait for completion, occasionally checking for pivot staleness

View file

@ -93,7 +93,7 @@ func (s *syncerV2) isStorageFetched(accountHash, storageHash common.Hash) bool {
// applyAccessList applies a single block's access list diffs to the flat state // applyAccessList applies a single block's access list diffs to the flat state
// in the database. For each account, it applies the post-block values (highest // in the database. For each account, it applies the post-block values (highest
// TxIdx entry) for balance, nonce, code, and storage. The storageRoot field is // TxIdx entry) for balance, nonce, code, and storage. The storageRoot field is
// intentionally left stale. It will be recomputed during the trie rebuild. // intentionally left stale. It will be recomputed during the trie generation.
func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) error { func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) error {
// Iterate over all accounts in the access list // Iterate over all accounts in the access list
for _, access := range *b { for _, access := range *b {
@ -113,7 +113,7 @@ func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) er
rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash) rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
} else { } else {
// Store the slot in the same encoding the snapshot and the // Store the slot in the same encoding the snapshot and the
// trie rebuild use: RLP of the minimal big-endian value // trie generation use: RLP of the minimal big-endian value
// (leading zeros trimmed), matching core/state's snapshot // (leading zeros trimmed), matching core/state's snapshot
// writes. // writes.
blob, _ := rlp.EncodeToBytes(value.Bytes()) blob, _ := rlp.EncodeToBytes(value.Bytes())
@ -176,7 +176,7 @@ func (s *syncerV2) applyAccessList(b *bal.BlockAccessList, batch ethdb.Batch) er
case isEmpty && !isNew: case isEmpty && !isNew:
// Existing account got fully drained (e.g., pre-funded // Existing account got fully drained (e.g., pre-funded
// address that gets deployed to with init code that // address that gets deployed to with init code that
// self-destructs). Delete the entry so the trie rebuild // self-destructs). Delete the entry so the trie generation
// doesn't pick it up as an empty leaf. // doesn't pick it up as an empty leaf.
rawdb.DeleteAccountSnapshot(batch, accountHash) rawdb.DeleteAccountSnapshot(batch, accountHash)
default: default:

View file

@ -157,7 +157,7 @@ func TestAccessListApplication(t *testing.T) {
// Verify storage updated. Slots are stored in the canonical snapshot // Verify storage updated. Slots are stored in the canonical snapshot
// encoding (RLP of the value with leading zeros trimmed), the same form // encoding (RLP of the value with leading zeros trimmed), the same form
// the download path writes and the trie rebuild consumes. // the download path writes and the trie generation consumes.
storageVal := rawdb.ReadStorageSnapshot(db, accountHash, slotHash) storageVal := rawdb.ReadStorageSnapshot(db, accountHash, slotHash)
wantStorage, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(common.HexToHash("0x02").Bytes())) wantStorage, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(common.HexToHash("0x02").Bytes()))
if !bytes.Equal(storageVal, wantStorage) { if !bytes.Equal(storageVal, wantStorage) {

View file

@ -84,8 +84,10 @@ type Backend interface {
// otherwise only the default (snap/1) versions are offered on the wire. // otherwise only the default (snap/1) versions are offered on the wire.
func MakeProtocols(backend Backend, snapV2 bool) []p2p.Protocol { func MakeProtocols(backend Backend, snapV2 bool) []p2p.Protocol {
versions := ProtocolVersions versions := ProtocolVersions
if snapV2 { if !snapV2 {
versions = append([]uint{SNAP2}, versions...) // snap/2 is not safe to advertise unconditionally yet, so it is gated
// behind a feature flag.
versions = []uint{SNAP1}
} }
protocols := make([]p2p.Protocol, len(versions)) protocols := make([]p2p.Protocol, len(versions))
for i, version := range versions { for i, version := range versions {

View file

@ -186,8 +186,8 @@ func TestSyncProgressV1Discarded(t *testing.T) {
syncer := newSyncerV2(db, rawdb.HashScheme) syncer := newSyncerV2(db, rawdb.HashScheme)
syncer.loadSyncStatus() syncer.loadSyncStatus()
if syncer.previousPivot != nil { if syncer.pivot != nil {
t.Fatalf("expected previousPivot nil after discarding old format, got %+v", syncer.previousPivot) t.Fatalf("expected pivot nil after discarding old format, got %+v", syncer.pivot)
} }
if len(syncer.tasks) != accountConcurrency { if len(syncer.tasks) != accountConcurrency {
t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks)) t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks))
@ -258,8 +258,8 @@ func TestSyncProgressCorruptPayload(t *testing.T) {
syncer := newSyncerV2(db, rawdb.HashScheme) syncer := newSyncerV2(db, rawdb.HashScheme)
syncer.loadSyncStatus() syncer.loadSyncStatus()
if syncer.previousPivot != nil { if syncer.pivot != nil {
t.Fatalf("expected previousPivot nil after corrupt payload, got %+v", syncer.previousPivot) t.Fatalf("expected pivot nil after corrupt payload, got %+v", syncer.pivot)
} }
if len(syncer.tasks) != accountConcurrency { if len(syncer.tasks) != accountConcurrency {
t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks)) t.Fatalf("expected fresh task split of %d, got %d", accountConcurrency, len(syncer.tasks))

View file

@ -35,11 +35,10 @@ const (
// devp2p capability negotiation. // devp2p capability negotiation.
const ProtocolName = "snap" const ProtocolName = "snap"
// ProtocolVersions are the supported versions of the `snap` protocol advertised // ProtocolVersions are all the `snap` protocol versions this node implements
// by default (first is primary). snap/2 is not safe to advertise unconditionally // (first is primary). What's actually advertised on the wire is decided by
// yet, so it is gated behind a feature flag and appended in MakeProtocols rather // MakeProtocols, which gates snap/2 behind a feature flag.
// than listed here. var ProtocolVersions = []uint{SNAP2, SNAP1}
var ProtocolVersions = []uint{SNAP1}
// protocolLengths are the number of implemented messages corresponding to // protocolLengths are the number of implemented messages corresponding to
// different protocol versions. snap/2 adds GetAccessLists/AccessLists (0x08/0x09). // different protocol versions. snap/2 adds GetAccessLists/AccessLists (0x08/0x09).

View file

@ -58,6 +58,10 @@ type Syncer interface {
OnTrieNodes(peer SyncPeerV2, id uint64, trienodes [][]byte) error OnTrieNodes(peer SyncPeerV2, id uint64, trienodes [][]byte) error
OnAccessLists(peer SyncPeerV2, id uint64, lists rlp.RawList[rlp.RawValue]) error OnAccessLists(peer SyncPeerV2, id uint64, lists rlp.RawList[rlp.RawValue]) error
// FrozenPivot returns the pivot header the syncer is bound to, or nil if
// the pivot may still be chosen and moved freely.
FrozenPivot() *types.Header
// Version is the snap protocol version this syncer implements. // Version is the snap protocol version this syncer implements.
Version() uint Version() uint
} }
@ -122,6 +126,11 @@ func (syncerV1Adapter) OnAccessLists(SyncPeerV2, uint64, rlp.RawList[rlp.RawValu
// Version is SNAP1 // Version is SNAP1
func (syncerV1Adapter) Version() uint { return SNAP1 } func (syncerV1Adapter) Version() uint { return SNAP1 }
// FrozenPivot is always nil for snap/1: the sync target must keep tracking
// the chain head, ensuring the state is available in the network, so the
// pivot is never frozen.
func (syncerV1Adapter) FrozenPivot() *types.Header { return nil }
// syncerV2Adapter adapts the snap/2 *syncerV2 to Syncer. Its peer-facing methods // syncerV2Adapter adapts the snap/2 *syncerV2 to Syncer. Its peer-facing methods
// already take SyncPeerV2 and its Sync already takes a header, so only Progress // already take SyncPeerV2 and its Sync already takes a header, so only Progress
// (different return type) and OnTrieNodes (absent) need wrapping. // (different return type) and OnTrieNodes (absent) need wrapping.

View file

@ -26,6 +26,7 @@ import (
"math/rand" "math/rand"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -261,13 +262,34 @@ type storageTaskV2 struct {
done bool // Flag whether the task can be removed done bool // Flag whether the task can be removed
} }
// syncPhase tracks how far a snap/2 sync has progressed for the journaled
// pivot. The phases are strictly ordered: each one implies all previous
// ones have finished.
type syncPhase uint8
const (
// phaseDownload covers the flat state (account, storage, bytecode)
// download. The requests target the pivot root, which remote peers
// only serve while it is recent, so the pivot must keep tracking the
// chain head (see FrozenPivot).
phaseDownload syncPhase = iota
// phaseGenerate covers the local trie generation after the download
// has completed. It targets the exact pivot root it was started with,
// so pivot updates are refused from here on.
phaseGenerate
// phaseComplete means the sync ran to completion for the pivot.
phaseComplete
)
// syncProgressV2 is a database entry to allow suspending and resuming a snapshot state // syncProgressV2 is a database entry to allow suspending and resuming a snapshot state
// sync. Opposed to full and fast sync, there is no way to restart a suspended // sync. Opposed to full and fast sync, there is no way to restart a suspended
// snap sync without prior knowledge of the suspension point. // snap sync without prior knowledge of the suspension point.
type syncProgressV2 struct { type syncProgressV2 struct {
Pivot *types.Header // Pivot header being synced (for pivot move and reorg detection) Pivot *types.Header // Pivot header being synced (for pivot move and reorg detection)
Tasks []*accountTaskV2 // The suspended account tasks (contract tasks within) Tasks []*accountTaskV2 // The suspended account tasks (contract tasks within)
Complete bool // True once sync ran to completion for Pivot Phase syncPhase // Phase is how far the sync has progressed for Pivot
// Status report during syncing phase // Status report during syncing phase
AccountSynced uint64 // Number of accounts downloaded AccountSynced uint64 // Number of accounts downloaded
@ -313,7 +335,7 @@ type SyncPeerV2 interface {
// syncerV2 is an Ethereum account and storage trie syncer based on the snap // syncerV2 is an Ethereum account and storage trie syncer based on the snap
// protocol. It downloads all accounts, storage slots, and bytecodes from // protocol. It downloads all accounts, storage slots, and bytecodes from
// remote peers as flat state, applies BAL diffs on pivot moves, // remote peers as flat state, applies BAL diffs on pivot moves,
// and triggers a final trie rebuild once flat state is consistent. // and triggers a final trie generation once flat state is consistent.
// //
// Every network request has a variety of failure events: // Every network request has a variety of failure events:
// - The peer disconnects after task assignment, failing to send the request // - The peer disconnects after task assignment, failing to send the request
@ -322,14 +344,12 @@ type SyncPeerV2 interface {
// - The peer delivers a stale response after a previous timeout // - The peer delivers a stale response after a previous timeout
// - The peer delivers a refusal to serve the requested state // - The peer delivers a refusal to serve the requested state
type syncerV2 struct { type syncerV2 struct {
db ethdb.Database // Database to store the trie nodes into (and dedup) db ethdb.Database // Database to store the trie nodes into (and dedup)
scheme string // Node scheme used in node database scheme string // Node scheme used in node database
pivot *types.Header // Current pivot header being synced (lock needed)
pivot *types.Header // Current pivot header being synced (lock needed) phase atomic.Uint32 // Current syncPhase; atomic so phase transitions are visible across goroutines
previousPivot *types.Header // Pivot from previous sync run (for pivot move detection) tasks []*accountTaskV2 // Current account task set being synced
complete bool // Whether the persisted progress was a completed sync update chan struct{} // Notification channel for possible sync progression
tasks []*accountTaskV2 // Current account task set being synced
update chan struct{} // Notification channel for possible sync progression
peers map[string]SyncPeerV2 // Currently active peers to download from peers map[string]SyncPeerV2 // Currently active peers to download from
peerJoin *event.Feed // Event feed to react to peers joining peerJoin *event.Feed // Event feed to react to peers joining
@ -370,7 +390,7 @@ type syncerV2 struct {
// newSyncerV2 creates a new snapshot syncer to download the Ethereum state over the // newSyncerV2 creates a new snapshot syncer to download the Ethereum state over the
// snap protocol. // snap protocol.
func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 { func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 {
return &syncerV2{ s := &syncerV2{
db: db, db: db,
scheme: scheme, scheme: scheme,
@ -393,6 +413,24 @@ func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 {
extProgress: new(syncProgressV2), extProgress: new(syncProgressV2),
} }
if raw := rawdb.ReadSnapshotSyncStatus(db); len(raw) > 0 && raw[0] == syncProgressVersion {
var progress syncProgressV2
if err := json.Unmarshal(raw[1:], &progress); err == nil {
s.pivot = progress.Pivot
s.setPhase(progress.Phase)
}
}
return s
}
// getPhase returns the current sync phase.
func (s *syncerV2) getPhase() syncPhase {
return syncPhase(s.phase.Load())
}
// setPhase moves the sync to the given phase.
func (s *syncerV2) setPhase(phase syncPhase) {
s.phase.Store(uint32(phase))
} }
// Register injects a new data source into the syncer's peerset. // Register injects a new data source into the syncer's peerset.
@ -452,19 +490,17 @@ func (s *syncerV2) Unregister(id string) error {
// Sync starts (or resumes a previous) sync cycle to iterate over a state trie // Sync starts (or resumes a previous) sync cycle to iterate over a state trie
// with the given pivot header and reconstruct the nodes based on the snapshot // with the given pivot header and reconstruct the nodes based on the snapshot
// leaves. // leaves.
func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error { func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error {
if pivot == nil { if target == nil {
return errors.New("snap sync: pivot header is nil") return errors.New("snap sync: pivot header is nil")
} }
s.lock.Lock() s.lock.Lock()
s.pivot = pivot
s.previousPivot = nil // loadSyncStatus overwrites when resuming from persisted progress
s.statelessPeers = make(map[string]struct{}) s.statelessPeers = make(map[string]struct{})
s.lock.Unlock() s.lock.Unlock()
if s.startTime.IsZero() { if s.startTime.IsZero() {
s.startTime = time.Now() s.startTime = time.Now()
} }
root := pivot.Root root := target.Root
// Retrieve the previous sync status from DB. If there's no persisted // Retrieve the previous sync status from DB. If there's no persisted
// status, sync is either fresh or already complete. // status, sync is either fresh or already complete.
@ -473,20 +509,24 @@ func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error {
// isPivotChanged is true when we have prior progress against a different // isPivotChanged is true when we have prior progress against a different
// pivot. That means we need to roll forward via catchUp, or wipe and // pivot. That means we need to roll forward via catchUp, or wipe and
// restart if the prior pivot was reorged out. // restart if the prior pivot was reorged out.
isPivotChanged := s.previousPivot != nil && s.previousPivot.Hash() != s.pivot.Hash() s.lock.RLock()
prevPivot := s.pivot
s.lock.RUnlock()
isPivotChanged := prevPivot != nil && prevPivot.Hash() != target.Hash()
// Skip if we've already finished syncing this pivot. // Skip if we've already finished syncing this pivot.
if !isPivotChanged && s.complete { if !isPivotChanged && s.getPhase() == phaseComplete {
log.Info("Snap sync already complete for this pivot", "root", root) log.Info("Snap sync already complete for this pivot", "root", root)
return nil return nil
} }
// We're committing to running this sync. Clear the complete flag so a // We're committing to running this sync. Demote a completed phase so a
// mid-run save (on cancel or error) doesn't persist a stale Complete=true // mid-run save (on cancel or error) doesn't persist a stale complete
// status from a prior pivot. // status from a prior pivot. The download remains done, only the trie
s.lock.Lock() // generation must be redone against the new pivot.
s.complete = false if s.getPhase() == phaseComplete {
s.lock.Unlock() s.setPhase(phaseGenerate)
}
defer func() { defer func() {
// Whether sync completed or not, disregard any future packets // Whether sync completed or not, disregard any future packets
@ -515,24 +555,25 @@ func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error {
// progress is still usable. If yes, roll forward via BAL catch-up. If not, // progress is still usable. If yes, roll forward via BAL catch-up. If not,
// wipe everything and restart fresh. // wipe everything and restart fresh.
if isPivotChanged { if isPivotChanged {
if isPivotReorged(s.db, s.previousPivot, s.pivot) { if isPivotReorged(s.db, prevPivot, target) {
log.Warn("Persisted progress unusable, restarting snap sync from scratch", log.Warn("Restarting snap sync from scratch", "oldnumber", prevPivot.Number, "oldHash", prevPivot.Hash())
"number", s.previousPivot.Number, "oldHash", s.previousPivot.Hash())
s.resetSyncState() s.resetSyncState()
} else if err := s.catchUp(cancel); err != nil { } else {
return err // A canonical pivot move past a frozen pivot should be impossible:
// the downloader both refuses moves (FrozenPivot) and resumes new
// cycles against the frozen header itself. Reaching this branch
// frozen indicates a bug on the downloader side; roll the flat
// state forward defensively and regenerate.
if s.getPhase() >= phaseGenerate {
log.Warn("Frozen pivot moved unexpectedly, rolling forward", "frozen", prevPivot.Number, "new", target.Number)
}
if err := s.catchUp(target, cancel); err != nil {
return err
}
} }
} }
// Pin previousPivot to the current pivot before downloadState runs.
// This is what saveSyncStatus persists. If the download is interrupted
// and the next Sync gets a different pivot, this is how isPivotReorged
// recognizes the partial flat state belongs to the old pivot. Without
// it, isPivotReorged sees nil, skips the reorg branch, and downloadState
// would resume from the persisted task markers but mix the old pivot's
// already-downloaded accounts with the new pivot's data.
s.lock.Lock() s.lock.Lock()
s.previousPivot = s.pivot s.pivot = target
s.lock.Unlock() s.lock.Unlock()
log.Info("Starting state download", "root", root) log.Info("Starting state download", "root", root)
@ -541,21 +582,47 @@ func (s *syncerV2) Sync(pivot *types.Header, cancel chan struct{}) error {
} }
log.Info("State download complete", "root", root) log.Info("State download complete", "root", root)
// Entering the generation phase makes the downloader stop moving the
// pivot (see FrozenPivot) until the pivot block is committed. The phase
// is persisted right away so the freeze also holds across a restart,
// before the generation has had a chance to finish.
if s.getPhase() < phaseGenerate {
s.setPhase(phaseGenerate)
s.saveSyncStatus()
}
log.Info("Starting trie generation", "root", root) log.Info("Starting trie generation", "root", root)
batch := s.db.NewBatch()
s.resetTrienodes(batch)
if err := batch.Write(); err != nil {
return err
}
if _, err := triedb.GenerateTrie(s.db, s.scheme, root, cancel); err != nil { if _, err := triedb.GenerateTrie(s.db, s.scheme, root, cancel); err != nil {
return err return err
} }
log.Info("Trie generation complete", "root", root) log.Info("Trie generation complete", "root", root)
// Mark sync complete. The deferred saveSyncStatus persists this with // Mark sync complete. The deferred saveSyncStatus persists this so a
// Complete=true so a follow-up Sync call for the same pivot can skip // follow-up Sync call for the same pivot can skip the work entirely.
// the work entirely. s.setPhase(phaseComplete)
s.lock.Lock()
s.complete = true
s.lock.Unlock()
return nil return nil
} }
// FrozenPivot returns the pivot header the sync is bound to, or nil while
// the pivot may still move freely. The pivot freezes once the state
// download completes. The remaining work (trie generation) and the pivot
// commit is purely local and targets the exact pivot root the download
// finished with, so from that point on the downloader must neither move the
// pivot nor start a new cycle against a different one.
func (s *syncerV2) FrozenPivot() *types.Header {
if s.getPhase() < phaseGenerate {
return nil
}
s.lock.RLock()
defer s.lock.RUnlock()
return s.pivot
}
// download runs the bulk flat-state download. It fetches // download runs the bulk flat-state download. It fetches
// account ranges, storage slots, and bytecodes, writing flat state to disk. // account ranges, storage slots, and bytecodes, writing flat state to disk.
func (s *syncerV2) downloadState(cancel chan struct{}) error { func (s *syncerV2) downloadState(cancel chan struct{}) error {
@ -660,10 +727,10 @@ func isPivotReorged(db ethdb.Database, prev, curr *types.Header) bool {
// catchUp runs the BAL catch-up. When the pivot has moved, it fetches BALs // catchUp runs the BAL catch-up. When the pivot has moved, it fetches BALs
// for the gap blocks, verifies them against block headers, and applies the // for the gap blocks, verifies them against block headers, and applies the
// diffs to roll flat state forward. // diffs to roll flat state forward.
func (s *syncerV2) catchUp(cancel chan struct{}) error { func (s *syncerV2) catchUp(target *types.Header, cancel chan struct{}) error {
s.lock.RLock() s.lock.RLock()
from := s.previousPivot.Number.Uint64() + 1 from := s.pivot.Number.Uint64() + 1
to := s.pivot.Number.Uint64() to := target.Number.Uint64()
s.lock.RUnlock() s.lock.RUnlock()
log.Info("Starting BAL catch-up", "from", from, "to", to, "blocks", to-from+1) log.Info("Starting BAL catch-up", "from", from, "to", to, "blocks", to-from+1)
@ -720,7 +787,7 @@ func (s *syncerV2) catchUp(cancel chan struct{}) error {
// Persist incremental progress so a crash mid-catchUp can resume // Persist incremental progress so a crash mid-catchUp can resume
// from the next unapplied block. // from the next unapplied block.
s.lock.Lock() s.lock.Lock()
s.previousPivot = headers[hash] s.pivot = headers[hash]
s.lock.Unlock() s.lock.Unlock()
s.saveSyncStatusWithDB(batch) s.saveSyncStatusWithDB(batch)
@ -952,8 +1019,8 @@ func (s *syncerV2) loadSyncStatus() {
} }
task.StorageCompleted = nil task.StorageCompleted = nil
} }
s.previousPivot = progress.Pivot s.pivot = progress.Pivot
s.complete = progress.Complete s.setPhase(progress.Phase)
s.accountSynced = progress.AccountSynced s.accountSynced = progress.AccountSynced
s.accountBytes = progress.AccountBytes s.accountBytes = progress.AccountBytes
s.bytecodeSynced = progress.BytecodeSynced s.bytecodeSynced = progress.BytecodeSynced
@ -1005,6 +1072,16 @@ func deleteRange(batch ethdb.Batch, prefix []byte) {
} }
} }
// resetTrienodes wipes all persisted trienodes if the path scheme is used.
// It's a defensive operation, ensuring all the leftover trie nodes are cleared
// before the new generation cycle.
func (s *syncerV2) resetTrienodes(batch ethdb.Batch) {
if s.scheme == rawdb.PathScheme {
deleteRange(batch, rawdb.TrieNodeAccountPrefix)
deleteRange(batch, rawdb.TrieNodeStoragePrefix)
}
}
// resetSyncState wipes all persisted snap-sync data (sync status, account // resetSyncState wipes all persisted snap-sync data (sync status, account
// and storage snapshots) and re-initializes in-memory state with a fresh // and storage snapshots) and re-initializes in-memory state with a fresh
// chunking of the account hash range. // chunking of the account hash range.
@ -1013,14 +1090,15 @@ func (s *syncerV2) resetSyncState() {
rawdb.DeleteSnapshotSyncStatus(batch) rawdb.DeleteSnapshotSyncStatus(batch)
deleteRange(batch, rawdb.SnapshotAccountPrefix) deleteRange(batch, rawdb.SnapshotAccountPrefix)
deleteRange(batch, rawdb.SnapshotStoragePrefix) deleteRange(batch, rawdb.SnapshotStoragePrefix)
s.resetTrienodes(batch)
batch.Write() batch.Write()
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.tasks = nil s.tasks = nil
s.previousPivot = nil s.pivot = nil
s.complete = false s.setPhase(phaseDownload)
s.accountSynced, s.accountBytes = 0, 0 s.accountSynced, s.accountBytes = 0, 0
s.bytecodeSynced, s.bytecodeBytes = 0, 0 s.bytecodeSynced, s.bytecodeBytes = 0, 0
s.storageSynced, s.storageBytes = 0, 0 s.storageSynced, s.storageBytes = 0, 0
@ -1069,9 +1147,9 @@ func (s *syncerV2) saveSyncStatusWithDB(db ethdb.KeyValueWriter) {
} }
// Store the actual progress markers. // Store the actual progress markers.
progress := &syncProgressV2{ progress := &syncProgressV2{
Pivot: s.previousPivot, Pivot: s.pivot,
Tasks: s.tasks, Tasks: s.tasks,
Complete: s.complete, Phase: s.getPhase(),
AccountSynced: s.accountSynced, AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes, AccountBytes: s.accountBytes,
BytecodeSynced: s.bytecodeSynced, BytecodeSynced: s.bytecodeSynced,
@ -2028,7 +2106,7 @@ func (s *syncerV2) forwardAccountTask(task *accountTaskV2) {
// Persist the received account segments. These flat state maybe // Persist the received account segments. These flat state maybe
// outdated during the sync, but it can be fixed later during the // outdated during the sync, but it can be fixed later during the
// trie rebuild. // trie generation.
oldAccountBytes := s.accountBytes oldAccountBytes := s.accountBytes
batch := ethdb.HookedBatch{ batch := ethdb.HookedBatch{

View file

@ -547,6 +547,65 @@ func testSyncV2(t *testing.T, scheme string) {
verifyAdoptedSyncedState(scheme, syncer.db, sourceAccountTrie.Hash(), elems, t) verifyAdoptedSyncedState(scheme, syncer.db, sourceAccountTrie.Hash(), elems, t)
} }
// TestSyncV2FrozenPivot checks the pivot freeze signal around the sync
// lifecycle. The pivot is unfrozen while flat state is downloading, frozen
// once the download completes, stays frozen after the sync returns so the
// downloader resumes against it until the pivot block is committed, and
// unfreezes again after a state reset.
func TestSyncV2FrozenPivot(t *testing.T) {
t.Parallel()
testSyncV2FrozenPivot(t, rawdb.HashScheme)
testSyncV2FrozenPivot(t, rawdb.PathScheme)
}
func testSyncV2FrozenPivot(t *testing.T, scheme string) {
var (
once sync.Once
cancel = make(chan struct{})
term = func() { once.Do(func() { close(cancel) }) }
)
nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme)
source := newTestPeerV2("source", t, term)
source.accountTrie = sourceAccountTrie.Copy()
source.accountValues = elems
syncer := setupSyncerV2(nodeScheme, source)
pivot := mkPivot(0, sourceAccountTrie.Hash())
// The handler runs while account ranges are still being served, so it
// can observe the signal mid download.
source.accountRequestV2Handler = func(p *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
if syncer.FrozenPivot() != nil {
t.Error("pivot frozen during flat state download")
}
return defaultAccountRequestHandlerV2(p, requestId, root, origin, limit, cap)
}
if syncer.FrozenPivot() != nil {
t.Fatal("pivot frozen before sync started")
}
if err := syncer.Sync(pivot, cancel); err != nil {
t.Fatalf("sync failed: %v", err)
}
if frozen := syncer.FrozenPivot(); frozen == nil || frozen.Hash() != pivot.Hash() {
t.Fatal("pivot not frozen at the synced header after download completed")
}
// A restart must not lose the freeze: a fresh syncer instance on the same
// database derives it from the persisted journal, before any Sync call.
restarted := newSyncerV2(syncer.db, nodeScheme)
if frozen := restarted.FrozenPivot(); frozen == nil || frozen.Hash() != pivot.Hash() {
t.Fatal("pivot freeze lost after restart")
}
syncer.resetSyncState()
if syncer.FrozenPivot() != nil {
t.Fatal("pivot still frozen after state reset")
}
// The reset deletes the journal, so a restarted instance is unfrozen too.
if restarted := newSyncerV2(syncer.db, nodeScheme); restarted.FrozenPivot() != nil {
t.Fatal("pivot still frozen after restart following a state reset")
}
}
// verifyAdoptedSyncedState exercises the snap/2 completion contract end-to-end: // verifyAdoptedSyncedState exercises the snap/2 completion contract end-to-end:
// after a real sync, opening a fresh triedb and calling AdoptSyncedState must // after a real sync, opening a fresh triedb and calling AdoptSyncedState must
// (a) succeed and (b) leave flat-state reads serving immediately, with no // (a) succeed and (b) leave flat-state reads serving immediately, with no
@ -1326,9 +1385,9 @@ func TestIsPivotReorged(t *testing.T) {
// canonical header at block 100 has a different hash. Sync is then called with // canonical header at block 100 has a different hash. Sync is then called with
// a new pivot at the same height. // a new pivot at the same height.
// //
// If isPivotReorged works, loadSyncStatus restores previousPivot, the check // If isPivotReorged works, loadSyncStatus restores the persisted pivot, the
// flags it as reorged, resetSyncState clears previousPivot, catchUp is // check flags it as reorged, resetSyncState clears it, catchUp is skipped,
// skipped, and the fresh download proceeds to completion. // and the fresh download proceeds to completion.
// //
// If detection doesn't fire, the pivot-move check would call catchUp with // If detection doesn't fire, the pivot-move check would call catchUp with
// from = 101 and to = 100 — the inverted-range guard surfaces that as an // from = 101 and to = 100 — the inverted-range guard surfaces that as an
@ -1347,10 +1406,8 @@ func TestSyncDetectsPivotReorged(t *testing.T) {
// and non-zero counter so the reset path has something to clean up. // and non-zero counter so the reset path has something to clean up.
orphanPivot := mkPivot(100, common.HexToHash("0xdead")) orphanPivot := mkPivot(100, common.HexToHash("0xdead"))
seed := newSyncerV2(db, nodeScheme) seed := newSyncerV2(db, nodeScheme)
// previousPivot reflects where flat state matches and it is what // pivot reflects where flat state matches and it is what saveSyncStatus
// saveSyncStatus persists. Set it to simulate a prior sync reaching // persists. Set it to simulate a prior sync reaching orphanPivot.
// orphanPivot.
seed.previousPivot = orphanPivot
seed.pivot = orphanPivot seed.pivot = orphanPivot
seed.accountSynced = 42 seed.accountSynced = 42
seed.tasks = []*accountTaskV2{{ seed.tasks = []*accountTaskV2{{
@ -1391,14 +1448,14 @@ func TestSyncDetectsPivotReorged(t *testing.T) {
if err := syncer.Sync(newPivot, cancel); err != nil { if err := syncer.Sync(newPivot, cancel); err != nil {
t.Fatalf("sync failed (reorg detection likely broken): %v", err) t.Fatalf("sync failed (reorg detection likely broken): %v", err)
} }
// After successful completion, status should be marked Complete=true // After successful completion, status should reach the complete phase
// against the new (canonical) pivot. // against the new (canonical) pivot.
loader := newSyncerV2(db, nodeScheme) loader := newSyncerV2(db, nodeScheme)
loader.loadSyncStatus() loader.loadSyncStatus()
if !loader.complete { if loader.getPhase() != phaseComplete {
t.Fatal("sync status should be marked Complete=true after successful completion") t.Fatal("sync status should reach the complete phase after successful completion")
} }
if loader.previousPivot == nil || loader.previousPivot.Hash() != newPivot.Hash() { if loader.pivot == nil || loader.pivot.Hash() != newPivot.Hash() {
t.Fatalf("expected persisted pivot to match new pivot") t.Fatalf("expected persisted pivot to match new pivot")
} }
if data := rawdb.ReadAccountSnapshot(db, orphanAccountHash); len(data) != 0 { if data := rawdb.ReadAccountSnapshot(db, orphanAccountHash); len(data) != 0 {
@ -1445,9 +1502,8 @@ func testInterruptedDownloadRecovery(t *testing.T, scheme string) {
syncer1.Register(src1) syncer1.Register(src1)
src1.remote = syncer1 src1.remote = syncer1
pivot := mkPivot(0, root) pivot := mkPivot(0, root)
syncer1.pivot = pivot
syncer1.previousPivot = pivot // Sync sets this before downloadState
syncer1.loadSyncStatus() syncer1.loadSyncStatus()
syncer1.pivot = pivot // Sync pins this before downloadState
syncer1.downloadState(cancel1) syncer1.downloadState(cancel1)
// Save progress // Save progress
@ -1483,9 +1539,8 @@ func testInterruptedDownloadRecovery(t *testing.T, scheme string) {
syncer2.Register(src2) syncer2.Register(src2)
src2.remote = syncer2 src2.remote = syncer2
pivot2 := mkPivot(0, root) pivot2 := mkPivot(0, root)
syncer2.pivot = pivot2
syncer2.previousPivot = pivot2 // Sync sets this before downloadState
syncer2.loadSyncStatus() syncer2.loadSyncStatus()
syncer2.pivot = pivot2 // Sync pins this before downloadState
if err := syncer2.downloadState(cancel2); err != nil { if err := syncer2.downloadState(cancel2); err != nil {
t.Fatalf("resumed download failed: %v", err) t.Fatalf("resumed download failed: %v", err)
} }
@ -1499,10 +1554,10 @@ func testInterruptedDownloadRecovery(t *testing.T, scheme string) {
} }
// TestSyncPersistsPivotDuringDownload verifies that after a fresh Sync is // TestSyncPersistsPivotDuringDownload verifies that after a fresh Sync is
// interrupted mid-download, the persisted previousPivot equals the current // interrupted mid-download, the persisted pivot equals the current pivot
// pivot (not nil). Without this, a follow-up Sync at a different pivot // (not nil). Without this, a follow-up Sync at a different pivot would not
// would not see that the partial flat state belongs to the old pivot, and // see that the partial flat state belongs to the old pivot, and would mix
// would mix old-pivot accounts with new-pivot data. // old-pivot accounts with new-pivot data.
func TestSyncPersistsPivotDuringDownload(t *testing.T) { func TestSyncPersistsPivotDuringDownload(t *testing.T) {
t.Parallel() t.Parallel()
nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, rawdb.HashScheme) nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, rawdb.HashScheme)
@ -1532,15 +1587,15 @@ func TestSyncPersistsPivotDuringDownload(t *testing.T) {
// Sync should be interrupted by the cancel after a couple of responses. // Sync should be interrupted by the cancel after a couple of responses.
_ = syncer.Sync(pivot, cancel) _ = syncer.Sync(pivot, cancel)
// Persisted previousPivot must equal the pivot, so a follow-up Sync at a // Persisted pivot must equal the pivot, so a follow-up Sync at a different
// different pivot can recognize the partial flat state belongs to this one. // pivot can recognize the partial flat state belongs to this one.
loader := newSyncerV2(db, nodeScheme) loader := newSyncerV2(db, nodeScheme)
loader.loadSyncStatus() loader.loadSyncStatus()
if loader.previousPivot == nil { if loader.pivot == nil {
t.Fatal("expected persisted previousPivot to be set after interrupted download, got nil") t.Fatal("expected persisted pivot to be set after interrupted download, got nil")
} }
if loader.previousPivot.Hash() != pivot.Hash() { if loader.pivot.Hash() != pivot.Hash() {
t.Errorf("persisted previousPivot mismatch: got %v, want %v", loader.previousPivot.Hash(), pivot.Hash()) t.Errorf("persisted pivot mismatch: got %v, want %v", loader.pivot.Hash(), pivot.Hash())
} }
} }
@ -1702,7 +1757,7 @@ func testPivotMovement(t *testing.T, scheme string, pivotMoves int) {
} }
// TestCatchUpPersistsIncrementally verifies that catchUp updates and persists // TestCatchUpPersistsIncrementally verifies that catchUp updates and persists
// previousPivot after each successfully applied BAL. If a later block in the // the pivot after each successfully applied BAL. If a later block in the
// gap fails to apply, the persisted state reflects the last successful block, // gap fails to apply, the persisted state reflects the last successful block,
// so a follow-up Sync can resume from there rather than reapplying everything. // so a follow-up Sync can resume from there rather than reapplying everything.
func TestCatchUpPersistsIncrementally(t *testing.T) { func TestCatchUpPersistsIncrementally(t *testing.T) {
@ -1776,7 +1831,7 @@ func testCatchUpPersistsIncrementally(t *testing.T, scheme string) {
blocks[i] = balBlock{header: header, bal: buf.Bytes()} blocks[i] = balBlock{header: header, bal: buf.Bytes()}
} }
// First sync: complete sync to A so persisted state has previousPivot=A, // First sync: complete sync to A so persisted state has pivot=A,
// flat state covers all accounts. // flat state covers all accounts.
{ {
var ( var (
@ -1826,22 +1881,22 @@ func testCatchUpPersistsIncrementally(t *testing.T, scheme string) {
t.Fatal("expected Sync to fail when applyAccessList hits corrupt flat state") t.Fatal("expected Sync to fail when applyAccessList hits corrupt flat state")
} }
// Persisted previousPivot should now reflect the last successfully applied // Persisted pivot should now reflect the last successfully applied
// block (A+2). Without per-iteration saves, it would still be at A. // block (A+2). Without per-iteration saves, it would still be at A.
loader := newSyncerV2(db, nodeScheme) loader := newSyncerV2(db, nodeScheme)
loader.loadSyncStatus() loader.loadSyncStatus()
if loader.previousPivot == nil { if loader.pivot == nil {
t.Fatal("expected persisted previousPivot to be set after partial catchUp") t.Fatal("expected persisted pivot to be set after partial catchUp")
} }
wantHash := blocks[1].header.Hash() wantHash := blocks[1].header.Hash()
if loader.previousPivot.Hash() != wantHash { if loader.pivot.Hash() != wantHash {
t.Errorf("persisted previousPivot mismatch after partial catchUp: got %v, want %v (block A+2)", t.Errorf("persisted pivot mismatch after partial catchUp: got %v, want %v (block A+2)",
loader.previousPivot.Hash(), wantHash) loader.pivot.Hash(), wantHash)
} }
} }
// TestSyncStatusMarkedCompleteAfterCompletion verifies that after a full sync // TestSyncStatusMarkedCompleteAfterCompletion verifies that after a full sync
// completes, the persisted sync status has Complete=true. This lets a // completes, the persisted sync status reaches the complete phase. This lets a
// subsequent Sync call distinguish "already done" from "fresh node" and skip. // subsequent Sync call distinguish "already done" from "fresh node" and skip.
func TestSyncStatusMarkedCompleteAfterCompletion(t *testing.T) { func TestSyncStatusMarkedCompleteAfterCompletion(t *testing.T) {
t.Parallel() t.Parallel()
@ -1870,13 +1925,13 @@ func testSyncStatusMarkedCompleteAfterCompletion(t *testing.T, scheme string) {
} }
// After successful sync, persisted status should be present with // After successful sync, persisted status should be present with
// Complete=true and the pivot we synced to. // the complete phase and the pivot we synced to.
loader := newSyncerV2(syncer.db, nodeScheme) loader := newSyncerV2(syncer.db, nodeScheme)
loader.loadSyncStatus() loader.loadSyncStatus()
if !loader.complete { if loader.getPhase() != phaseComplete {
t.Fatal("expected persisted status to have Complete=true after successful sync") t.Fatal("expected persisted status to reach the complete phase after successful sync")
} }
if loader.previousPivot == nil || loader.previousPivot.Hash() != pivot.Hash() { if loader.pivot == nil || loader.pivot.Hash() != pivot.Hash() {
t.Fatalf("expected persisted pivot to match synced pivot") t.Fatalf("expected persisted pivot to match synced pivot")
} }
} }
@ -1906,7 +1961,7 @@ func TestSyncSkipsIfAlreadyComplete(t *testing.T) {
t.Fatalf("first sync failed: %v", err) t.Fatalf("first sync failed: %v", err)
} }
// Wipe the flat state. The persisted status (with Complete=true) stays. // Wipe the flat state. The persisted status (in the complete phase) stays.
if err := syncer.db.DeleteRange(rawdb.SnapshotAccountPrefix, []byte{rawdb.SnapshotAccountPrefix[0] + 1}); err != nil { if err := syncer.db.DeleteRange(rawdb.SnapshotAccountPrefix, []byte{rawdb.SnapshotAccountPrefix[0] + 1}); err != nil {
t.Fatalf("failed to wipe account snapshot: %v", err) t.Fatalf("failed to wipe account snapshot: %v", err)
} }
@ -1922,17 +1977,17 @@ func TestSyncSkipsIfAlreadyComplete(t *testing.T) {
} }
} }
// TestInterruptedRebuildRecovery verifies that if sync is interrupted after // TestInterruptedGenerationRecovery verifies that if sync is interrupted after
// download completes but before trie rebuild finishes, the next Sync() call // download completes but before trie generation finishes, the next Sync() call
// re-runs the download (which completes immediately) and rebuild. // re-runs the download (which completes immediately) and generation.
func TestInterruptedRebuildRecovery(t *testing.T) { func TestInterruptedGenerationRecovery(t *testing.T) {
t.Parallel() t.Parallel()
nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, rawdb.HashScheme) nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, rawdb.HashScheme)
root := sourceAccountTrie.Hash() root := sourceAccountTrie.Hash()
// First run: complete download, save status, simulate interruption // First run: complete download, save status, simulate interruption
// before rebuild by calling downloadState() directly // before generation by calling downloadState() directly
var ( var (
once1 sync.Once once1 sync.Once
cancel1 = make(chan struct{}) cancel1 = make(chan struct{})
@ -1946,9 +2001,8 @@ func TestInterruptedRebuildRecovery(t *testing.T) {
syncer1.Register(src1) syncer1.Register(src1)
src1.remote = syncer1 src1.remote = syncer1
pivot := mkPivot(0, root) pivot := mkPivot(0, root)
syncer1.pivot = pivot
syncer1.previousPivot = pivot // Sync sets this before downloadState
syncer1.loadSyncStatus() syncer1.loadSyncStatus()
syncer1.pivot = pivot // Sync pins this before downloadState
if err := syncer1.downloadState(cancel1); err != nil { if err := syncer1.downloadState(cancel1); err != nil {
t.Fatalf("download failed: %v", err) t.Fatalf("download failed: %v", err)
@ -1960,11 +2014,11 @@ func TestInterruptedRebuildRecovery(t *testing.T) {
syncer1.cleanAccountTasks() syncer1.cleanAccountTasks()
syncer1.saveSyncStatus() syncer1.saveSyncStatus()
// Status should exist (rebuild hasn't run yet) // Status should exist (generation hasn't run yet)
if rawdb.ReadSnapshotSyncStatus(db) == nil { if rawdb.ReadSnapshotSyncStatus(db) == nil {
t.Fatal("sync status should exist after download") t.Fatal("sync status should exist after download")
} }
// Second run: full Sync should detect tasks are done, run rebuild // Second run: full Sync should detect tasks are done, run generation
var ( var (
once2 sync.Once once2 sync.Once
cancel2 = make(chan struct{}) cancel2 = make(chan struct{})
@ -1980,11 +2034,16 @@ func TestInterruptedRebuildRecovery(t *testing.T) {
if err := syncer2.Sync(mkPivot(0, root), cancel2); err != nil { if err := syncer2.Sync(mkPivot(0, root), cancel2); err != nil {
t.Fatalf("resumed sync failed: %v", err) t.Fatalf("resumed sync failed: %v", err)
} }
// After rebuild completes, status should be marked Complete=true. // The resumed run re-arms the pivot freeze once its no-op download
// completes, the downloader relies on it until the pivot block commits.
if syncer2.FrozenPivot() == nil {
t.Fatal("pivot not frozen after resumed sync")
}
// After generation completes, status should reach the complete phase.
loader := newSyncerV2(db, nodeScheme) loader := newSyncerV2(db, nodeScheme)
loader.loadSyncStatus() loader.loadSyncStatus()
if !loader.complete { if loader.getPhase() != phaseComplete {
t.Fatal("sync status should be marked Complete=true after rebuild completes") t.Fatal("sync status should reach the complete phase after generation completes")
} }
} }
@ -2462,7 +2521,7 @@ func TestCatchUpRetriesOnBadBAL(t *testing.T) {
// makeStorageTrieFromSlots builds a storage trie for owner from raw slot // makeStorageTrieFromSlots builds a storage trie for owner from raw slot
// key->value pairs, using the exact on-disk encoding the flat snapshot and the // key->value pairs, using the exact on-disk encoding the flat snapshot and the
// trie rebuild expect: each leaf is keyed by keccak256(slotKey) and its value is // trie generation expect: each leaf is keyed by keccak256(slotKey) and its value is
// rlp(TrimLeftZeroes(value)). Zero-valued slots are skipped (an unset slot has // rlp(TrimLeftZeroes(value)). Zero-valued slots are skipped (an unset slot has
// no leaf). It returns the storage root, the dirty node set, and the sorted // no leaf). It returns the storage root, the dirty node set, and the sorted
// snapshot leaves (which a test peer serves verbatim). // snapshot leaves (which a test peer serves verbatim).
@ -2529,12 +2588,12 @@ func makeStateWithStorageContract(scheme string, plain []*kv, contractAddr commo
// slot, an overwrite of an existing slot, a write of zero (deletion), and a // slot, an overwrite of an existing slot, a write of zero (deletion), and a
// multi-tx write where the post-block value wins. // multi-tx write where the post-block value wins.
// //
// It fully syncs pivot A (flat-state download + trie rebuild), then moves the // It fully syncs pivot A (flat-state download + trie generation), then moves the
// pivot to A+1. The move triggers catchUp, which fetches the A+1 BAL, applies // pivot to A+1. The move triggers catchUp, which fetches the A+1 BAL, applies
// the storage diffs to the flat state, and rebuilds the trie. The rebuild // the storage diffs to the flat state, and generates the trie. The generation
// verifies the recomputed root against the pivot's expected post-catch-up root, // verifies the recomputed root against the pivot's expected post-catch-up root,
// so a successful Sync proves the storage mutations were applied in the exact // so a successful Sync proves the storage mutations were applied in the exact
// encoding the trie rebuild consumes. verifyTrie re-walks the result as an // encoding the trie generation consumes. verifyTrie re-walks the result as an
// independent confirmation. // independent confirmation.
func TestCatchUpAppliesStorageBALs(t *testing.T) { func TestCatchUpAppliesStorageBALs(t *testing.T) {
t.Parallel() t.Parallel()
@ -2620,7 +2679,7 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) {
// Sync, so the follow-up Sync's reorg check sees A as still-canonical and // Sync, so the follow-up Sync's reorg check sees A as still-canonical and
// runs catchUp instead of resetting. The A+1 header carries the BAL hash // runs catchUp instead of resetting. The A+1 header carries the BAL hash
// (verified during catch-up) and the expected post-catch-up state root // (verified during catch-up) and the expected post-catch-up state root
// (verified by the trie rebuild). // (verified by the trie generation).
db := rawdb.NewMemoryDatabase() db := rawdb.NewMemoryDatabase()
numA := uint64(128) numA := uint64(128)
emptyH := common.Hash{} emptyH := common.Hash{}
@ -2644,7 +2703,7 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) {
rawdb.WriteHeader(db, hdrB) rawdb.WriteHeader(db, hdrB)
rawdb.WriteCanonicalHash(db, hdrB.Hash(), numA+1) rawdb.WriteCanonicalHash(db, hdrB.Hash(), numA+1)
// Sync 1: full flat-state download + trie rebuild against pivot A. // Sync 1: full flat-state download + trie generation against pivot A.
{ {
var ( var (
once sync.Once once sync.Once
@ -2665,7 +2724,7 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) {
} }
close(done) close(done)
} }
// Sanity: the rebuilt trie for pivot A is complete and matches rootA. This // Sanity: the generated trie for pivot A is complete and matches rootA. This
// also confirms the test fixture itself is internally consistent. // also confirms the test fixture itself is internally consistent.
verifyTrie(scheme, db, rootA, t) verifyTrie(scheme, db, rootA, t)
@ -2690,6 +2749,12 @@ func testCatchUpAppliesStorageBALs(t *testing.T, scheme string) {
if err := syncer.Sync(hdrB, cancel); err != nil { if err := syncer.Sync(hdrB, cancel); err != nil {
t.Fatalf("pivot A+1 catch-up sync failed: %v", err) t.Fatalf("pivot A+1 catch-up sync failed: %v", err)
} }
// The freeze must re-arm on a pivot-moved cycle too, the downloader
// relies on it from download completion until commit, and it must
// point at the new pivot the catch-up rolled forward to.
if frozen := syncer.FrozenPivot(); frozen == nil || frozen.Hash() != hdrB.Hash() {
t.Fatal("pivot not frozen at the new header after catch-up sync")
}
close(done) close(done)
} }

View file

@ -337,7 +337,7 @@ func hashRanges(total int) [][2]common.Hash {
return ranges return ranges
} }
// GenerateTrie rebuilds all tries (storage + account) from flat snapshot // GenerateTrie builds all tries (storage + account) from flat snapshot
// data in the database. The account hash space is partitioned into 16 // data in the database. The account hash space is partitioned into 16
// slices aligned with the first-nibble branching of the MPT root. Each // slices aligned with the first-nibble branching of the MPT root. Each
// partition is processed by its own goroutine, which walks its slice, // partition is processed by its own goroutine, which walks its slice,
@ -346,10 +346,8 @@ func hashRanges(total int) [][2]common.Hash {
// trie. Once every partition has produced its subtree root, the top-level // trie. Once every partition has produced its subtree root, the top-level
// branch is assembled and its hash verified against the expected root. // branch is assembled and its hash verified against the expected root.
// //
// Resume: on entry, any partition that has a "done" marker from a // Generation is all or nothing: an interrupted run leaves no resume
// previous run is skipped. Its subtree blob is read from the marker // state and the next run builds every partition from scratch.
// and handed to assembleRoot directly. On a mid-run crash, only the
// in-flight partition(s) are redone.
func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) (GenerateStats, error) { func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) (GenerateStats, error) {
var ( var (
start = time.Now() start = time.Now()
@ -366,9 +364,8 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
go tickProgress(progressDone, start, &scanned, &updated, &progress) go tickProgress(progressDone, start, &scanned, &updated, &progress)
defer close(progressDone) defer close(progressDone)
// For each partition, either skip (prior done marker found) or run // Run every partition concurrently, each producing the subtree root
// it. Prior runs can leave the partition's raw root blob in the done // blob that assembleRoot needs.
// marker. We recover it here so assembleRoot has everything it needs.
var ( var (
ranges = hashRanges(numPartitions) ranges = hashRanges(numPartitions)
eg, ctx = errgroup.WithContext(context.Background()) eg, ctx = errgroup.WithContext(context.Background())
@ -376,11 +373,6 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
for i, r := range ranges { for i, r := range ranges {
partition := byte(i) partition := byte(i)
rangeStart, rangeEnd := r[0], r[1] rangeStart, rangeEnd := r[0], r[1]
if blob, ok := rawdb.ReadGenerateTriePartitionDone(db, partition); ok {
partitionBlobs[partition] = blob
progress[partition].Store(partitionFinished)
continue
}
eg.Go(func() error { eg.Go(func() error {
start := time.Now() start := time.Now()
blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated, &deleted, &progress[partition]) blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated, &deleted, &progress[partition])
@ -391,11 +383,6 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
progress[partition].Store(partitionFinished) progress[partition].Store(partitionFinished)
partitionBlobs[partition] = blob partitionBlobs[partition] = blob
// Record completion only after the partition's batch has
// flushed inside generatePartition, so this marker appears
// on disk only when every write the partition did is durable.
rawdb.WriteGenerateTriePartitionDone(db, partition, blob)
return nil return nil
}) })
} }
@ -405,9 +392,8 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
return GenerateStats{}, err return GenerateStats{}, err
} }
// Assemble the top-level root from the partition blobs, verify it // Assemble the top-level root from the partition blobs and verify it
// matches the expected root, and clear all partition markers on // matches the expected root.
// success.
got, err := assembleRoot(db, scheme, partitionBlobs) got, err := assembleRoot(db, scheme, partitionBlobs)
if err != nil { if err != nil {
return GenerateStats{}, fmt.Errorf("assemble root: %w", err) return GenerateStats{}, fmt.Errorf("assemble root: %w", err)
@ -415,15 +401,6 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
if got != root { if got != root {
return GenerateStats{}, fmt.Errorf("state root mismatch: got %x, want %x", got, root) return GenerateStats{}, fmt.Errorf("state root mismatch: got %x, want %x", got, root)
} }
// Clear the partition progress marker, ending the generation process.
batch := db.NewBatch()
for i := range numPartitions {
rawdb.DeleteGenerateTriePartitionDone(batch, byte(i))
}
if err := batch.Write(); err != nil {
return GenerateStats{}, fmt.Errorf("clear partition markers: %w", err)
}
log.Info("Generated state trie", "scanned", scanned.Load(), "updated", updated.Load(), "dangling-slots", deleted.Load(), "elapsed", common.PrettyDuration(time.Since(start))) log.Info("Generated state trie", "scanned", scanned.Load(), "updated", updated.Load(), "dangling-slots", deleted.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
return GenerateStats{ return GenerateStats{
Scanned: scanned.Load(), Scanned: scanned.Load(),

View file

@ -370,97 +370,6 @@ func TestGenerateTrieOrphanStorage(t *testing.T) {
} }
} }
// TestGenerateTriePartialResume proves that the resume path actually
// fires when a partition's done marker is present.
func TestGenerateTriePartialResume(t *testing.T) {
// Build the account set. Empty storage keeps the test focused on the
// account-trie resume path.
const n = 200
accounts := make([]testAccount, 0, n)
for i := 0; i < n; i++ {
addr := common.BytesToAddress([]byte{byte(i >> 8), byte(i)})
hash := crypto.Keccak256Hash(addr[:])
accounts = append(accounts, testAccount{
hash: hash,
account: types.StateAccount{
Nonce: uint64(i),
Balance: uint256.NewInt(uint64(i + 1)),
Root: types.EmptyRootHash,
CodeHash: types.EmptyCodeHash.Bytes(),
},
})
}
expectedRoot := buildExpectedRoot(t, accounts)
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
t.Run(scheme, func(t *testing.T) {
db := rawdb.NewMemoryDatabase()
// Step 1: write the account snapshots for this run.
for _, a := range accounts {
rawdb.WriteAccountSnapshot(db, a.hash, types.SlimAccountRLP(a.account))
}
// Step 2: run every partition once to populate trie nodes on disk
// and capture each partition's raw root blob.
var (
scanned atomic.Int64
updated atomic.Int64
deleted atomic.Int64
)
ranges := hashRanges(numPartitions)
blobs := make([][]byte, numPartitions)
for i, r := range ranges {
var pos atomic.Uint64
blob, err := generatePartition(context.Background(), nil, db, scheme, byte(i), r[0], r[1], &scanned, &updated, &deleted, &pos)
if err != nil {
t.Fatalf("pre-run partition %d: %v", i, err)
}
blobs[i] = blob
}
// Step 3: pre-seed done markers for even partitions only.
for i := 0; i < numPartitions; i++ {
if i%2 == 0 {
rawdb.WriteGenerateTriePartitionDone(db, byte(i), blobs[i])
}
}
// Step 4: delete flat-state account snapshots for every account that
// lives in an even partition. After this, rerunning generatePartition for
// an even partition would find no accounts and produce a nil blob,
// so a correct final root requires the resume path.
numDeleted := 0
for _, a := range accounts {
if (a.hash[0]>>4)%2 == 0 {
rawdb.DeleteAccountSnapshot(db, a.hash)
numDeleted++
}
}
if numDeleted == 0 {
t.Fatal("test setup failure: no accounts fell in even partitions")
}
// Step 5: run GenerateTrie. Success implies resume actually consulted
// the markers. Without it, even partitions would yield nil blobs and
// the root check inside GenerateTrie would fail.
if _, err := GenerateTrie(db, scheme, expectedRoot, nil); err != nil {
t.Fatalf("partial-resume GenerateTrie failed: %v", err)
}
// All markers cleared on success.
for i := 0; i < numPartitions; i++ {
if _, ok := rawdb.ReadGenerateTriePartitionDone(db, byte(i)); ok {
t.Errorf("partition %d marker not cleared after successful resume", i)
}
}
if scheme == rawdb.PathScheme {
assertCanonicalNodes(t, db, accounts)
}
})
}
}
// TestHashRanges checks that hashRanges fully and contiguously covers the // TestHashRanges checks that hashRanges fully and contiguously covers the
// 256-bit hash space, with the last range absorbing the rounding remainder. // 256-bit hash space, with the last range absorbing the rounding remainder.
func TestHashRanges(t *testing.T) { func TestHashRanges(t *testing.T) {