diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 6a1b717066..626d390c0d 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -18,13 +18,17 @@ package rawdb import ( "bytes" + "context" "errors" "fmt" "maps" "os" "path/filepath" + "runtime" "slices" "strings" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -32,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/log" + "golang.org/x/sync/errgroup" ) var ErrDeleteRangeInterrupted = errors.New("safe delete range operation interrupted") @@ -362,36 +367,36 @@ func (c counter) Percentage(current uint64) string { return fmt.Sprintf("%d", current*100/uint64(c)) } -// stat stores sizes and count for a parameter +// stat provides lock-free statistics aggregation using atomic operations type stat struct { - size common.StorageSize - count counter + size uint64 + count uint64 } -// Add size to the stat and increase the counter by 1 -func (s *stat) Add(size common.StorageSize) { - s.size += size - s.count++ +func (s *stat) empty() bool { + return atomic.LoadUint64(&s.count) == 0 } -func (s *stat) Size() string { - return s.size.String() +func (s *stat) add(size common.StorageSize) { + atomic.AddUint64(&s.size, uint64(size)) + atomic.AddUint64(&s.count, 1) } -func (s *stat) Count() string { - return s.count.String() +func (s *stat) sizeString() string { + return common.StorageSize(atomic.LoadUint64(&s.size)).String() +} + +func (s *stat) countString() string { + return counter(atomic.LoadUint64(&s.count)).String() } // InspectDatabase traverses the entire database and checks the size // of all different categories of data. func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { - it := db.NewIterator(keyPrefix, keyStart) - defer it.Release() - var ( - count int64 - start = time.Now() - logged = time.Now() + start = time.Now() + count atomic.Int64 + total atomic.Uint64 // Key-value store statistics headers stat @@ -427,144 +432,200 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { metadata stat unaccounted stat - // Totals - total common.StorageSize - // This map tracks example keys for unaccounted data. // For each unique two-byte prefix, the first unaccounted key encountered // by the iterator will be stored. unaccountedKeys = make(map[[2]byte][]byte) + unaccountedMu sync.Mutex ) - // Inspect key-value database first. - for it.Next() { - var ( - key = it.Key() - size = common.StorageSize(len(key) + len(it.Value())) - ) - total += size - switch { - case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength): - headers.Add(size) - case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength): - bodies.Add(size) - case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength): - receipts.Add(size) - case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix): - tds.Add(size) - case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix): - numHashPairings.Add(size) - case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength): - hashNumPairings.Add(size) - case IsLegacyTrieNode(key, it.Value()): - legacyTries.Add(size) - case bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength: - stateLookups.Add(size) - case IsAccountTrieNode(key): - accountTries.Add(size) - case IsStorageTrieNode(key): - storageTries.Add(size) - case bytes.HasPrefix(key, CodePrefix) && len(key) == len(CodePrefix)+common.HashLength: - codes.Add(size) - case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): - txLookups.Add(size) - case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength): - accountSnaps.Add(size) - case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength): - storageSnaps.Add(size) - case bytes.HasPrefix(key, PreimagePrefix) && len(key) == (len(PreimagePrefix)+common.HashLength): - preimages.Add(size) - case bytes.HasPrefix(key, configPrefix) && len(key) == (len(configPrefix)+common.HashLength): - metadata.Add(size) - case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength): - metadata.Add(size) - case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8): - beaconHeaders.Add(size) - case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength: - cliqueSnaps.Add(size) - // new log index - case bytes.HasPrefix(key, filterMapRowPrefix) && len(key) <= len(filterMapRowPrefix)+9: - filterMapRows.Add(size) - case bytes.HasPrefix(key, filterMapLastBlockPrefix) && len(key) == len(filterMapLastBlockPrefix)+4: - filterMapLastBlock.Add(size) - case bytes.HasPrefix(key, filterMapBlockLVPrefix) && len(key) == len(filterMapBlockLVPrefix)+8: - filterMapBlockLV.Add(size) - - // old log index (deprecated) - case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): - bloomBits.Add(size) - case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8: - bloomBits.Add(size) - - // Path-based historic state indexes - case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength: - stateIndex.Add(size) - - // Verkle trie data is detected, determine the sub-category - case bytes.HasPrefix(key, VerklePrefix): - remain := key[len(VerklePrefix):] + inspectRange := func(ctx context.Context, r byte) error { + var s []byte + if len(keyStart) > 0 { switch { - case IsAccountTrieNode(remain): - verkleTries.Add(size) - case bytes.HasPrefix(remain, stateIDPrefix) && len(remain) == len(stateIDPrefix)+common.HashLength: - verkleStateLookups.Add(size) - case bytes.Equal(remain, persistentStateIDKey): - metadata.Add(size) - case bytes.Equal(remain, trieJournalKey): - metadata.Add(size) - case bytes.Equal(remain, snapSyncStatusFlagKey): - metadata.Add(size) + case r < keyStart[0]: + return nil + case r == keyStart[0]: + s = keyStart[1:] default: - unaccounted.Add(size) + // entire key range is included for inspection } + } + it := db.NewIterator(append(keyPrefix, r), s) + defer it.Release() - // Metadata keys - case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }): - metadata.Add(size) + for it.Next() { + var ( + key = it.Key() + size = common.StorageSize(len(key) + len(it.Value())) + ) + total.Add(uint64(size)) + count.Add(1) - default: - unaccounted.Add(size) - if len(key) >= 2 { - prefix := [2]byte(key[:2]) - if _, ok := unaccountedKeys[prefix]; !ok { - unaccountedKeys[prefix] = bytes.Clone(key) + switch { + case bytes.HasPrefix(key, headerPrefix) && len(key) == (len(headerPrefix)+8+common.HashLength): + headers.add(size) + case bytes.HasPrefix(key, blockBodyPrefix) && len(key) == (len(blockBodyPrefix)+8+common.HashLength): + bodies.add(size) + case bytes.HasPrefix(key, blockReceiptsPrefix) && len(key) == (len(blockReceiptsPrefix)+8+common.HashLength): + receipts.add(size) + case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerTDSuffix): + tds.add(size) + case bytes.HasPrefix(key, headerPrefix) && bytes.HasSuffix(key, headerHashSuffix): + numHashPairings.add(size) + case bytes.HasPrefix(key, headerNumberPrefix) && len(key) == (len(headerNumberPrefix)+common.HashLength): + hashNumPairings.add(size) + case IsLegacyTrieNode(key, it.Value()): + legacyTries.add(size) + case bytes.HasPrefix(key, stateIDPrefix) && len(key) == len(stateIDPrefix)+common.HashLength: + stateLookups.add(size) + case IsAccountTrieNode(key): + accountTries.add(size) + case IsStorageTrieNode(key): + storageTries.add(size) + case bytes.HasPrefix(key, CodePrefix) && len(key) == len(CodePrefix)+common.HashLength: + codes.add(size) + case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): + txLookups.add(size) + case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength): + accountSnaps.add(size) + case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength): + storageSnaps.add(size) + case bytes.HasPrefix(key, PreimagePrefix) && len(key) == (len(PreimagePrefix)+common.HashLength): + preimages.add(size) + case bytes.HasPrefix(key, configPrefix) && len(key) == (len(configPrefix)+common.HashLength): + metadata.add(size) + case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength): + metadata.add(size) + case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8): + beaconHeaders.add(size) + case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength: + cliqueSnaps.add(size) + + // new log index + case bytes.HasPrefix(key, filterMapRowPrefix) && len(key) <= len(filterMapRowPrefix)+9: + filterMapRows.add(size) + case bytes.HasPrefix(key, filterMapLastBlockPrefix) && len(key) == len(filterMapLastBlockPrefix)+4: + filterMapLastBlock.add(size) + case bytes.HasPrefix(key, filterMapBlockLVPrefix) && len(key) == len(filterMapBlockLVPrefix)+8: + filterMapBlockLV.add(size) + + // old log index (deprecated) + case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): + bloomBits.add(size) + case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8: + bloomBits.add(size) + + // Path-based historic state indexes + case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength: + stateIndex.add(size) + + // Verkle trie data is detected, determine the sub-category + case bytes.HasPrefix(key, VerklePrefix): + remain := key[len(VerklePrefix):] + switch { + case IsAccountTrieNode(remain): + verkleTries.add(size) + case bytes.HasPrefix(remain, stateIDPrefix) && len(remain) == len(stateIDPrefix)+common.HashLength: + verkleStateLookups.add(size) + case bytes.Equal(remain, persistentStateIDKey): + metadata.add(size) + case bytes.Equal(remain, trieJournalKey): + metadata.add(size) + case bytes.Equal(remain, snapSyncStatusFlagKey): + metadata.add(size) + default: + unaccounted.add(size) + } + + // Metadata keys + case slices.ContainsFunc(knownMetadataKeys, func(x []byte) bool { return bytes.Equal(x, key) }): + metadata.add(size) + + default: + unaccounted.add(size) + if len(key) >= 2 { + prefix := [2]byte(key[:2]) + unaccountedMu.Lock() + if _, ok := unaccountedKeys[prefix]; !ok { + unaccountedKeys[prefix] = bytes.Clone(key) + } + unaccountedMu.Unlock() } } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } - count++ - if count%1000 == 0 && time.Since(logged) > 8*time.Second { - log.Info("Inspecting database", "count", count, "elapsed", common.PrettyDuration(time.Since(start))) - logged = time.Now() - } + + return it.Error() } + + var ( + eg, ctx = errgroup.WithContext(context.Background()) + workers = runtime.NumCPU() + ) + eg.SetLimit(workers) + + // Progress reporter + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(8 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + log.Info("Inspecting database", "count", count.Load(), "size", common.StorageSize(total.Load()), "elapsed", common.PrettyDuration(time.Since(start))) + case <-done: + return + } + } + }() + + // Inspect key-value database in parallel. + for i := 0; i < 256; i++ { + eg.Go(func() error { return inspectRange(ctx, byte(i)) }) + } + + if err := eg.Wait(); err != nil { + close(done) + return err + } + close(done) + // Display the database statistic of key-value store. stats := [][]string{ - {"Key-Value store", "Headers", headers.Size(), headers.Count()}, - {"Key-Value store", "Bodies", bodies.Size(), bodies.Count()}, - {"Key-Value store", "Receipt lists", receipts.Size(), receipts.Count()}, - {"Key-Value store", "Difficulties (deprecated)", tds.Size(), tds.Count()}, - {"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()}, - {"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()}, - {"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()}, - {"Key-Value store", "Log index filter-map rows", filterMapRows.Size(), filterMapRows.Count()}, - {"Key-Value store", "Log index last-block-of-map", filterMapLastBlock.Size(), filterMapLastBlock.Count()}, - {"Key-Value store", "Log index block-lv", filterMapBlockLV.Size(), filterMapBlockLV.Count()}, - {"Key-Value store", "Log bloombits (deprecated)", bloomBits.Size(), bloomBits.Count()}, - {"Key-Value store", "Contract codes", codes.Size(), codes.Count()}, - {"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()}, - {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, - {"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()}, - {"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()}, - {"Key-Value store", "Path state history indexes", stateIndex.Size(), stateIndex.Count()}, - {"Key-Value store", "Verkle trie nodes", verkleTries.Size(), verkleTries.Count()}, - {"Key-Value store", "Verkle trie state lookups", verkleStateLookups.Size(), verkleStateLookups.Count()}, - {"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()}, - {"Key-Value store", "Account snapshot", accountSnaps.Size(), accountSnaps.Count()}, - {"Key-Value store", "Storage snapshot", storageSnaps.Size(), storageSnaps.Count()}, - {"Key-Value store", "Beacon sync headers", beaconHeaders.Size(), beaconHeaders.Count()}, - {"Key-Value store", "Clique snapshots", cliqueSnaps.Size(), cliqueSnaps.Count()}, - {"Key-Value store", "Singleton metadata", metadata.Size(), metadata.Count()}, + {"Key-Value store", "Headers", headers.sizeString(), headers.countString()}, + {"Key-Value store", "Bodies", bodies.sizeString(), bodies.countString()}, + {"Key-Value store", "Receipt lists", receipts.sizeString(), receipts.countString()}, + {"Key-Value store", "Difficulties (deprecated)", tds.sizeString(), tds.countString()}, + {"Key-Value store", "Block number->hash", numHashPairings.sizeString(), numHashPairings.countString()}, + {"Key-Value store", "Block hash->number", hashNumPairings.sizeString(), hashNumPairings.countString()}, + {"Key-Value store", "Transaction index", txLookups.sizeString(), txLookups.countString()}, + {"Key-Value store", "Log index filter-map rows", filterMapRows.sizeString(), filterMapRows.countString()}, + {"Key-Value store", "Log index last-block-of-map", filterMapLastBlock.sizeString(), filterMapLastBlock.countString()}, + {"Key-Value store", "Log index block-lv", filterMapBlockLV.sizeString(), filterMapBlockLV.countString()}, + {"Key-Value store", "Log bloombits (deprecated)", bloomBits.sizeString(), bloomBits.countString()}, + {"Key-Value store", "Contract codes", codes.sizeString(), codes.countString()}, + {"Key-Value store", "Hash trie nodes", legacyTries.sizeString(), legacyTries.countString()}, + {"Key-Value store", "Path trie state lookups", stateLookups.sizeString(), stateLookups.countString()}, + {"Key-Value store", "Path trie account nodes", accountTries.sizeString(), accountTries.countString()}, + {"Key-Value store", "Path trie storage nodes", storageTries.sizeString(), storageTries.countString()}, + {"Key-Value store", "Path state history indexes", stateIndex.sizeString(), stateIndex.countString()}, + {"Key-Value store", "Verkle trie nodes", verkleTries.sizeString(), verkleTries.countString()}, + {"Key-Value store", "Verkle trie state lookups", verkleStateLookups.sizeString(), verkleStateLookups.countString()}, + {"Key-Value store", "Trie preimages", preimages.sizeString(), preimages.countString()}, + {"Key-Value store", "Account snapshot", accountSnaps.sizeString(), accountSnaps.countString()}, + {"Key-Value store", "Storage snapshot", storageSnaps.sizeString(), storageSnaps.countString()}, + {"Key-Value store", "Beacon sync headers", beaconHeaders.sizeString(), beaconHeaders.countString()}, + {"Key-Value store", "Clique snapshots", cliqueSnaps.sizeString(), cliqueSnaps.countString()}, + {"Key-Value store", "Singleton metadata", metadata.sizeString(), metadata.countString()}, } + // Inspect all registered append-only file store then. ancients, err := inspectFreezers(db) if err != nil { @@ -579,16 +640,17 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { fmt.Sprintf("%d", ancient.count()), }) } - total += ancient.size() + total.Add(uint64(ancient.size())) } + table := newTableWriter(os.Stdout) table.SetHeader([]string{"Database", "Category", "Size", "Items"}) - table.SetFooter([]string{"", "Total", total.String(), " "}) + table.SetFooter([]string{"", "Total", common.StorageSize(total.Load()).String(), fmt.Sprintf("%d", count.Load())}) table.AppendBulk(stats) table.Render() - if unaccounted.size > 0 { - log.Error("Database contains unaccounted data", "size", unaccounted.size, "count", unaccounted.count) + if !unaccounted.empty() { + log.Error("Database contains unaccounted data", "size", unaccounted.sizeString(), "count", unaccounted.countString()) for _, e := range slices.SortedFunc(maps.Values(unaccountedKeys), bytes.Compare) { log.Error(fmt.Sprintf(" example key: %x", e)) }