core/rawdb: inspect database in parallel (#32506)

`db inspect` on a full database currently takes **30min+**, because the
database iteration runs in a single thread. This change splits the key-space
into 256 sub-ranges and hands them to a worker pool to speed it up.

After the change, `db inspect --workers 16` finishes in about **10min**
(the speedup is sub-linear because the key-space is not evenly distributed).
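The change follows a common Go fan-out pattern. Below is a minimal, self-contained sketch of the idea, not the committed geth code: the `stat` type, the `inspectRange` stub, and the aggregation fields are illustrative stand-ins.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// stat aggregates sizes and counts with atomics, so all workers can
// share one instance without taking a mutex on the hot path.
type stat struct {
	size  atomic.Uint64
	count atomic.Uint64
}

func (s *stat) add(n int) {
	s.size.Add(uint64(n))
	s.count.Add(1)
}

// inspectRange stands in for iterating every key whose first byte is r;
// a real implementation would wrap db.NewIterator with that one-byte prefix.
func inspectRange(ctx context.Context, r byte, total *stat) error {
	// ... for it.Next() { total.add(len(it.Key()) + len(it.Value())) } ...
	return ctx.Err() // nil unless the group has been cancelled
}

func main() {
	var total stat
	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(runtime.NumCPU()) // bounded worker pool

	// 256 sub-ranges, one per possible first byte of the key.
	for i := 0; i < 256; i++ {
		r := byte(i)
		eg.Go(func() error { return inspectRange(ctx, r, &total) })
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("inspect failed:", err)
		return
	}
	fmt.Printf("keys=%d size=%d bytes\n", total.count.Load(), total.size.Load())
}
```

Each single-byte prefix becomes one task, so a slow, dense prefix only occupies one worker while the rest keep draining the remaining ranges.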

---------

Signed-off-by: jsvisa <delweng@gmail.com>
Co-authored-by: Gary Rong <garyrong0905@gmail.com>
Authored by Delweng on 2025-09-01 13:41:41 +08:00; committed by GitHub
parent 3aeccadd04
commit 0cde5278e8


@@ -18,13 +18,17 @@ package rawdb

 import (
 	"bytes"
+	"context"
 	"errors"
 	"fmt"
 	"maps"
 	"os"
 	"path/filepath"
+	"runtime"
 	"slices"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/ethereum/go-ethereum/common"
@@ -32,6 +36,7 @@ import (
 	"github.com/ethereum/go-ethereum/ethdb"
 	"github.com/ethereum/go-ethereum/ethdb/memorydb"
 	"github.com/ethereum/go-ethereum/log"
+	"golang.org/x/sync/errgroup"
 )

 var ErrDeleteRangeInterrupted = errors.New("safe delete range operation interrupted")
@@ -362,36 +367,36 @@ func (c counter) Percentage(current uint64) string {
 	return fmt.Sprintf("%d", current*100/uint64(c))
 }

-// stat stores sizes and count for a parameter
+// stat provides lock-free statistics aggregation using atomic operations
 type stat struct {
-	size  common.StorageSize
-	count counter
+	size  uint64
+	count uint64
 }

-// Add size to the stat and increase the counter by 1
-func (s *stat) Add(size common.StorageSize) {
-	s.size += size
-	s.count++
+func (s *stat) empty() bool {
+	return atomic.LoadUint64(&s.count) == 0
 }

-func (s *stat) Size() string {
-	return s.size.String()
+func (s *stat) add(size common.StorageSize) {
+	atomic.AddUint64(&s.size, uint64(size))
+	atomic.AddUint64(&s.count, 1)
 }

-func (s *stat) Count() string {
-	return s.count.String()
+func (s *stat) sizeString() string {
+	return common.StorageSize(atomic.LoadUint64(&s.size)).String()
+}
+
+func (s *stat) countString() string {
+	return counter(atomic.LoadUint64(&s.count)).String()
 }

 // InspectDatabase traverses the entire database and checks the size
 // of all different categories of data.
 func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
-	it := db.NewIterator(keyPrefix, keyStart)
-	defer it.Release()
-
 	var (
-		count  int64
-		start  = time.Now()
-		logged = time.Now()
+		start = time.Now()
+		count atomic.Int64
+		total atomic.Uint64

 		// Key-value store statistics
 		headers stat
@@ -427,144 +432,200 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
 		metadata    stat
 		unaccounted stat

-		// Totals
-		total common.StorageSize
-
 		// This map tracks example keys for unaccounted data.
 		// For each unique two-byte prefix, the first unaccounted key encountered
 		// by the iterator will be stored.
 		unaccountedKeys = make(map[[2]byte][]byte)
+		unaccountedMu   sync.Mutex
 	)

-	// Inspect key-value database first.
-	for it.Next() {
-		var (
-			key  = it.Key()
-			size = common.StorageSize(len(key) + len(it.Value()))
-		)
-		total += size
-		switch {
-		case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength):
-			headers.Add(size)
-		case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength):
-			bodies.Add(size)
-		case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength):
-			receipts.Add(size)
-		case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix):
-			tds.Add(size)
-		case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix):
-			numHashPairings.Add(size)
-		case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength):
-			hashNumPairings.Add(size)
-		case IsLegacyTrieNode(key, it.Value()):
-			legacyTries.Add(size)
-		case bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength:
-			stateLookups.Add(size)
-		case IsAccountTrieNode(key):
-			accountTries.Add(size)
-		case IsStorageTrieNode(key):
-			storageTries.Add(size)
-		case bytes.HasPrefix(key, CodePrefix) && len(key) == len(CodePrefix)+common.HashLength:
-			codes.Add(size)
-		case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength):
-			txLookups.Add(size)
-		case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength):
-			accountSnaps.Add(size)
-		case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength):
-			storageSnaps.Add(size)
-		case bytes.HasPrefix(key, PreimagePrefix) && len(key) == (len(PreimagePrefix)+common.HashLength):
-			preimages.Add(size)
-		case bytes.HasPrefix(key, configPrefix) && len(key) == (len(configPrefix)+common.HashLength):
-			metadata.Add(size)
-		case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength):
-			metadata.Add(size)
-		case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8):
-			beaconHeaders.Add(size)
-		case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength:
-			cliqueSnaps.Add(size)
-		// new log index
-		case bytes.HasPrefix(key, filterMapRowPrefix) && len(key) <= len(filterMapRowPrefix)+9:
-			filterMapRows.Add(size)
-		case bytes.HasPrefix(key, filterMapLastBlockPrefix) && len(key) == len(filterMapLastBlockPrefix)+4:
-			filterMapLastBlock.Add(size)
-		case bytes.HasPrefix(key, filterMapBlockLVPrefix) && len(key) == len(filterMapBlockLVPrefix)+8:
-			filterMapBlockLV.Add(size)
-		// old log index (deprecated)
-		case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
-			bloomBits.Add(size)
-		case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8:
-			bloomBits.Add(size)
-		// Path-based historic state indexes
-		case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength:
-			stateIndex.Add(size)
-		// Verkle trie data is detected, determine the sub-category
-		case bytes.HasPrefix(key, VerklePrefix):
-			remain := key[len(VerklePrefix):]
-			switch {
-			case IsAccountTrieNode(remain):
-				verkleTries.Add(size)
-			case bytes.HasPrefix(remain, stateIDPrefix) && len(remain) == len(stateIDPrefix)+common.HashLength:
-				verkleStateLookups.Add(size)
-			case bytes.Equal(remain, persistentStateIDKey):
-				metadata.Add(size)
-			case bytes.Equal(remain, trieJournalKey):
-				metadata.Add(size)
-			case bytes.Equal(remain, snapSyncStatusFlagKey):
-				metadata.Add(size)
-			default:
-				unaccounted.Add(size)
-			}
-		// Metadata keys
-		case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }):
-			metadata.Add(size)
-		default:
-			unaccounted.Add(size)
-			if len(key) >= 2 {
-				prefix := [2]byte(key[:2])
-				if _, ok := unaccountedKeys[prefix]; !ok {
-					unaccountedKeys[prefix] = bytes.Clone(key)
-				}
-			}
-		}
-		count++
-		if count%1000 == 0 && time.Since(logged) > 8*time.Second {
-			log.Info("Inspecting database", "count", count, "elapsed", common.PrettyDuration(time.Since(start)))
-			logged = time.Now()
-		}
-	}
+	inspectRange := func(ctx context.Context, r byte) error {
+		var s []byte
+		if len(keyStart) > 0 {
+			switch {
+			case r < keyStart[0]:
+				return nil
+			case r == keyStart[0]:
+				s = keyStart[1:]
+			default:
+				// entire key range is included for inspection
+			}
+		}
+		it := db.NewIterator(append(keyPrefix, r), s)
+		defer it.Release()
+
+		for it.Next() {
+			var (
+				key  = it.Key()
+				size = common.StorageSize(len(key) + len(it.Value()))
+			)
+			total.Add(uint64(size))
+			count.Add(1)
+
+			switch {
+			case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength):
+				headers.add(size)
+			case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength):
+				bodies.add(size)
+			case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength):
+				receipts.add(size)
+			case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix):
+				tds.add(size)
+			case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix):
+				numHashPairings.add(size)
+			case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength):
+				hashNumPairings.add(size)
+			case IsLegacyTrieNode(key, it.Value()):
+				legacyTries.add(size)
+			case bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength:
+				stateLookups.add(size)
+			case IsAccountTrieNode(key):
+				accountTries.add(size)
+			case IsStorageTrieNode(key):
+				storageTries.add(size)
+			case bytes.HasPrefix(key, CodePrefix) && len(key) == len(CodePrefix)+common.HashLength:
+				codes.add(size)
+			case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength):
+				txLookups.add(size)
+			case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength):
+				accountSnaps.add(size)
+			case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength):
+				storageSnaps.add(size)
+			case bytes.HasPrefix(key, PreimagePrefix) && len(key) == (len(PreimagePrefix)+common.HashLength):
+				preimages.add(size)
+			case bytes.HasPrefix(key, configPrefix) && len(key) == (len(configPrefix)+common.HashLength):
+				metadata.add(size)
+			case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength):
+				metadata.add(size)
+			case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8):
+				beaconHeaders.add(size)
+			case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength:
+				cliqueSnaps.add(size)
+			// new log index
+			case bytes.HasPrefix(key, filterMapRowPrefix) && len(key) <= len(filterMapRowPrefix)+9:
+				filterMapRows.add(size)
+			case bytes.HasPrefix(key, filterMapLastBlockPrefix) && len(key) == len(filterMapLastBlockPrefix)+4:
+				filterMapLastBlock.add(size)
+			case bytes.HasPrefix(key, filterMapBlockLVPrefix) && len(key) == len(filterMapBlockLVPrefix)+8:
+				filterMapBlockLV.add(size)
+			// old log index (deprecated)
+			case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
+				bloomBits.add(size)
+			case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8:
+				bloomBits.add(size)
+			// Path-based historic state indexes
+			case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength:
+				stateIndex.add(size)
+			// Verkle trie data is detected, determine the sub-category
+			case bytes.HasPrefix(key, VerklePrefix):
+				remain := key[len(VerklePrefix):]
+				switch {
+				case IsAccountTrieNode(remain):
+					verkleTries.add(size)
+				case bytes.HasPrefix(remain, stateIDPrefix) && len(remain) == len(stateIDPrefix)+common.HashLength:
+					verkleStateLookups.add(size)
+				case bytes.Equal(remain, persistentStateIDKey):
+					metadata.add(size)
+				case bytes.Equal(remain, trieJournalKey):
+					metadata.add(size)
+				case bytes.Equal(remain, snapSyncStatusFlagKey):
+					metadata.add(size)
+				default:
+					unaccounted.add(size)
+				}
+			// Metadata keys
+			case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }):
+				metadata.add(size)
+			default:
+				unaccounted.add(size)
+				if len(key) >= 2 {
+					prefix := [2]byte(key[:2])
+					unaccountedMu.Lock()
+					if _, ok := unaccountedKeys[prefix]; !ok {
+						unaccountedKeys[prefix] = bytes.Clone(key)
+					}
+					unaccountedMu.Unlock()
+				}
+			}
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+		}
+		return it.Error()
+	}
+	var (
+		eg, ctx = errgroup.WithContext(context.Background())
+		workers = runtime.NumCPU()
+	)
+	eg.SetLimit(workers)
+
+	// Progress reporter
+	done := make(chan struct{})
+	go func() {
+		ticker := time.NewTicker(8 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				log.Info("Inspecting database", "count", count.Load(), "size", common.StorageSize(total.Load()), "elapsed", common.PrettyDuration(time.Since(start)))
+			case <-done:
+				return
+			}
+		}
+	}()
+	// Inspect key-value database in parallel.
+	for i := 0; i < 256; i++ {
+		eg.Go(func() error { return inspectRange(ctx, byte(i)) })
+	}
+	if err := eg.Wait(); err != nil {
+		close(done)
+		return err
+	}
+	close(done)
+
 	// Display the database statistic of key-value store.
 	stats := [][]string{
-		{"Key-Value store", "Headers", headers.Size(), headers.Count()},
-		{"Key-Value store", "Bodies", bodies.Size(), bodies.Count()},
-		{"Key-Value store", "Receipt lists", receipts.Size(), receipts.Count()},
-		{"Key-Value store", "Difficulties (deprecated)", tds.Size(), tds.Count()},
-		{"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()},
-		{"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()},
-		{"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()},
-		{"Key-Value store", "Log index filter-map rows", filterMapRows.Size(), filterMapRows.Count()},
-		{"Key-Value store", "Log index last-block-of-map", filterMapLastBlock.Size(), filterMapLastBlock.Count()},
-		{"Key-Value store", "Log index block-lv", filterMapBlockLV.Size(), filterMapBlockLV.Count()},
-		{"Key-Value store", "Log bloombits (deprecated)", bloomBits.Size(), bloomBits.Count()},
-		{"Key-Value store", "Contract codes", codes.Size(), codes.Count()},
-		{"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()},
-		{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},
-		{"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()},
-		{"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()},
-		{"Key-Value store", "Path state history indexes", stateIndex.Size(), stateIndex.Count()},
-		{"Key-Value store", "Verkle trie nodes", verkleTries.Size(), verkleTries.Count()},
-		{"Key-Value store", "Verkle trie state lookups", verkleStateLookups.Size(), verkleStateLookups.Count()},
-		{"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()},
-		{"Key-Value store", "Account snapshot", accountSnaps.Size(), accountSnaps.Count()},
-		{"Key-Value store", "Storage snapshot", storageSnaps.Size(), storageSnaps.Count()},
-		{"Key-Value store", "Beacon sync headers", beaconHeaders.Size(), beaconHeaders.Count()},
-		{"Key-Value store", "Clique snapshots", cliqueSnaps.Size(), cliqueSnaps.Count()},
-		{"Key-Value store", "Singleton metadata", metadata.Size(), metadata.Count()},
+		{"Key-Value store", "Headers", headers.sizeString(), headers.countString()},
+		{"Key-Value store", "Bodies", bodies.sizeString(), bodies.countString()},
+		{"Key-Value store", "Receipt lists", receipts.sizeString(), receipts.countString()},
+		{"Key-Value store", "Difficulties (deprecated)", tds.sizeString(), tds.countString()},
+		{"Key-Value store", "Block number->hash", numHashPairings.sizeString(), numHashPairings.countString()},
+		{"Key-Value store", "Block hash->number", hashNumPairings.sizeString(), hashNumPairings.countString()},
+		{"Key-Value store", "Transaction index", txLookups.sizeString(), txLookups.countString()},
+		{"Key-Value store", "Log index filter-map rows", filterMapRows.sizeString(), filterMapRows.countString()},
+		{"Key-Value store", "Log index last-block-of-map", filterMapLastBlock.sizeString(), filterMapLastBlock.countString()},
+		{"Key-Value store", "Log index block-lv", filterMapBlockLV.sizeString(), filterMapBlockLV.countString()},
+		{"Key-Value store", "Log bloombits (deprecated)", bloomBits.sizeString(), bloomBits.countString()},
+		{"Key-Value store", "Contract codes", codes.sizeString(), codes.countString()},
+		{"Key-Value store", "Hash trie nodes", legacyTries.sizeString(), legacyTries.countString()},
+		{"Key-Value store", "Path trie state lookups", stateLookups.sizeString(), stateLookups.countString()},
+		{"Key-Value store", "Path trie account nodes", accountTries.sizeString(), accountTries.countString()},
+		{"Key-Value store", "Path trie storage nodes", storageTries.sizeString(), storageTries.countString()},
+		{"Key-Value store", "Path state history indexes", stateIndex.sizeString(), stateIndex.countString()},
+		{"Key-Value store", "Verkle trie nodes", verkleTries.sizeString(), verkleTries.countString()},
+		{"Key-Value store", "Verkle trie state lookups", verkleStateLookups.sizeString(), verkleStateLookups.countString()},
+		{"Key-Value store", "Trie preimages", preimages.sizeString(), preimages.countString()},
+		{"Key-Value store", "Account snapshot", accountSnaps.sizeString(), accountSnaps.countString()},
+		{"Key-Value store", "Storage snapshot", storageSnaps.sizeString(), storageSnaps.countString()},
+		{"Key-Value store", "Beacon sync headers", beaconHeaders.sizeString(), beaconHeaders.countString()},
+		{"Key-Value store", "Clique snapshots", cliqueSnaps.sizeString(), cliqueSnaps.countString()},
+		{"Key-Value store", "Singleton metadata", metadata.sizeString(), metadata.countString()},
 	}
 	// Inspect all registered append-only file store then.
 	ancients, err := inspectFreezers(db)
 	if err != nil {
@@ -579,16 +640,17 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
 				fmt.Sprintf("%d", ancient.count()),
 			})
 		}
-		total += ancient.size()
+		total.Add(uint64(ancient.size()))
 	}
 	table := newTableWriter(os.Stdout)
 	table.SetHeader([]string{"Database", "Category", "Size", "Items"})
-	table.SetFooter([]string{"", "Total", total.String(), " "})
+	table.SetFooter([]string{"", "Total", common.StorageSize(total.Load()).String(), fmt.Sprintf("%d", count.Load())})
 	table.AppendBulk(stats)
 	table.Render()

-	if unaccounted.size > 0 {
-		log.Error("Database contains unaccounted data", "size", unaccounted.size, "count", unaccounted.count)
+	if !unaccounted.empty() {
+		log.Error("Database contains unaccounted data", "size", unaccounted.sizeString(), "count", unaccounted.countString())
 		for _, e := range slices.SortedFunc(maps.Values(unaccountedKeys), bytes.Compare) {
 			log.Error(fmt.Sprintf("  example key: %x", e))
 		}