From d22a7d833adca3d4163592f1e11fa1311d2f50fc Mon Sep 17 00:00:00 2001 From: jsvisa Date: Wed, 10 Sep 2025 04:51:47 +0000 Subject: [PATCH] in parallel Signed-off-by: jsvisa --- cmd/geth/snapshot.go | 122 +++++++++++++++++++++++++------------------ 1 file changed, 70 insertions(+), 52 deletions(-) diff --git a/cmd/geth/snapshot.go b/cmd/geth/snapshot.go index 1a8573e118..edcb67cc9a 100644 --- a/cmd/geth/snapshot.go +++ b/cmd/geth/snapshot.go @@ -339,11 +339,19 @@ func traverseState(ctx *cli.Context) error { start = time.Now() ) + cctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { timer := time.NewTicker(time.Second * 8) defer timer.Stop() - for range timer.C { - log.Info("Traversing state", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start))) + for { + select { + case <-timer.C: + log.Info("Traversing state", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start))) + case <-cctx.Done(): + return + } } }() @@ -389,58 +397,57 @@ func traverseState(ctx *cli.Context) error { } log.Info("Storage traversal complete", "slots", slots.Load(), "elapsed", common.PrettyDuration(time.Since(start))) - return nil } else { log.Info("Start traversing state trie", "root", config.root.Hex(), "startKey", common.Bytes2Hex(config.startKey), "limitKey", common.Bytes2Hex(config.limitKey)) - return traverseStateParallel(t, triedb, chaindb, config, &accounts, &slots, &codes, start) + eg, ctx := errgroup.WithContext(cctx) + + // Parallel processing with boundary checks + for i := 0; i < 16; i++ { + nibble := byte(i) + eg.Go(func() error { + // Skip branches that are entirely before startKey + if len(config.startKey) > 0 { + startNibble := config.startKey[0] >> 4 + if nibble < startNibble { + return nil // Skip this branch + } + } + + // Skip branches that are entirely after limitKey + if len(config.limitKey) > 0 { + limitNibble := config.limitKey[0] >> 4 + if nibble > limitNibble { + return nil // Skip this branch + } + } + + var ( + startKey = []byte{nibble << 4} + limitKey []byte + ) + if nibble < 15 { + limitKey = []byte{(nibble + 1) << 4} + } else { + // Last branch (0xf*) has no limit + limitKey = nil + } + + return traverseBranch(ctx, t, triedb, chaindb, config, startKey, limitKey, &accounts, &slots, &codes) + }) + } + + if err := eg.Wait(); err != nil { + return err + } + + log.Info("State traversal complete", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start))) } -} - -// traverseStateParallel parallelizes state traversal by dividing work across 16 trie branches -func traverseStateParallel(t *trie.StateTrie, triedb *triedb.Database, chaindb ethdb.Database, config *traverseConfig, accounts, slots, codes *atomic.Uint64, start time.Time) error { - ctx := context.Background() - g, ctx := errgroup.WithContext(ctx) - - for i := 0; i < 16; i++ { - nibble := byte(i) - g.Go(func() error { - startKey := config.startKey - limitKey := config.limitKey - - branchStartKey := make([]byte, len(startKey)+1) - branchLimitKey := make([]byte, len(startKey)+1) - - if len(startKey) > 0 { - copy(branchStartKey, startKey) - copy(branchLimitKey, startKey) - } - - branchStartKey[len(startKey)] = nibble << 4 - branchLimitKey[len(startKey)] = (nibble + 1) << 4 - - if limitKey != nil && bytes.Compare(branchStartKey, limitKey) >= 0 { - return nil - } - if limitKey != nil && bytes.Compare(branchLimitKey, limitKey) > 0 { - branchLimitKey = make([]byte, len(limitKey)) - copy(branchLimitKey, limitKey) - } - - return traverseBranch(ctx, t, triedb, chaindb, config.root, branchStartKey, branchLimitKey, accounts, slots, codes) - }) - } - - if err := g.Wait(); err != nil { - return err - } - - log.Info("State traversal complete", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start))) return nil } // traverseBranch traverses a specific branch of the state trie -func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Database, chaindb ethdb.Database, root common.Hash, startKey, limitKey []byte, accounts, slots, codes *atomic.Uint64) error { +func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Database, chaindb ethdb.Database, config *traverseConfig, startKey, limitKey []byte, accounts, slots, codes *atomic.Uint64) error { acctIt, err := t.NodeIterator(startKey) if err != nil { return err @@ -448,13 +455,16 @@ func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Datab accIter := trie.NewIterator(acctIt) for accIter.Next() { - // Check if context was cancelled select { case <-ctx.Done(): return ctx.Err() default: } + if config.startKey != nil && bytes.Compare(accIter.Key, config.startKey) < 0 { + continue + } + if limitKey != nil && bytes.Compare(accIter.Key, limitKey) >= 0 { break } @@ -468,7 +478,7 @@ func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Datab } if acc.Root != types.EmptyRootHash { - id := trie.StorageTrieID(root, common.BytesToHash(accIter.Key), acc.Root) + id := trie.StorageTrieID(config.root, common.BytesToHash(accIter.Key), acc.Root) storageTrie, err := trie.NewStateTrie(id, triedb) if err != nil { log.Error("Failed to open storage trie", "root", acc.Root, "err", err) @@ -507,10 +517,18 @@ func traverseStorageParallel(ctx context.Context, storageTrie *trie.StateTrie) ( for i := 0; i < 16; i++ { nibble := byte(i) g.Go(func() error { - branchStartKey := []byte{nibble << 4} - branchLimitKey := []byte{(nibble + 1) << 4} + var ( + startKey = []byte{nibble << 4} + limitKey []byte + ) + if nibble < 15 { + limitKey = []byte{(nibble + 1) << 4} + } else { + // For the last branch (0xf*), no limit key (traverse to end) + limitKey = nil + } - localSlots, err := traverseStorageBranch(ctx, storageTrie, branchStartKey, branchLimitKey) + localSlots, err := traverseStorageBranch(ctx, storageTrie, startKey, limitKey) if err != nil { return err } @@ -544,7 +562,7 @@ func traverseStorageBranch(ctx context.Context, storageTrie *trie.StateTrie, sta default: } - if bytes.Compare(storageIter.Key, limitKey) >= 0 { + if limitKey != nil && bytes.Compare(storageIter.Key, limitKey) >= 0 { break } slots++