eth, triedb, internal: add snap/2 sync progress (#35178)

This PR does two things:

- Expose snap/2 specific sync progress fields
- Seed the sync progress after `loadSyncStatus`
This commit is contained in:
rjl493456442 2026-06-17 13:43:51 +08:00 committed by GitHub
parent 1be5da2330
commit 0e810e4984
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 164 additions and 51 deletions

View file

@ -295,12 +295,19 @@ func (d *Downloader) Progress() ethereum.SyncProgress {
SyncedBytecodeBytes: uint64(progress.BytecodeBytes),
SyncedStorage: progress.StorageSynced,
SyncedStorageBytes: uint64(progress.StorageBytes),
// Snap/1 progress fields
HealedTrienodes: progress.TrienodeHealSynced,
HealedTrienodeBytes: uint64(progress.TrienodeHealBytes),
HealedBytecodes: progress.BytecodeHealSynced,
HealedBytecodeBytes: uint64(progress.BytecodeHealBytes),
HealingTrienodes: progress.HealingTrienodes,
HealingBytecode: progress.HealingBytecode,
// Snap/2 progress fields
SyncedAccessLists: progress.AccessListSynced,
TotalAccessLists: progress.AccessListTotal,
TrieGenProgress: progress.TrieGenPercent,
}
}

View file

@ -41,6 +41,11 @@ type Progress struct {
BytecodeHealBytes common.StorageSize
HealingTrienodes uint64
HealingBytecode uint64
// snap/2-specific status. Reported by snap/2 only.
AccessListSynced uint64 // Block access lists fetched during catch-up
AccessListTotal uint64 // Total block access lists to fetch for catch-up
TrieGenPercent uint64 // Trie generation completion, in percent (0..100)
}
// Syncer is the uniform view over the snap/1 (*syncer) and snap/2 (*syncerV2)
@ -139,12 +144,15 @@ type syncerV2Adapter struct{ *syncerV2 }
func (s syncerV2Adapter) Progress() Progress {
progress := s.syncerV2.Progress()
return Progress{
AccountSynced: progress.AccountSynced,
AccountBytes: progress.AccountBytes,
BytecodeSynced: progress.BytecodeSynced,
BytecodeBytes: progress.BytecodeBytes,
StorageSynced: progress.StorageSynced,
StorageBytes: progress.StorageBytes,
AccountSynced: progress.AccountSynced,
AccountBytes: progress.AccountBytes,
BytecodeSynced: progress.BytecodeSynced,
BytecodeBytes: progress.BytecodeBytes,
StorageSynced: progress.StorageSynced,
StorageBytes: progress.StorageBytes,
AccessListSynced: progress.AccessListSynced,
AccessListTotal: progress.AccessListTotal,
TrieGenPercent: progress.TrieGenPercent,
}
}

View file

@ -319,6 +319,10 @@ type syncProgressV2 struct {
BytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
StorageSynced uint64 // Number of storage slots downloaded
StorageBytes common.StorageSize // Number of storage trie bytes persisted to disk
AccessListSynced uint64 `json:"-"` // Block access lists fetched during catch-up
AccessListTotal uint64 `json:"-"` // Total block access lists to fetch for catch-up
TrieGenPercent uint64 `json:"-"` // Trie generation completion, in percent (0..100)
}
// SyncPeerV2 abstracts out the methods required for a peer to be synced against
@ -399,6 +403,10 @@ type syncerV2 struct {
storageSynced uint64 // Number of storage slots downloaded
storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
accessListSynced uint64 // Block access lists fetched so far during catch-up
accessListTotal uint64 // Block access lists to fetch for the current catch-up
genProgress atomic.Uint64 // The live trie-generation progress
extProgress *syncProgressV2 // progress that can be exposed to external caller.
startTime time.Time // Time instance when snapshot sync started
@ -629,8 +637,9 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error {
if err := batch.Write(); err != nil {
return err
}
if _, err := triedb.GenerateTrie(s.db, s.scheme, root, cancel); err != nil {
return err
_, genErr := triedb.GenerateTrieWithProgress(s.db, s.scheme, root, cancel, &s.genProgress)
if genErr != nil {
return genErr
}
log.Info("Trie generation complete", "root", root)
@ -690,15 +699,9 @@ func (s *syncerV2) downloadState(cancel chan struct{}) error {
// Update sync progress
s.lock.Lock()
s.extProgress = &syncProgressV2{
AccountSynced: s.accountSynced,
AccountBytes: s.accountBytes,
BytecodeSynced: s.bytecodeSynced,
BytecodeBytes: s.bytecodeBytes,
StorageSynced: s.storageSynced,
StorageBytes: s.storageBytes,
}
s.refreshProgressLocked()
s.lock.Unlock()
// Wait for something to happen
select {
case <-s.update:
@ -775,6 +778,12 @@ func (s *syncerV2) catchUp(target *types.Header, cancel chan struct{}) error {
s.lock.RUnlock()
log.Info("Starting BAL catch-up", "from", from, "to", to, "blocks", to-from+1)
s.lock.Lock()
s.accessListTotal = to - from + 1
s.accessListSynced = 0
s.refreshProgressLocked()
s.lock.Unlock()
for start := from; start <= to; start += s.catchUpWindow {
select {
case <-cancel:
@ -916,6 +925,10 @@ func (s *syncerV2) fetchAccessLists(hashes []common.Hash, headers map[common.Has
case res := <-accessListResps:
s.processAccessListResponse(res, headers, pending, fetched, refused)
}
s.lock.Lock()
s.accessListSynced += uint64(len(fetched))
s.refreshProgressLocked()
s.lock.Unlock()
}
// Assemble results in input order
results := make([]rlp.RawValue, len(hashes))
@ -1099,6 +1112,11 @@ func (s *syncerV2) loadSyncStatus() {
s.bytecodeBytes = progress.BytecodeBytes
s.storageSynced = progress.StorageSynced
s.storageBytes = progress.StorageBytes
// Seed the externally-exposed snapshot from the restored counters so
// eth_syncing reports real stats during catch-up and trie generation
// after a resume, instead of the zero-valued initial snapshot.
s.refreshProgressLocked()
return
}
}
@ -1174,6 +1192,9 @@ func (s *syncerV2) resetSyncState() {
s.accountSynced, s.accountBytes = 0, 0
s.bytecodeSynced, s.bytecodeBytes = 0, 0
s.storageSynced, s.storageBytes = 0, 0
s.accessListSynced, s.accessListTotal = 0, 0
s.genProgress.Store(0)
s.refreshProgressLocked()
var next common.Hash
step := new(big.Int).Sub(
@ -1238,11 +1259,29 @@ func (s *syncerV2) saveSyncStatusWithDB(db ethdb.KeyValueWriter) {
rawdb.WriteSnapshotSyncStatus(db, status)
}
// refreshProgressLocked replaces the externally visible progress snapshot
// with a freshly allocated copy of the syncer's live counters. A new object
// is installed on every call; the previously published snapshot is never
// mutated in place. TrieGenPercent is not populated here and stays zero in
// the stored snapshot.
//
// The caller must hold s.lock.
func (s *syncerV2) refreshProgressLocked() {
	snapshot := new(syncProgressV2)

	// State download counters.
	snapshot.AccountSynced, snapshot.AccountBytes = s.accountSynced, s.accountBytes
	snapshot.BytecodeSynced, snapshot.BytecodeBytes = s.bytecodeSynced, s.bytecodeBytes
	snapshot.StorageSynced, snapshot.StorageBytes = s.storageSynced, s.storageBytes

	// Block access list catch-up counters.
	snapshot.AccessListSynced, snapshot.AccessListTotal = s.accessListSynced, s.accessListTotal

	s.extProgress = snapshot
}
// Progress returns the snap sync status statistics.
func (s *syncerV2) Progress() *syncProgressV2 {
s.lock.Lock()
defer s.lock.Unlock()
return s.extProgress
p := *s.extProgress
p.TrieGenPercent = s.genProgress.Load()
return &p
}
// cleanAccountTasks removes account range retrieval tasks that have already been

View file

@ -839,6 +839,9 @@ type rpcProgress struct {
HealedBytecodeBytes hexutil.Uint64
HealingTrienodes hexutil.Uint64
HealingBytecode hexutil.Uint64
SyncedAccessLists hexutil.Uint64
TotalAccessLists hexutil.Uint64
TrieGenProgress hexutil.Uint64
TxIndexFinishedBlocks hexutil.Uint64
TxIndexRemainingBlocks hexutil.Uint64
StateIndexRemaining hexutil.Uint64
@ -867,6 +870,9 @@ func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress {
HealedBytecodeBytes: uint64(p.HealedBytecodeBytes),
HealingTrienodes: uint64(p.HealingTrienodes),
HealingBytecode: uint64(p.HealingBytecode),
SyncedAccessLists: uint64(p.SyncedAccessLists),
TotalAccessLists: uint64(p.TotalAccessLists),
TrieGenProgress: uint64(p.TrieGenProgress),
TxIndexFinishedBlocks: uint64(p.TxIndexFinishedBlocks),
TxIndexRemainingBlocks: uint64(p.TxIndexRemainingBlocks),
StateIndexRemaining: uint64(p.StateIndexRemaining),

View file

@ -127,13 +127,18 @@ type SyncProgress struct {
SyncedStorage uint64 // Number of storage slots downloaded
SyncedStorageBytes uint64 // Number of storage trie bytes persisted to disk
// Snap/1 specific fields
HealedTrienodes uint64 // Number of state trie nodes downloaded
HealedTrienodeBytes uint64 // Number of state trie bytes persisted to disk
HealedBytecodes uint64 // Number of bytecodes downloaded
HealedBytecodeBytes uint64 // Number of bytecodes persisted to disk
HealingTrienodes uint64 // Number of state trie nodes pending
HealingBytecode uint64 // Number of bytecodes pending
HealingTrienodes uint64 // Number of state trie nodes pending
HealingBytecode uint64 // Number of bytecodes pending
// Snap/2 specific fields
SyncedAccessLists uint64 // Number of block access lists fetched during catch-up
TotalAccessLists uint64 // Total number of block access lists to fetch for catch-up
TrieGenProgress uint64 // Trie generation completion, in percent (0..100)
// "transaction indexing" fields
TxIndexFinishedBlocks uint64 // Number of blocks whose transactions are already indexed

View file

@ -182,6 +182,9 @@ func (api *EthereumAPI) Syncing(ctx context.Context) (interface{}, error) {
"healedBytecodeBytes": hexutil.Uint64(progress.HealedBytecodeBytes),
"healingTrienodes": hexutil.Uint64(progress.HealingTrienodes),
"healingBytecode": hexutil.Uint64(progress.HealingBytecode),
"syncedAccessLists": hexutil.Uint64(progress.SyncedAccessLists),
"totalAccessLists": hexutil.Uint64(progress.TotalAccessLists),
"trieGenProgress": hexutil.Uint64(progress.TrieGenProgress),
"txIndexFinishedBlocks": hexutil.Uint64(progress.TxIndexFinishedBlocks),
"txIndexRemainingBlocks": hexutil.Uint64(progress.TxIndexRemainingBlocks),
"stateIndexRemaining": hexutil.Uint64(progress.StateIndexRemaining),

View file

@ -65,6 +65,21 @@ const (
partitionFinished = ^uint64(0)
)
// genCounters bundles the progress counters threaded through a GenerateTrie run.
// A single instance is handed to every partition worker and to the progress
// ticker, so all fields are atomics and can be updated and read concurrently.
type genCounters struct {
accounts atomic.Int64 // accounts scanned; bumped once per account visited by a partition
slots atomic.Int64 // storage slots scanned; bumped after each successful storage trie update
accountUpdated atomic.Int64 // accounts whose stale storage Root was rewritten in the flat snapshot
storageDeleted atomic.Int64 // dangling storage slots removed from the flat snapshot
accountTrieNodes atomic.Int64 // generated account trie nodes
accountTrieBytes atomic.Int64 // generated account trie bytes (path scheme: path+blob, otherwise hash+blob)
storageTrieNodes atomic.Int64 // generated storage trie nodes
storageTrieBytes atomic.Int64 // generated storage trie bytes (path scheme: path+hash+blob, otherwise hash+blob)
// Per-partition keyspace position: the first 8 bytes (big-endian) of the
// most recently visited account hash, or partitionFinished once the
// partition has completed.
progress [numPartitions]atomic.Uint64
}
// rangeIterators bundles the per-partition account and storage iterators.
type rangeIterators struct {
db ethdb.Database
@ -134,7 +149,7 @@ func reopenFlatIterator(db ethdb.Database, old *internal.HoldableIterator, prefi
// both per-account storage subtries and the partition's slice of the
// account trie. Returns the partition's stripped subtree root blob, or
// nil if the partition had no accounts at all.
func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Database, scheme string, partition byte, rangeStart, rangeEnd common.Hash, scanned, updated, deleted *atomic.Int64, pos *atomic.Uint64) ([]byte, error) {
func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Database, scheme string, partition byte, rangeStart, rangeEnd common.Hash, c *genCounters) ([]byte, error) {
iters := openRangeIterators(db, rangeStart)
defer iters.release()
@ -154,6 +169,13 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
if len(path) == 1 {
root = common.CopyBytes(blob)
}
c.accountTrieNodes.Add(1)
if scheme == rawdb.PathScheme {
c.accountTrieBytes.Add(int64(len(path) + len(blob)))
} else {
c.accountTrieBytes.Add(int64(common.HashLength + len(blob)))
}
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, scheme)
})
@ -172,8 +194,8 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
if bytes.Compare(accountHash[:], rangeEnd[:]) > 0 {
break
}
scanned.Add(1)
pos.Store(binary.BigEndian.Uint64(accountHash[:8]))
c.accounts.Add(1)
c.progress[partition].Store(binary.BigEndian.Uint64(accountHash[:8]))
// Decode the account object
account, err := types.FullAccount(iters.acct.Value())
@ -184,6 +206,13 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
// Build the account's storage trie from the flat storage snapshot.
// StackTrie's onTrieNode callback persists nodes as they finalize.
storageTrie := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
c.storageTrieNodes.Add(1)
if scheme == rawdb.PathScheme {
c.storageTrieBytes.Add(int64(len(path) + common.HashLength + len(blob)))
} else {
c.storageTrieBytes.Add(int64(common.HashLength + len(blob)))
}
rawdb.WriteTrieNode(batch, accountHash, path, hash, blob, scheme)
})
@ -213,7 +242,7 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
copy(lastDanglingAccount, storageAccount)
log.Error("Unexpected storage entries for dangling account", "expected", accountHash, "got", common.BytesToHash(storageAccount))
}
deleted.Add(1)
c.storageDeleted.Add(1)
slotHash := sk[len(rawdb.SnapshotStoragePrefix)+common.HashLength:]
rawdb.DeleteStorageSnapshot(batch, common.BytesToHash(storageAccount), common.BytesToHash(slotHash))
if err := iters.flushIfFull(batch, "dangling"); err != nil {
@ -237,6 +266,7 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
if err := storageTrie.Update(slotHash, iters.stor.Value()); err != nil {
return nil, fmt.Errorf("storage stack trie update for %x: %w", accountHash, err)
}
c.slots.Add(1)
if err := iters.flushIfFull(batch, "storage"); err != nil {
return nil, err
}
@ -252,7 +282,7 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
if computed != account.Root {
account.Root = computed
rawdb.WriteAccountSnapshot(batch, accountHash, types.SlimAccountRLP(*account))
updated.Add(1)
c.accountUpdated.Add(1)
}
fullAccount, err := rlp.EncodeToBytes(account)
if err != nil {
@ -291,7 +321,7 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
copy(lastDanglingTail, acct)
log.Error("Unexpected storage entries for dangling account", "addrhash", common.BytesToHash(acct))
}
deleted.Add(1)
c.storageDeleted.Add(1)
slotHash := sk[len(rawdb.SnapshotStoragePrefix)+common.HashLength:]
rawdb.DeleteStorageSnapshot(batch, common.BytesToHash(acct), common.BytesToHash(slotHash))
if err := iters.flushIfFull(batch, "dangling tail"); err != nil {
@ -349,19 +379,21 @@ func hashRanges(total int) [][2]common.Hash {
// Generation is all or nothing: an interrupted run leaves no resume
// state and the next run builds every partition from scratch.
func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) (GenerateStats, error) {
// Delegate with a nil progress pointer: nothing is published to an external
// progress counter for this call.
return GenerateTrieWithProgress(db, scheme, root, cancel, nil)
}
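// GenerateTrieWithProgress is GenerateTrie with live progress reporting: when
// prog is non-nil it receives the completion percentage on every progress
// tick and is set to 100 once all partitions have finished successfully.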
func GenerateTrieWithProgress(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}, prog *atomic.Uint64) (GenerateStats, error) {
var (
start = time.Now()
scanned atomic.Int64
updated atomic.Int64
deleted atomic.Int64
progress [numPartitions]atomic.Uint64
c genCounters
progressDone = make(chan struct{})
// partitionBlobs[i] holds the root node for partition i, or nil if
// the partition is empty.
partitionBlobs [numPartitions][]byte
)
go tickProgress(progressDone, start, &scanned, &updated, &progress)
go tickProgress(progressDone, start, &c, prog)
defer close(progressDone)
// Run every partition concurrently, each producing the subtree root
@ -375,13 +407,13 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
rangeStart, rangeEnd := r[0], r[1]
eg.Go(func() error {
start := time.Now()
blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated, &deleted, &progress[partition])
blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &c)
if err != nil {
return err
}
log.Info("Partition done", "partition", partition, "elapsed", common.PrettyDuration(time.Since(start)))
progress[partition].Store(partitionFinished)
c.progress[partition].Store(partitionFinished)
partitionBlobs[partition] = blob
return nil
})
@ -391,7 +423,9 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
if err := eg.Wait(); err != nil {
return GenerateStats{}, err
}
if prog != nil {
prog.Store(100)
}
// Assemble the top-level root from the partition blobs and verify it
// matches the expected root.
got, err := assembleRoot(db, scheme, partitionBlobs)
@ -401,11 +435,17 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
if got != root {
return GenerateStats{}, fmt.Errorf("state root mismatch: got %x, want %x", got, root)
}
log.Info("Generated state trie", "scanned", scanned.Load(), "updated", updated.Load(), "dangling-slots", deleted.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
log.Info("Generated state trie",
"accounts", c.accounts.Load(), "slots", c.slots.Load(),
"account-nodes", c.accountTrieNodes.Load(), "storage-nodes", c.storageTrieNodes.Load(),
"account-nodebytes", common.StorageSize(c.accountTrieBytes.Load()), "storage-nodebytes", common.StorageSize(c.storageTrieBytes.Load()),
"updated-accounts", c.accountUpdated.Load(), "dangling-slots", c.storageDeleted.Load(),
"elapsed", common.PrettyDuration(time.Since(start)))
return GenerateStats{
Scanned: scanned.Load(),
Updated: updated.Load(),
Deleted: deleted.Load(),
Scanned: c.accounts.Load(),
Updated: c.accountUpdated.Load(),
Deleted: c.storageDeleted.Load(),
}, nil
}
@ -483,25 +523,32 @@ func assembleRoot(db ethdb.Database, scheme string, partitionBlobs [numPartition
// tickProgress logs an aggregate progress line every 30 seconds until done
// is closed and, when prog is non-nil, publishes the completion percentage
// to it on each tick. Cheap: a handful of atomic loads and one log line per tick.
func tickProgress(done <-chan struct{}, start time.Time, scanned, updated *atomic.Int64, progress *[numPartitions]atomic.Uint64) {
func tickProgress(done <-chan struct{}, start time.Time, c *genCounters, prog *atomic.Uint64) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
elapsed := time.Since(start)
fraction := progressFraction(progress)
fraction := progressFraction(&c.progress)
// Notify the external subscriber about the generation progress
if prog != nil {
prog.Store(uint64(100 * fraction))
}
eta := "n/a"
if fraction > 0.005 {
eta = common.PrettyDuration(time.Duration(float64(elapsed) * (1.0/fraction - 1.0))).String()
}
log.Info("Generating trie",
"progress", fmt.Sprintf("%.1f%%", fraction*100), "eta", eta,
"scanned", scanned.Load(), "updated", updated.Load(),
"accounts", c.accounts.Load(), "slots", c.slots.Load(),
"account-updated", c.accountUpdated.Load(), "dangling-slots", c.storageDeleted.Load(),
"elapsed", common.PrettyDuration(elapsed),
"acct/s", uint64(float64(scanned.Load())/elapsed.Seconds()))
"acct/s", uint64(float64(c.accounts.Load())/elapsed.Seconds()))
}
}
}

View file

@ -21,7 +21,6 @@ import (
"context"
"math/big"
"sort"
"sync/atomic"
"testing"
"github.com/ethereum/go-ethereum/common"
@ -674,22 +673,21 @@ func TestGenerateTrieBatchFlush(t *testing.T) {
tc.build(db)
peak := 0
var scanned, updated, deleted atomic.Int64
var pos atomic.Uint64
var c genCounters
ranges := hashRanges(numPartitions)
if _, err := generatePartition(context.Background(), nil, peakBatchDB{Database: db, peak: &peak},
rawdb.HashScheme, 0, ranges[0][0], ranges[0][1], &scanned, &updated, &deleted, &pos); err != nil {
rawdb.HashScheme, 0, ranges[0][0], ranges[0][1], &c); err != nil {
t.Fatalf("generatePartition: %v", err)
}
if scanned.Load() != tc.wantScanned {
t.Errorf("scanned = %d, want %d (an account was skipped?)", scanned.Load(), tc.wantScanned)
if c.accounts.Load() != tc.wantScanned {
t.Errorf("scanned = %d, want %d (an account was skipped?)", c.accounts.Load(), tc.wantScanned)
}
if deleted.Load() != tc.wantDeleted {
t.Errorf("deleted = %d, want %d", deleted.Load(), tc.wantDeleted)
if c.storageDeleted.Load() != tc.wantDeleted {
t.Errorf("deleted = %d, want %d", c.storageDeleted.Load(), tc.wantDeleted)
}
if updated.Load() != 0 {
t.Errorf("updated = %d, want 0 (a storage slot was dropped across a flush?)", updated.Load())
if c.accountUpdated.Load() != 0 {
t.Errorf("updated = %d, want 0 (a storage slot was dropped across a flush?)", c.accountUpdated.Load())
}
// The batch must have stayed bounded. Without this site's flush its
// full write set (far larger than IdealBatchSize) buffers into one batch.