mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 08:49:29 +00:00
cmd/geth,triedb: progress ticker + ETA in GenerateTrie
This commit is contained in:
parent
da40c4574c
commit
ac46d51483
3 changed files with 126 additions and 46 deletions
|
|
@ -27,7 +27,6 @@ import (
|
|||
"path/filepath"
|
||||
"slices"
|
||||
"sort"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
|
@ -90,7 +89,7 @@ In other words, this command does the snapshot to trie conversion.
|
|||
},
|
||||
{
|
||||
Name: "generate-trie",
|
||||
Usage: "Benchmark triedb.GenerateTrie against a hardlinked checkpoint of the chaindata",
|
||||
Usage: "Benchmark triedb.GenerateTrie against a hard-linked checkpoint of the chaindata",
|
||||
ArgsUsage: "[<root>]",
|
||||
Action: benchGenerateTrie,
|
||||
Flags: slices.Concat(utils.NetworkFlags, utils.DatabaseFlags, []cli.Flag{
|
||||
|
|
@ -106,7 +105,7 @@ In other words, this command does the snapshot to trie conversion.
|
|||
Description: `
|
||||
geth snapshot generate-trie [<root>]
|
||||
|
||||
Takes a pebble checkpoint of the chaindata (hardlinked SST files, near-zero
|
||||
Takes a pebble checkpoint of the chaindata (hard-linked SST files, near-zero
|
||||
disk usage and near-instant) and runs triedb.GenerateTrie against the
|
||||
checkpoint. The source datadir is opened read-only for the checkpoint and
|
||||
never written to. The checkpoint is removed on exit unless --keep is set,
|
||||
|
|
@ -322,7 +321,7 @@ func verifyState(ctx *cli.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// benchGenerateTrie runs triedb.GenerateTrie against a hardlinked checkpoint
|
||||
// benchGenerateTrie runs triedb.GenerateTrie against a hard-linked checkpoint
|
||||
// of the chaindata so the source datadir is never written to.
|
||||
func benchGenerateTrie(ctx *cli.Context) error {
|
||||
stack, _ := makeConfigNode(ctx)
|
||||
|
|
@ -362,7 +361,7 @@ func benchGenerateTrie(ctx *cli.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Default checkpoint sits next to chaindata so hardlinks work.
|
||||
// Default checkpoint sits next to chaindata so hard links work.
|
||||
ckpt := ctx.String("checkpoint")
|
||||
if ckpt == "" {
|
||||
ts := time.Now().Format("20060102-150405")
|
||||
|
|
@ -373,11 +372,11 @@ func benchGenerateTrie(ctx *cli.Context) error {
|
|||
}
|
||||
|
||||
log.Info("creating pebble checkpoint", "src", srcDir, "dst", ckpt)
|
||||
cpStart := time.Now()
|
||||
checkpointStart := time.Now()
|
||||
if err := makeCheckpoint(srcDir, ckpt); err != nil {
|
||||
return fmt.Errorf("checkpoint failed: %w", err)
|
||||
}
|
||||
log.Info("checkpoint created", "elapsed", time.Since(cpStart))
|
||||
log.Info("checkpoint created", "elapsed", time.Since(checkpointStart))
|
||||
|
||||
// Clean up the checkpoint on exit, including Ctrl-C.
|
||||
keep := ctx.Bool("keep")
|
||||
|
|
@ -404,7 +403,7 @@ func benchGenerateTrie(ctx *cli.Context) error {
|
|||
}()
|
||||
|
||||
// Open the checkpoint writable. Reuse source ancient. Checkpoint only
|
||||
// hardlinks the pebble SSTs (not the freezer), and GenerateTrie never
|
||||
// hard-links the pebble SSTs (not the freezer), and GenerateTrie never
|
||||
// writes to ancient, so sharing it is safe.
|
||||
srcAncient := stack.ResolveAncient("chaindata", "")
|
||||
kv, err := pebble.New(ckpt, 4096, 1024, "gentrie-bench", false)
|
||||
|
|
@ -428,35 +427,30 @@ func benchGenerateTrie(ctx *cli.Context) error {
|
|||
|
||||
log.Info("running GenerateTrie", "scheme", scheme, "root", root)
|
||||
runStart := time.Now()
|
||||
err = triedb.GenerateTrie(chaindb, scheme, root, cancelCh)
|
||||
stats, err := triedb.GenerateTrie(chaindb, scheme, root, cancelCh)
|
||||
elapsed := time.Since(runStart)
|
||||
|
||||
status := "root matched"
|
||||
if err != nil {
|
||||
// On a mid-snap-sync datadir the reconstructed root won't match the
|
||||
// expected one. Treat that as a warning so the benchmark still
|
||||
// reports wall time. Real errors (iterator, write failures) propagate.
|
||||
if strings.Contains(err.Error(), "state root mismatch") {
|
||||
log.Warn("root mismatch (expected on partial snapshot)", "err", err)
|
||||
} else {
|
||||
log.Error("GenerateTrie failed", "elapsed", elapsed, "err", err)
|
||||
return err
|
||||
}
|
||||
status = fmt.Sprintf("failed (%s)", err)
|
||||
log.Error("GenerateTrie failed", "elapsed", elapsed, "err", err)
|
||||
}
|
||||
|
||||
fmt.Printf("\n=== generate-trie benchmark ===\n")
|
||||
fmt.Printf("source: %s (untouched)\n", srcDir)
|
||||
fmt.Printf("checkpoint: %s\n", ckpt)
|
||||
fmt.Printf("scheme: %s\n", scheme)
|
||||
fmt.Printf("root: %s\n", root.Hex())
|
||||
fmt.Printf("wall time: %s\n", elapsed)
|
||||
return nil
|
||||
fmt.Printf("scheme: %s\n", scheme)
|
||||
fmt.Printf("root: %s\n", root.Hex())
|
||||
fmt.Printf("status: %s\n", status)
|
||||
fmt.Printf("accounts: %d (%d updated)\n", stats.Scanned, stats.Updated)
|
||||
fmt.Printf("wall time: %s\n", elapsed)
|
||||
return err
|
||||
}
|
||||
|
||||
// makeCheckpoint opens srcDir as a pebble database and writes a hardlinked
|
||||
// makeCheckpoint opens srcDir as a pebble database and writes a hard-linked
|
||||
// checkpoint to dstDir. Source is closed on return.
|
||||
//
|
||||
// Opens read-write so pebble can finalize its startup (WAL replay, fresh
|
||||
// OPTIONS file) before checkpointing. Read-only mode skips that step, and
|
||||
// Checkpoint then fails trying to hardlink the missing OPTIONS file. The
|
||||
// Checkpoint then fails trying to hard-link the missing OPTIONS file. The
|
||||
// read-write open does no more than a normal geth startup would.
|
||||
func makeCheckpoint(srcDir, dstDir string) error {
|
||||
db, err := pebbleimpl.Open(srcDir, &pebbleimpl.Options{})
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package triedb
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync/atomic"
|
||||
|
|
@ -41,10 +42,27 @@ import (
|
|||
// channel before completing.
|
||||
var ErrCancelled = internal.ErrCancelled
|
||||
|
||||
// GenerateStats reports per-run counters from GenerateTrie. Scanned is
|
||||
// the number of accounts walked, Updated is how many had a stale Root
|
||||
// field that was rewritten to match the recomputed storage root.
|
||||
type GenerateStats struct {
|
||||
Scanned int64
|
||||
Updated int64
|
||||
}
|
||||
|
||||
// numPartitions is the number of slices the account hash space is divided
|
||||
// into by GenerateTrie.
|
||||
const numPartitions = 16
|
||||
|
||||
// Each partition covers 1/16 of the account hash space. We track progress
|
||||
// by interpreting the top 8 bytes of an account hash as a uint64, so each
|
||||
// partition spans 2^64 / 16 = 2^60. partitionFinished is stored in a
|
||||
// partition's position when it completes.
|
||||
const (
|
||||
partitionRangeSize = uint64(1) << 60
|
||||
partitionFinished = ^uint64(0)
|
||||
)
|
||||
|
||||
// rangeIterators bundles the per-partition account and storage iterators.
|
||||
type rangeIterators struct {
|
||||
db ethdb.Database
|
||||
|
|
@ -100,7 +118,7 @@ func reopenFlatIterator(db ethdb.Database, old *internal.HoldableIterator, prefi
|
|||
// both per-account storage subtries and the partition's slice of the
|
||||
// account trie. Returns the raw (unstripped) partition root blob, or
|
||||
// nil if the partition had no accounts at all.
|
||||
func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Database, scheme string, partition byte, rangeStart, rangeEnd common.Hash, scanned, updated *atomic.Int64) ([]byte, error) {
|
||||
func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Database, scheme string, partition byte, rangeStart, rangeEnd common.Hash, scanned, updated *atomic.Int64, pos *atomic.Uint64) ([]byte, error) {
|
||||
iters := openRangeIterators(db, rangeStart)
|
||||
defer iters.release()
|
||||
batch := db.NewBatch()
|
||||
|
|
@ -133,6 +151,7 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
|
|||
break
|
||||
}
|
||||
scanned.Add(1)
|
||||
pos.Store(binary.BigEndian.Uint64(accountHash[:8]))
|
||||
account, err := types.FullAccount(iters.acct.Value())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode account %x: %w", accountHash, err)
|
||||
|
|
@ -275,13 +294,18 @@ func hashRanges(total int) [][2]common.Hash {
|
|||
// previous run is skipped. Its subtree blob is read from the marker
|
||||
// and handed to assembleRoot directly. On a mid-run crash, only the
|
||||
// in-flight partition(s) are redone.
|
||||
func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) error {
|
||||
func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) (GenerateStats, error) {
|
||||
start := time.Now()
|
||||
var (
|
||||
scanned atomic.Int64
|
||||
updated atomic.Int64
|
||||
scanned atomic.Int64
|
||||
updated atomic.Int64
|
||||
partitionPos [numPartitions]atomic.Uint64
|
||||
)
|
||||
|
||||
progressDone := make(chan struct{})
|
||||
go tickProgress(progressDone, start, &scanned, &updated, &partitionPos)
|
||||
defer close(progressDone)
|
||||
|
||||
// partitionBlobs[i] holds the raw (unstripped) StackTrie root node
|
||||
// blob for partition i, or nil if the partition is empty.
|
||||
var partitionBlobs [numPartitions][]byte
|
||||
|
|
@ -296,13 +320,17 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
|
|||
rangeStart, rangeEnd := r[0], r[1]
|
||||
if blob, ok := rawdb.ReadGenerateTriePartitionDone(db, partition); ok {
|
||||
partitionBlobs[partition] = blob
|
||||
partitionPos[partition].Store(partitionFinished)
|
||||
continue
|
||||
}
|
||||
eg.Go(func() error {
|
||||
blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated)
|
||||
partitionStart := time.Now()
|
||||
blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated, &partitionPos[partition])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Info("Partition done", "partition", partition, "elapsed", common.PrettyDuration(time.Since(partitionStart)))
|
||||
partitionPos[partition].Store(partitionFinished)
|
||||
partitionBlobs[partition] = blob
|
||||
// Record completion only after the partition's batch has
|
||||
// flushed inside generatePartition, so this marker appears
|
||||
|
|
@ -311,8 +339,11 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
|
|||
return nil
|
||||
})
|
||||
}
|
||||
stats := func() GenerateStats {
|
||||
return GenerateStats{Scanned: scanned.Load(), Updated: updated.Load()}
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
return stats(), err
|
||||
}
|
||||
|
||||
// Assemble the top-level root from the partition blobs, verify it
|
||||
|
|
@ -320,20 +351,21 @@ func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-c
|
|||
// success.
|
||||
got, err := assembleRoot(db, scheme, partitionBlobs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("assemble root: %w", err)
|
||||
return stats(), fmt.Errorf("assemble root: %w", err)
|
||||
}
|
||||
if got != root {
|
||||
return fmt.Errorf("state root mismatch: got %x, want %x", got, root)
|
||||
return stats(), fmt.Errorf("state root mismatch: got %x, want %x", got, root)
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
for i := range numPartitions {
|
||||
rawdb.DeleteGenerateTriePartitionDone(batch, byte(i))
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
return fmt.Errorf("clear partition markers: %w", err)
|
||||
return stats(), fmt.Errorf("clear partition markers: %w", err)
|
||||
}
|
||||
log.Info("Generated state trie", "scanned", scanned.Load(), "updated", updated.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
final := stats()
|
||||
log.Info("Generated state trie", "scanned", final.Scanned, "updated", final.Updated, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return final, nil
|
||||
}
|
||||
|
||||
// assembleRoot computes the canonical state root from the 16 raw
|
||||
|
|
@ -408,3 +440,56 @@ func assembleRoot(db ethdb.Database, scheme string, partitionBlobs [numPartition
|
|||
}
|
||||
return rootHash, nil
|
||||
}
|
||||
|
||||
// tickProgress logs an aggregate progress line every 30 seconds until done
|
||||
// is closed. Cheap: a handful of atomic loads and one log line per tick.
|
||||
func tickProgress(done <-chan struct{}, start time.Time, scanned, updated *atomic.Int64, positions *[numPartitions]atomic.Uint64) {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
elapsed := time.Since(start)
|
||||
fraction := progressFraction(positions)
|
||||
eta := "n/a"
|
||||
if fraction > 0.005 {
|
||||
eta = common.PrettyDuration(time.Duration(float64(elapsed) * (1.0/fraction - 1.0))).String()
|
||||
}
|
||||
s, u := scanned.Load(), updated.Load()
|
||||
log.Info("Generating trie",
|
||||
"progress", fmt.Sprintf("%.1f%%", fraction*100),
|
||||
"eta", eta,
|
||||
"scanned", s, "updated", u,
|
||||
"elapsed", common.PrettyDuration(elapsed),
|
||||
"acct/s", uint64(float64(s)/elapsed.Seconds()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// progressFraction averages each partition's iterator position (as a fraction
|
||||
// of its hash range) into an overall completion estimate in [0, 1]. Keccak
|
||||
// hashes are uniform, so keyspace position is a good proxy for work done.
|
||||
func progressFraction(positions *[numPartitions]atomic.Uint64) float64 {
|
||||
var total float64
|
||||
for i := range numPartitions {
|
||||
p := positions[i].Load()
|
||||
switch {
|
||||
case p == partitionFinished:
|
||||
total += 1.0
|
||||
case p == 0:
|
||||
// not started yet
|
||||
default:
|
||||
rangeStart := uint64(i) * partitionRangeSize
|
||||
if p > rangeStart {
|
||||
rel := p - rangeStart
|
||||
if rel > partitionRangeSize {
|
||||
rel = partitionRangeSize
|
||||
}
|
||||
total += float64(rel) / float64(partitionRangeSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
return total / float64(numPartitions)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ func computeStorageRootFromSlots(slots []testSlot) common.Hash {
|
|||
|
||||
func TestGenerateTrieEmpty(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, types.EmptyRootHash, nil); err != nil {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, types.EmptyRootHash, nil); err != nil {
|
||||
t.Fatalf("GenerateTrie on empty state failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -110,7 +110,7 @@ func TestGenerateTrieAccountsOnly(t *testing.T) {
|
|||
}
|
||||
root := buildExpectedRoot(t, accounts)
|
||||
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, root, nil); err != nil {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, root, nil); err != nil {
|
||||
t.Fatalf("GenerateTrie failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -157,7 +157,7 @@ func TestGenerateTrieWithStorage(t *testing.T) {
|
|||
}
|
||||
root := buildExpectedRoot(t, accounts)
|
||||
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, root, nil); err != nil {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, root, nil); err != nil {
|
||||
t.Fatalf("GenerateTrie failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -174,7 +174,7 @@ func TestGenerateTrieRootMismatch(t *testing.T) {
|
|||
rawdb.WriteAccountSnapshot(db, common.HexToHash("0x01"), types.SlimAccountRLP(acct))
|
||||
|
||||
wrongRoot := common.HexToHash("0xdeadbeef")
|
||||
err := GenerateTrie(db, rawdb.HashScheme, wrongRoot, nil)
|
||||
_, err := GenerateTrie(db, rawdb.HashScheme, wrongRoot, nil)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for root mismatch, got nil")
|
||||
}
|
||||
|
|
@ -235,7 +235,7 @@ func TestGenerateTrieFixesStaleRoots(t *testing.T) {
|
|||
rawdb.WriteAccountSnapshot(db, a.hash, types.SlimAccountRLP(onDisk))
|
||||
}
|
||||
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||
t.Fatalf("GenerateTrie failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -257,7 +257,7 @@ func TestGenerateTrieCancel(t *testing.T) {
|
|||
|
||||
cancel := make(chan struct{})
|
||||
close(cancel)
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, common.Hash{}, cancel); err != ErrCancelled {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, common.Hash{}, cancel); err != ErrCancelled {
|
||||
t.Fatalf("expected ErrCancelled, got %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -299,7 +299,7 @@ func TestGenerateTrieOrphanStorage(t *testing.T) {
|
|||
|
||||
expectedRoot := buildExpectedRoot(t, []testAccount{acc})
|
||||
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||
t.Fatalf("GenerateTrie with orphan storage failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -336,7 +336,8 @@ func TestGenerateTriePartialResume(t *testing.T) {
|
|||
ranges := hashRanges(numPartitions)
|
||||
blobs := make([][]byte, numPartitions)
|
||||
for i, r := range ranges {
|
||||
blob, err := generatePartition(context.Background(), nil, db, rawdb.HashScheme, byte(i), r[0], r[1], &scanned, &updated)
|
||||
var pos atomic.Uint64
|
||||
blob, err := generatePartition(context.Background(), nil, db, rawdb.HashScheme, byte(i), r[0], r[1], &scanned, &updated, &pos)
|
||||
if err != nil {
|
||||
t.Fatalf("pre-run partition %d: %v", i, err)
|
||||
}
|
||||
|
|
@ -368,7 +369,7 @@ func TestGenerateTriePartialResume(t *testing.T) {
|
|||
// Step 5: run GenerateTrie. Success implies resume actually consulted
|
||||
// the markers — without it, even partitions would yield nil blobs and
|
||||
// the root check inside GenerateTrie would fail.
|
||||
if err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||
if _, err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||
t.Fatalf("partial-resume GenerateTrie failed: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue