mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-03-10 13:19:04 +00:00
in parallel
Signed-off-by: jsvisa <delweng@gmail.com>
This commit is contained in:
parent
dc19cae10e
commit
d22a7d833a
1 changed files with 70 additions and 52 deletions
|
|
@ -339,11 +339,19 @@ func traverseState(ctx *cli.Context) error {
|
|||
start = time.Now()
|
||||
)
|
||||
|
||||
cctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
timer := time.NewTicker(time.Second * 8)
|
||||
defer timer.Stop()
|
||||
for range timer.C {
|
||||
log.Info("Traversing state", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
log.Info("Traversing state", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
case <-cctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
|
@ -389,58 +397,57 @@ func traverseState(ctx *cli.Context) error {
|
|||
}
|
||||
|
||||
log.Info("Storage traversal complete", "slots", slots.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
} else {
|
||||
log.Info("Start traversing state trie", "root", config.root.Hex(), "startKey", common.Bytes2Hex(config.startKey), "limitKey", common.Bytes2Hex(config.limitKey))
|
||||
|
||||
return traverseStateParallel(t, triedb, chaindb, config, &accounts, &slots, &codes, start)
|
||||
eg, ctx := errgroup.WithContext(cctx)
|
||||
|
||||
// Parallel processing with boundary checks
|
||||
for i := 0; i < 16; i++ {
|
||||
nibble := byte(i)
|
||||
eg.Go(func() error {
|
||||
// Skip branches that are entirely before startKey
|
||||
if len(config.startKey) > 0 {
|
||||
startNibble := config.startKey[0] >> 4
|
||||
if nibble < startNibble {
|
||||
return nil // Skip this branch
|
||||
}
|
||||
}
|
||||
|
||||
// Skip branches that are entirely after limitKey
|
||||
if len(config.limitKey) > 0 {
|
||||
limitNibble := config.limitKey[0] >> 4
|
||||
if nibble > limitNibble {
|
||||
return nil // Skip this branch
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
startKey = []byte{nibble << 4}
|
||||
limitKey []byte
|
||||
)
|
||||
if nibble < 15 {
|
||||
limitKey = []byte{(nibble + 1) << 4}
|
||||
} else {
|
||||
// Last branch (0xf*) has no limit
|
||||
limitKey = nil
|
||||
}
|
||||
|
||||
return traverseBranch(ctx, t, triedb, chaindb, config, startKey, limitKey, &accounts, &slots, &codes)
|
||||
})
|
||||
}
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("State traversal complete", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
}
|
||||
|
||||
// traverseStateParallel parallelizes state traversal by dividing work across 16 trie branches
|
||||
func traverseStateParallel(t *trie.StateTrie, triedb *triedb.Database, chaindb ethdb.Database, config *traverseConfig, accounts, slots, codes *atomic.Uint64, start time.Time) error {
|
||||
ctx := context.Background()
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for i := 0; i < 16; i++ {
|
||||
nibble := byte(i)
|
||||
g.Go(func() error {
|
||||
startKey := config.startKey
|
||||
limitKey := config.limitKey
|
||||
|
||||
branchStartKey := make([]byte, len(startKey)+1)
|
||||
branchLimitKey := make([]byte, len(startKey)+1)
|
||||
|
||||
if len(startKey) > 0 {
|
||||
copy(branchStartKey, startKey)
|
||||
copy(branchLimitKey, startKey)
|
||||
}
|
||||
|
||||
branchStartKey[len(startKey)] = nibble << 4
|
||||
branchLimitKey[len(startKey)] = (nibble + 1) << 4
|
||||
|
||||
if limitKey != nil && bytes.Compare(branchStartKey, limitKey) >= 0 {
|
||||
return nil
|
||||
}
|
||||
if limitKey != nil && bytes.Compare(branchLimitKey, limitKey) > 0 {
|
||||
branchLimitKey = make([]byte, len(limitKey))
|
||||
copy(branchLimitKey, limitKey)
|
||||
}
|
||||
|
||||
return traverseBranch(ctx, t, triedb, chaindb, config.root, branchStartKey, branchLimitKey, accounts, slots, codes)
|
||||
})
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("State traversal complete", "accounts", accounts.Load(), "slots", slots.Load(), "codes", codes.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// traverseBranch traverses a specific branch of the state trie
|
||||
func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Database, chaindb ethdb.Database, root common.Hash, startKey, limitKey []byte, accounts, slots, codes *atomic.Uint64) error {
|
||||
func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Database, chaindb ethdb.Database, config *traverseConfig, startKey, limitKey []byte, accounts, slots, codes *atomic.Uint64) error {
|
||||
acctIt, err := t.NodeIterator(startKey)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -448,13 +455,16 @@ func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Datab
|
|||
|
||||
accIter := trie.NewIterator(acctIt)
|
||||
for accIter.Next() {
|
||||
// Check if context was cancelled
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if config.startKey != nil && bytes.Compare(accIter.Key, config.startKey) < 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if limitKey != nil && bytes.Compare(accIter.Key, limitKey) >= 0 {
|
||||
break
|
||||
}
|
||||
|
|
@ -468,7 +478,7 @@ func traverseBranch(ctx context.Context, t *trie.StateTrie, triedb *triedb.Datab
|
|||
}
|
||||
|
||||
if acc.Root != types.EmptyRootHash {
|
||||
id := trie.StorageTrieID(root, common.BytesToHash(accIter.Key), acc.Root)
|
||||
id := trie.StorageTrieID(config.root, common.BytesToHash(accIter.Key), acc.Root)
|
||||
storageTrie, err := trie.NewStateTrie(id, triedb)
|
||||
if err != nil {
|
||||
log.Error("Failed to open storage trie", "root", acc.Root, "err", err)
|
||||
|
|
@ -507,10 +517,18 @@ func traverseStorageParallel(ctx context.Context, storageTrie *trie.StateTrie) (
|
|||
for i := 0; i < 16; i++ {
|
||||
nibble := byte(i)
|
||||
g.Go(func() error {
|
||||
branchStartKey := []byte{nibble << 4}
|
||||
branchLimitKey := []byte{(nibble + 1) << 4}
|
||||
var (
|
||||
startKey = []byte{nibble << 4}
|
||||
limitKey []byte
|
||||
)
|
||||
if nibble < 15 {
|
||||
limitKey = []byte{(nibble + 1) << 4}
|
||||
} else {
|
||||
// For the last branch (0xf*), no limit key (traverse to end)
|
||||
limitKey = nil
|
||||
}
|
||||
|
||||
localSlots, err := traverseStorageBranch(ctx, storageTrie, branchStartKey, branchLimitKey)
|
||||
localSlots, err := traverseStorageBranch(ctx, storageTrie, startKey, limitKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -544,7 +562,7 @@ func traverseStorageBranch(ctx context.Context, storageTrie *trie.StateTrie, sta
|
|||
default:
|
||||
}
|
||||
|
||||
if bytes.Compare(storageIter.Key, limitKey) >= 0 {
|
||||
if limitKey != nil && bytes.Compare(storageIter.Key, limitKey) >= 0 {
|
||||
break
|
||||
}
|
||||
slots++
|
||||
|
|
|
|||
Loading…
Reference in a new issue