eth/protocols/snap: rename loadSyncStatus/saveSyncStatus to v1 variants and move to sync_v1.go

This commit is contained in:
jonny rhea 2026-05-13 15:00:28 -05:00
parent 3618763da5
commit 0e9f9ff1c8
2 changed files with 175 additions and 174 deletions

View file

@ -18,7 +18,6 @@ package snap
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math/big"
@ -538,7 +537,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
s.startTime = time.Now()
}
// Retrieve the previous sync status from LevelDB and abort if already synced
s.loadSyncStatus()
s.loadSyncStatusV1()
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
log.Debug("Snapshot sync already completed")
return nil
@ -548,7 +547,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
s.forwardAccountTask(task)
}
s.cleanAccountTasks()
s.saveSyncStatus()
s.saveSyncStatusV1()
}()
log.Debug("Starting snapshot sync cycle", "root", root)
@ -687,177 +686,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
}
// loadSyncStatus retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatus() {
var progress SyncProgress
if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
if err := json.Unmarshal(status, &progress); err != nil {
log.Error("Failed to decode snap sync status", "err", err)
} else {
for _, task := range progress.Tasks {
log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
}
s.tasks = progress.Tasks
for _, task := range s.tasks {
// Restore the completed storages
task.stateCompleted = make(map[common.Hash]struct{})
for _, hash := range task.StorageCompleted {
task.stateCompleted[hash] = struct{}{}
}
task.StorageCompleted = nil
// Allocate batch for account trie generation
task.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
if s.scheme == rawdb.HashScheme {
task.genTrie = newHashTrie(task.genBatch)
}
if s.scheme == rawdb.PathScheme {
task.genTrie = newPathTrie(common.Hash{}, task.Next != common.Hash{}, s.db, task.genBatch)
}
// Restore leftover storage tasks
for accountHash, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
if s.scheme == rawdb.HashScheme {
subtask.genTrie = newHashTrie(subtask.genBatch)
}
if s.scheme == rawdb.PathScheme {
subtask.genTrie = newPathTrie(accountHash, subtask.Next != common.Hash{}, s.db, subtask.genBatch)
}
}
}
}
s.lock.Lock()
defer s.lock.Unlock()
s.snapped = len(s.tasks) == 0
s.accountSynced = progress.AccountSynced
s.accountBytes = progress.AccountBytes
s.bytecodeSynced = progress.BytecodeSynced
s.bytecodeBytes = progress.BytecodeBytes
s.storageSynced = progress.StorageSynced
s.storageBytes = progress.StorageBytes
s.trienodeHealSynced = progress.TrienodeHealSynced
s.trienodeHealBytes = progress.TrienodeHealBytes
s.bytecodeHealSynced = progress.BytecodeHealSynced
s.bytecodeHealBytes = progress.BytecodeHealBytes
return
}
}
// Either we've failed to decode the previous state, or there was none.
// Start a fresh sync by chunking up the account range and scheduling
// them for retrieval.
s.tasks = nil
s.accountSynced, s.accountBytes = 0, 0
s.bytecodeSynced, s.bytecodeBytes = 0, 0
s.storageSynced, s.storageBytes = 0, 0
s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0
var next common.Hash
step := new(big.Int).Sub(
new(big.Int).Div(
new(big.Int).Exp(common.Big2, common.Big256, nil),
big.NewInt(int64(accountConcurrency)),
), common.Big1,
)
for i := 0; i < accountConcurrency; i++ {
last := common.BigToHash(new(big.Int).Add(next.Big(), step))
if i == accountConcurrency-1 {
// Make sure we don't overflow if the step is not a proper divisor
last = common.MaxHash
}
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
tr = newPathTrie(common.Hash{}, next != common.Hash{}, s.db, batch)
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
SubTasks: make(map[common.Hash][]*storageTask),
genBatch: batch,
stateCompleted: make(map[common.Hash]struct{}),
genTrie: tr,
})
log.Debug("Created account sync task", "from", next, "last", last)
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
}
}
// saveSyncStatus marshals the remaining sync tasks into leveldb.
func (s *Syncer) saveSyncStatus() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
// Claim the right boundary as incomplete before flushing the
// accumulated nodes in batch, the nodes on right boundary
// will be discarded and cleaned up by this call.
task.genTrie.commit(false)
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist account slots", "err", err)
}
for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
// Same for account trie, discard and cleanup the
// incomplete right boundary.
subtask.genTrie.commit(false)
if err := subtask.genBatch.Write(); err != nil {
log.Error("Failed to persist storage slots", "err", err)
}
}
}
// Save the account hashes of completed storage.
task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted))
for hash := range task.stateCompleted {
task.StorageCompleted = append(task.StorageCompleted, hash)
}
if len(task.StorageCompleted) > 0 {
log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last)
}
}
// Store the actual progress markers
progress := &SyncProgress{
Tasks: s.tasks,
AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes,
BytecodeSynced: s.bytecodeSynced,
BytecodeBytes: s.bytecodeBytes,
StorageSynced: s.storageSynced,
StorageBytes: s.storageBytes,
TrienodeHealSynced: s.trienodeHealSynced,
TrienodeHealBytes: s.trienodeHealBytes,
BytecodeHealSynced: s.bytecodeHealSynced,
BytecodeHealBytes: s.bytecodeHealBytes,
}
status, err := json.Marshal(progress)
if err != nil {
panic(err) // This can only fail during implementation
}
rawdb.WriteSnapshotSyncStatus(s.db, status)
}
// Progress returns the snap sync status statistics.
func (s *Syncer) Progress() (*SyncProgress, *SyncPending) {
s.lock.Lock()

View file

@ -18,9 +18,11 @@ package snap
import (
"bytes"
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"time"
@ -889,3 +891,174 @@ func (s *Syncer) reportHealProgress(force bool) {
log.Info("Syncing: state healing in progress", "accounts", accounts, "slots", storage,
"codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
}
// loadSyncStatusV1 retrieves a previously aborted sync status from the database,
// or generates a fresh one if none is available.
func (s *Syncer) loadSyncStatusV1() {
var progress SyncProgress
if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
if err := json.Unmarshal(status, &progress); err != nil {
log.Error("Failed to decode snap sync status", "err", err)
} else {
for _, task := range progress.Tasks {
log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
}
s.tasks = progress.Tasks
for _, task := range s.tasks {
// Restore the completed storages
task.stateCompleted = make(map[common.Hash]struct{})
for _, hash := range task.StorageCompleted {
task.stateCompleted[hash] = struct{}{}
}
task.StorageCompleted = nil
// Allocate batch for account trie generation
task.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
if s.scheme == rawdb.HashScheme {
task.genTrie = newHashTrie(task.genBatch)
}
if s.scheme == rawdb.PathScheme {
task.genTrie = newPathTrie(common.Hash{}, task.Next != common.Hash{}, s.db, task.genBatch)
}
// Restore leftover storage tasks
for accountHash, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
subtask.genBatch = ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
if s.scheme == rawdb.HashScheme {
subtask.genTrie = newHashTrie(subtask.genBatch)
}
if s.scheme == rawdb.PathScheme {
subtask.genTrie = newPathTrie(accountHash, subtask.Next != common.Hash{}, s.db, subtask.genBatch)
}
}
}
}
s.lock.Lock()
defer s.lock.Unlock()
s.snapped = len(s.tasks) == 0
s.accountSynced = progress.AccountSynced
s.accountBytes = progress.AccountBytes
s.bytecodeSynced = progress.BytecodeSynced
s.bytecodeBytes = progress.BytecodeBytes
s.storageSynced = progress.StorageSynced
s.storageBytes = progress.StorageBytes
s.trienodeHealSynced = progress.TrienodeHealSynced
s.trienodeHealBytes = progress.TrienodeHealBytes
s.bytecodeHealSynced = progress.BytecodeHealSynced
s.bytecodeHealBytes = progress.BytecodeHealBytes
return
}
}
// Either we've failed to decode the previous state, or there was none.
// Start a fresh sync by chunking up the account range and scheduling
// them for retrieval.
s.tasks = nil
s.accountSynced, s.accountBytes = 0, 0
s.bytecodeSynced, s.bytecodeBytes = 0, 0
s.storageSynced, s.storageBytes = 0, 0
s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0
var next common.Hash
step := new(big.Int).Sub(
new(big.Int).Div(
new(big.Int).Exp(common.Big2, common.Big256, nil),
big.NewInt(int64(accountConcurrency)),
), common.Big1,
)
for i := 0; i < accountConcurrency; i++ {
last := common.BigToHash(new(big.Int).Add(next.Big(), step))
if i == accountConcurrency-1 {
// Make sure we don't overflow if the step is not a proper divisor
last = common.MaxHash
}
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
tr = newPathTrie(common.Hash{}, next != common.Hash{}, s.db, batch)
}
s.tasks = append(s.tasks, &accountTask{
Next: next,
Last: last,
SubTasks: make(map[common.Hash][]*storageTask),
genBatch: batch,
stateCompleted: make(map[common.Hash]struct{}),
genTrie: tr,
})
log.Debug("Created account sync task", "from", next, "last", last)
next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
}
}
// saveSyncStatusV1 marshals the remaining sync tasks into leveldb.
func (s *Syncer) saveSyncStatusV1() {
// Serialize any partial progress to disk before spinning down
for _, task := range s.tasks {
// Claim the right boundary as incomplete before flushing the
// accumulated nodes in batch, the nodes on right boundary
// will be discarded and cleaned up by this call.
task.genTrie.commit(false)
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist account slots", "err", err)
}
for _, subtasks := range task.SubTasks {
for _, subtask := range subtasks {
// Same for account trie, discard and cleanup the
// incomplete right boundary.
subtask.genTrie.commit(false)
if err := subtask.genBatch.Write(); err != nil {
log.Error("Failed to persist storage slots", "err", err)
}
}
}
// Save the account hashes of completed storage.
task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted))
for hash := range task.stateCompleted {
task.StorageCompleted = append(task.StorageCompleted, hash)
}
if len(task.StorageCompleted) > 0 {
log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last)
}
}
// Store the actual progress markers
progress := &SyncProgress{
Tasks: s.tasks,
AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes,
BytecodeSynced: s.bytecodeSynced,
BytecodeBytes: s.bytecodeBytes,
StorageSynced: s.storageSynced,
StorageBytes: s.storageBytes,
TrienodeHealSynced: s.trienodeHealSynced,
TrienodeHealBytes: s.trienodeHealBytes,
BytecodeHealSynced: s.bytecodeHealSynced,
BytecodeHealBytes: s.bytecodeHealBytes,
}
status, err := json.Marshal(progress)
if err != nil {
panic(err) // This can only fail during implementation
}
rawdb.WriteSnapshotSyncStatus(s.db, status)
}