From 0e9f9ff1c814f64d81ea81c65dadbcadedbf9ce7 Mon Sep 17 00:00:00 2001 From: jonny rhea <5555162+jrhea@users.noreply.github.com> Date: Wed, 13 May 2026 15:00:28 -0500 Subject: [PATCH] eth/protocols/snap: rename loadSyncStatus/saveSyncStatus to v1 variants and move to sync_v1.go --- eth/protocols/snap/sync.go | 176 +--------------------------------- eth/protocols/snap/sync_v1.go | 173 +++++++++++++++++++++++++++++++++ 2 files changed, 175 insertions(+), 174 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index ff52661f13..8ab40ffc41 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -18,7 +18,6 @@ package snap import ( "bytes" - "encoding/json" "errors" "fmt" "math/big" @@ -538,7 +537,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { s.startTime = time.Now() } // Retrieve the previous sync status from LevelDB and abort if already synced - s.loadSyncStatus() + s.loadSyncStatusV1() if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 { log.Debug("Snapshot sync already completed") return nil @@ -548,7 +547,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { s.forwardAccountTask(task) } s.cleanAccountTasks() - s.saveSyncStatus() + s.saveSyncStatusV1() }() log.Debug("Starting snapshot sync cycle", "root", root) @@ -687,177 +686,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { } } -// loadSyncStatus retrieves a previously aborted sync status from the database, -// or generates a fresh one if none is available. -func (s *Syncer) loadSyncStatus() { - var progress SyncProgress - - if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil { - if err := json.Unmarshal(status, &progress); err != nil { - log.Error("Failed to decode snap sync status", "err", err) - } else { - for _, task := range progress.Tasks { - log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last) - } - s.tasks = progress.Tasks - for _, task := range s.tasks { - // Restore the completed storages - task.stateCompleted = make(map[common.Hash]struct{}) - for _, hash := range task.StorageCompleted { - task.stateCompleted[hash] = struct{}{} - } - task.StorageCompleted = nil - - // Allocate batch for account trie generation - task.genBatch = ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.accountBytes += common.StorageSize(len(key) + len(value)) - }, - } - if s.scheme == rawdb.HashScheme { - task.genTrie = newHashTrie(task.genBatch) - } - if s.scheme == rawdb.PathScheme { - task.genTrie = newPathTrie(common.Hash{}, task.Next != common.Hash{}, s.db, task.genBatch) - } - // Restore leftover storage tasks - for accountHash, subtasks := range task.SubTasks { - for _, subtask := range subtasks { - subtask.genBatch = ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.storageBytes += common.StorageSize(len(key) + len(value)) - }, - } - if s.scheme == rawdb.HashScheme { - subtask.genTrie = newHashTrie(subtask.genBatch) - } - if s.scheme == rawdb.PathScheme { - subtask.genTrie = newPathTrie(accountHash, subtask.Next != common.Hash{}, s.db, subtask.genBatch) - } - } - } - } - s.lock.Lock() - defer s.lock.Unlock() - - s.snapped = len(s.tasks) == 0 - - s.accountSynced = progress.AccountSynced - s.accountBytes = progress.AccountBytes - s.bytecodeSynced = progress.BytecodeSynced - s.bytecodeBytes = progress.BytecodeBytes - s.storageSynced = progress.StorageSynced - s.storageBytes = progress.StorageBytes - - s.trienodeHealSynced = progress.TrienodeHealSynced - s.trienodeHealBytes = progress.TrienodeHealBytes - s.bytecodeHealSynced = progress.BytecodeHealSynced - s.bytecodeHealBytes = progress.BytecodeHealBytes - return - } - } - // Either we've failed to decode the previous state, or there was none. - // Start a fresh sync by chunking up the account range and scheduling - // them for retrieval. - s.tasks = nil - s.accountSynced, s.accountBytes = 0, 0 - s.bytecodeSynced, s.bytecodeBytes = 0, 0 - s.storageSynced, s.storageBytes = 0, 0 - s.trienodeHealSynced, s.trienodeHealBytes = 0, 0 - s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0 - - var next common.Hash - step := new(big.Int).Sub( - new(big.Int).Div( - new(big.Int).Exp(common.Big2, common.Big256, nil), - big.NewInt(int64(accountConcurrency)), - ), common.Big1, - ) - for i := 0; i < accountConcurrency; i++ { - last := common.BigToHash(new(big.Int).Add(next.Big(), step)) - if i == accountConcurrency-1 { - // Make sure we don't overflow if the step is not a proper divisor - last = common.MaxHash - } - batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.accountBytes += common.StorageSize(len(key) + len(value)) - }, - } - var tr genTrie - if s.scheme == rawdb.HashScheme { - tr = newHashTrie(batch) - } - if s.scheme == rawdb.PathScheme { - tr = newPathTrie(common.Hash{}, next != common.Hash{}, s.db, batch) - } - s.tasks = append(s.tasks, &accountTask{ - Next: next, - Last: last, - SubTasks: make(map[common.Hash][]*storageTask), - genBatch: batch, - stateCompleted: make(map[common.Hash]struct{}), - genTrie: tr, - }) - log.Debug("Created account sync task", "from", next, "last", last) - next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) - } -} - -// saveSyncStatus marshals the remaining sync tasks into leveldb. -func (s *Syncer) saveSyncStatus() { - // Serialize any partial progress to disk before spinning down - for _, task := range s.tasks { - // Claim the right boundary as incomplete before flushing the - // accumulated nodes in batch, the nodes on right boundary - // will be discarded and cleaned up by this call. - task.genTrie.commit(false) - if err := task.genBatch.Write(); err != nil { - log.Error("Failed to persist account slots", "err", err) - } - for _, subtasks := range task.SubTasks { - for _, subtask := range subtasks { - // Same for account trie, discard and cleanup the - // incomplete right boundary. - subtask.genTrie.commit(false) - if err := subtask.genBatch.Write(); err != nil { - log.Error("Failed to persist storage slots", "err", err) - } - } - } - // Save the account hashes of completed storage. - task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted)) - for hash := range task.stateCompleted { - task.StorageCompleted = append(task.StorageCompleted, hash) - } - if len(task.StorageCompleted) > 0 { - log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last) - } - } - // Store the actual progress markers - progress := &SyncProgress{ - Tasks: s.tasks, - AccountSynced: s.accountSynced, - AccountBytes: s.accountBytes, - BytecodeSynced: s.bytecodeSynced, - BytecodeBytes: s.bytecodeBytes, - StorageSynced: s.storageSynced, - StorageBytes: s.storageBytes, - TrienodeHealSynced: s.trienodeHealSynced, - TrienodeHealBytes: s.trienodeHealBytes, - BytecodeHealSynced: s.bytecodeHealSynced, - BytecodeHealBytes: s.bytecodeHealBytes, - } - status, err := json.Marshal(progress) - if err != nil { - panic(err) // This can only fail during implementation - } - rawdb.WriteSnapshotSyncStatus(s.db, status) -} - // Progress returns the snap sync status statistics. func (s *Syncer) Progress() (*SyncProgress, *SyncPending) { s.lock.Lock() diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go index d6cd5ab5ca..7f8fafb0d6 100644 --- a/eth/protocols/snap/sync_v1.go +++ b/eth/protocols/snap/sync_v1.go @@ -18,9 +18,11 @@ package snap import ( "bytes" + "encoding/json" "errors" "fmt" gomath "math" + "math/big" "math/rand" "sort" "time" @@ -889,3 +891,174 @@ func (s *Syncer) reportHealProgress(force bool) { log.Info("Syncing: state healing in progress", "accounts", accounts, "slots", storage, "codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending()) } + +// loadSyncStatusV1 retrieves a previously aborted sync status from the database, +// or generates a fresh one if none is available. +func (s *Syncer) loadSyncStatusV1() { + var progress SyncProgress + + if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil { + if err := json.Unmarshal(status, &progress); err != nil { + log.Error("Failed to decode snap sync status", "err", err) + } else { + for _, task := range progress.Tasks { + log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last) + } + s.tasks = progress.Tasks + for _, task := range s.tasks { + // Restore the completed storages + task.stateCompleted = make(map[common.Hash]struct{}) + for _, hash := range task.StorageCompleted { + task.stateCompleted[hash] = struct{}{} + } + task.StorageCompleted = nil + + // Allocate batch for account trie generation + task.genBatch = ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.accountBytes += common.StorageSize(len(key) + len(value)) + }, + } + if s.scheme == rawdb.HashScheme { + task.genTrie = newHashTrie(task.genBatch) + } + if s.scheme == rawdb.PathScheme { + task.genTrie = newPathTrie(common.Hash{}, task.Next != common.Hash{}, s.db, task.genBatch) + } + // Restore leftover storage tasks + for accountHash, subtasks := range task.SubTasks { + for _, subtask := range subtasks { + subtask.genBatch = ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.storageBytes += common.StorageSize(len(key) + len(value)) + }, + } + if s.scheme == rawdb.HashScheme { + subtask.genTrie = newHashTrie(subtask.genBatch) + } + if s.scheme == rawdb.PathScheme { + subtask.genTrie = newPathTrie(accountHash, subtask.Next != common.Hash{}, s.db, subtask.genBatch) + } + } + } + } + s.lock.Lock() + defer s.lock.Unlock() + + s.snapped = len(s.tasks) == 0 + + s.accountSynced = progress.AccountSynced + s.accountBytes = progress.AccountBytes + s.bytecodeSynced = progress.BytecodeSynced + s.bytecodeBytes = progress.BytecodeBytes + s.storageSynced = progress.StorageSynced + s.storageBytes = progress.StorageBytes + + s.trienodeHealSynced = progress.TrienodeHealSynced + s.trienodeHealBytes = progress.TrienodeHealBytes + s.bytecodeHealSynced = progress.BytecodeHealSynced + s.bytecodeHealBytes = progress.BytecodeHealBytes + return + } + } + // Either we've failed to decode the previous state, or there was none. + // Start a fresh sync by chunking up the account range and scheduling + // them for retrieval. + s.tasks = nil + s.accountSynced, s.accountBytes = 0, 0 + s.bytecodeSynced, s.bytecodeBytes = 0, 0 + s.storageSynced, s.storageBytes = 0, 0 + s.trienodeHealSynced, s.trienodeHealBytes = 0, 0 + s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0 + + var next common.Hash + step := new(big.Int).Sub( + new(big.Int).Div( + new(big.Int).Exp(common.Big2, common.Big256, nil), + big.NewInt(int64(accountConcurrency)), + ), common.Big1, + ) + for i := 0; i < accountConcurrency; i++ { + last := common.BigToHash(new(big.Int).Add(next.Big(), step)) + if i == accountConcurrency-1 { + // Make sure we don't overflow if the step is not a proper divisor + last = common.MaxHash + } + batch := ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.accountBytes += common.StorageSize(len(key) + len(value)) + }, + } + var tr genTrie + if s.scheme == rawdb.HashScheme { + tr = newHashTrie(batch) + } + if s.scheme == rawdb.PathScheme { + tr = newPathTrie(common.Hash{}, next != common.Hash{}, s.db, batch) + } + s.tasks = append(s.tasks, &accountTask{ + Next: next, + Last: last, + SubTasks: make(map[common.Hash][]*storageTask), + genBatch: batch, + stateCompleted: make(map[common.Hash]struct{}), + genTrie: tr, + }) + log.Debug("Created account sync task", "from", next, "last", last) + next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) + } +} + +// saveSyncStatusV1 marshals the remaining sync tasks into leveldb. +func (s *Syncer) saveSyncStatusV1() { + // Serialize any partial progress to disk before spinning down + for _, task := range s.tasks { + // Claim the right boundary as incomplete before flushing the + // accumulated nodes in batch, the nodes on right boundary + // will be discarded and cleaned up by this call. + task.genTrie.commit(false) + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist account slots", "err", err) + } + for _, subtasks := range task.SubTasks { + for _, subtask := range subtasks { + // Same for account trie, discard and cleanup the + // incomplete right boundary. + subtask.genTrie.commit(false) + if err := subtask.genBatch.Write(); err != nil { + log.Error("Failed to persist storage slots", "err", err) + } + } + } + // Save the account hashes of completed storage. + task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted)) + for hash := range task.stateCompleted { + task.StorageCompleted = append(task.StorageCompleted, hash) + } + if len(task.StorageCompleted) > 0 { + log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last) + } + } + // Store the actual progress markers + progress := &SyncProgress{ + Tasks: s.tasks, + AccountSynced: s.accountSynced, + AccountBytes: s.accountBytes, + BytecodeSynced: s.bytecodeSynced, + BytecodeBytes: s.bytecodeBytes, + StorageSynced: s.storageSynced, + StorageBytes: s.storageBytes, + TrienodeHealSynced: s.trienodeHealSynced, + TrienodeHealBytes: s.trienodeHealBytes, + BytecodeHealSynced: s.bytecodeHealSynced, + BytecodeHealBytes: s.bytecodeHealBytes, + } + status, err := json.Marshal(progress) + if err != nil { + panic(err) // This can only fail during implementation + } + rawdb.WriteSnapshotSyncStatus(s.db, status) +}