mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-02 21:18:40 +00:00
triedb: rebuild GenerateTrie as a single 16-way partitioned pass with resume
This commit is contained in:
parent
b5ba8dd98a
commit
4935e0259d
5 changed files with 438 additions and 156 deletions
|
|
@ -208,3 +208,59 @@ func WriteSnapshotSyncStatus(db ethdb.KeyValueWriter, status []byte) {
|
||||||
log.Crit("Failed to store snapshot sync status", "err", err)
|
log.Crit("Failed to store snapshot sync status", "err", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteGenerateTrieProgress records a partition's current progress.
|
||||||
|
func WriteGenerateTrieProgress(db ethdb.KeyValueWriter, partition byte, hash common.Hash) {
|
||||||
|
if err := db.Put(generateTrieProgressKey(partition), hash[:]); err != nil {
|
||||||
|
log.Crit("Failed to store generate-trie progress marker", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteGenerateTrieProgress removes a partition's progress marker.
|
||||||
|
func DeleteGenerateTrieProgress(db ethdb.KeyValueWriter, partition byte) {
|
||||||
|
if err := db.Delete(generateTrieProgressKey(partition)); err != nil {
|
||||||
|
log.Crit("Failed to remove generate-trie progress marker", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadGenerateTriePartitionDone returns the raw subtree root blob for a
|
||||||
|
// partition that has previously completed.
|
||||||
|
func ReadGenerateTriePartitionDone(db ethdb.KeyValueReader, partition byte) ([]byte, bool) {
|
||||||
|
data, err := db.Get(generateTriePartitionDoneKey(partition))
|
||||||
|
if err != nil {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if len(data) == 0 {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
switch data[0] {
|
||||||
|
case 0x00:
|
||||||
|
// Partition is done and it is empty.
|
||||||
|
return nil, true
|
||||||
|
case 0x01:
|
||||||
|
// Partition is done and the blob follows.
|
||||||
|
return data[1:], true
|
||||||
|
default:
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteGenerateTriePartitionDone records a completed partition.
|
||||||
|
func WriteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte, blob []byte) {
|
||||||
|
var value []byte
|
||||||
|
if blob == nil {
|
||||||
|
value = []byte{0x00}
|
||||||
|
} else {
|
||||||
|
value = append([]byte{0x01}, blob...)
|
||||||
|
}
|
||||||
|
if err := db.Put(generateTriePartitionDoneKey(partition), value); err != nil {
|
||||||
|
log.Crit("Failed to store generate-trie done marker", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteGenerateTriePartitionDone removes a partition's done marker.
|
||||||
|
func DeleteGenerateTriePartitionDone(db ethdb.KeyValueWriter, partition byte) {
|
||||||
|
if err := db.Delete(generateTriePartitionDoneKey(partition)); err != nil {
|
||||||
|
log.Crit("Failed to remove generate-trie done marker", "err", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -104,6 +104,14 @@ var (
|
||||||
// snapSyncStatusFlagKey flags that status of snap sync.
|
// snapSyncStatusFlagKey flags that status of snap sync.
|
||||||
snapSyncStatusFlagKey = []byte("SnapSyncStatus")
|
snapSyncStatusFlagKey = []byte("SnapSyncStatus")
|
||||||
|
|
||||||
|
// generateTrieProgressPrefix tracks the last account hash processed by each
|
||||||
|
// of triedb.GenerateTrie's 16 partitions.
|
||||||
|
generateTrieProgressPrefix = []byte("gtp") // generateTrieProgressPrefix + partition byte -> last account hash
|
||||||
|
|
||||||
|
// generateTriePartitionDonePrefix stores the subtree root hash of each
|
||||||
|
// triedb.GenerateTrie partition once it finishes.
|
||||||
|
generateTriePartitionDonePrefix = []byte("gtd") // generateTriePartitionDonePrefix + partition byte -> subtree root hash
|
||||||
|
|
||||||
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
|
// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
|
||||||
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
|
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
|
||||||
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td (deprecated)
|
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td (deprecated)
|
||||||
|
|
@ -465,3 +473,13 @@ func trienodeHistoryIndexBlockKey(addressHash common.Hash, path []byte, blockID
|
||||||
func transitionStateKey(hash common.Hash) []byte {
|
func transitionStateKey(hash common.Hash) []byte {
|
||||||
return append(VerkleTransitionStatePrefix, hash.Bytes()...)
|
return append(VerkleTransitionStatePrefix, hash.Bytes()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// generateTrieProgressKey = generateTrieProgressPrefix + partition (single byte).
|
||||||
|
func generateTrieProgressKey(partition byte) []byte {
|
||||||
|
return append(generateTrieProgressPrefix, partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
// generateTriePartitionDoneKey = generateTriePartitionDonePrefix + partition (single byte).
|
||||||
|
func generateTriePartitionDoneKey(partition byte) []byte {
|
||||||
|
return append(generateTriePartitionDonePrefix, partition)
|
||||||
|
}
|
||||||
|
|
|
||||||
75
trie/node.go
75
trie/node.go
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -335,3 +336,77 @@ func wrapError(err error, ctx string) error {
|
||||||
func (err *decodeError) Error() string {
|
func (err *decodeError) Error() string {
|
||||||
return fmt.Sprintf("%v (decode path: %s)", err.what, strings.Join(err.stack, "<-"))
|
return fmt.Sprintf("%v (decode path: %s)", err.what, strings.Join(err.stack, "<-"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StripPartitionRoot strips the leading nibble n from a partition subtree
|
||||||
|
// root blob produced by a StackTrie built over keys that all share that
|
||||||
|
// nibble. Returns the hash the canonical top-level branch should mount in
|
||||||
|
// slot n and, if a new node had to be constructed during stripping, the
|
||||||
|
// blob the caller must persist.
|
||||||
|
func StripPartitionRoot(blob []byte, n byte) (hash common.Hash, writeBlob []byte, err error) {
|
||||||
|
elems, err := decodeNodeElements(blob)
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("decode partition root: %w", err)
|
||||||
|
}
|
||||||
|
if len(elems) != 2 {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("expected shortNode (2 elements), got %d", len(elems))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Elements from SplitListValues come with their RLP tag. Strip the
|
||||||
|
// tag off the compact-key element to get the raw compact bytes.
|
||||||
|
compactKey, _, err := rlp.SplitString(elems[0])
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("parse compact key: %w", err)
|
||||||
|
}
|
||||||
|
hex := compactToHex(compactKey)
|
||||||
|
if len(hex) == 0 {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("partition root has empty key")
|
||||||
|
}
|
||||||
|
if hex[0] != n {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("partition root key starts with nibble %d, want %d", hex[0], n)
|
||||||
|
}
|
||||||
|
childOrValue := elems[1]
|
||||||
|
|
||||||
|
// Case 1: extension of exactly [n] -> reuse the existing child. This is the
|
||||||
|
// common case, the partition has many accounts, they all share exactly the
|
||||||
|
// leading N, and diverge at the second nibble.
|
||||||
|
if !hasTerm(hex) && len(hex) == 1 {
|
||||||
|
content, _, err := rlp.SplitString(childOrValue)
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("parse child ref: %w", err)
|
||||||
|
}
|
||||||
|
if len(content) != common.HashLength {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("child ref is %d bytes, expected 32", len(content))
|
||||||
|
}
|
||||||
|
return common.BytesToHash(content), nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case 2: extension with path [n, more...] -> The new node is an extension
|
||||||
|
// with path [more...], same child. All accounts in the partition happen to
|
||||||
|
// share the second nibble (or more) too. The extension "ate" more than just
|
||||||
|
// the leading N.
|
||||||
|
strippedCompact := hexToCompact(hex[1:])
|
||||||
|
newKeyRLP, err := rlp.EncodeToBytes(strippedCompact)
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("encode stripped key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case 3: leaf with path [n, more..., term] -> The new node is a leaf with
|
||||||
|
// path [more..., term], same value. The partition's single account produces
|
||||||
|
// a leaf whose path is the full 64-nibble account hash plus terminator.
|
||||||
|
writeBlob, err = encodeNodeElements([][]byte{newKeyRLP, childOrValue})
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("encode stripped node: %w", err)
|
||||||
|
}
|
||||||
|
return crypto.Keccak256Hash(writeBlob), writeBlob, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssembleBranch constructs a fullNode (17-slot branch) from the given
|
||||||
|
// children and returns its RLP encoding and 32-byte hash.
|
||||||
|
func AssembleBranch(children [17][]byte) ([]byte, common.Hash, error) {
|
||||||
|
fn := &fullnodeEncoder{Children: children}
|
||||||
|
w := rlp.NewEncoderBuffer(nil)
|
||||||
|
fn.encode(w)
|
||||||
|
blob := w.ToBytes()
|
||||||
|
w.Flush()
|
||||||
|
return blob, crypto.Keccak256Hash(blob), nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,17 +21,17 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"runtime"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/ethdb/memorydb"
|
"github.com/ethereum/go-ethereum/ethdb/memorydb"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
"github.com/ethereum/go-ethereum/triedb/internal"
|
"github.com/ethereum/go-ethereum/triedb/internal"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
@ -41,68 +41,9 @@ import (
|
||||||
// channel before completing.
|
// channel before completing.
|
||||||
var ErrCancelled = internal.ErrCancelled
|
var ErrCancelled = internal.ErrCancelled
|
||||||
|
|
||||||
// updateStorageRootsProgressPrefix is the key prefix used to persist a
|
// numPartitions is the number of slices the account hash space is divided
|
||||||
// per-partition progress marker during updateStorageRoots.
|
// into by GenerateTrie.
|
||||||
var updateStorageRootsProgressPrefix = []byte("triedb-updsr-")
|
const numPartitions = 16
|
||||||
|
|
||||||
func updateStorageRootsProgressKey(partition int) []byte {
|
|
||||||
return append(updateStorageRootsProgressPrefix, byte(partition))
|
|
||||||
}
|
|
||||||
|
|
||||||
// kvAccountIterator wraps an ethdb.Iterator to iterate over account snapshot
|
|
||||||
// entries in the database, implementing internal.AccountIterator.
|
|
||||||
type kvAccountIterator struct {
|
|
||||||
it ethdb.Iterator
|
|
||||||
hash common.Hash
|
|
||||||
}
|
|
||||||
|
|
||||||
func newKVAccountIterator(db ethdb.Iteratee) *kvAccountIterator {
|
|
||||||
it := rawdb.NewKeyLengthIterator(
|
|
||||||
db.NewIterator(rawdb.SnapshotAccountPrefix, nil),
|
|
||||||
len(rawdb.SnapshotAccountPrefix)+common.HashLength,
|
|
||||||
)
|
|
||||||
return &kvAccountIterator{it: it}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (it *kvAccountIterator) Next() bool {
|
|
||||||
if !it.it.Next() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
key := it.it.Key()
|
|
||||||
copy(it.hash[:], key[len(rawdb.SnapshotAccountPrefix):])
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (it *kvAccountIterator) Hash() common.Hash { return it.hash }
|
|
||||||
func (it *kvAccountIterator) Account() []byte { return it.it.Value() }
|
|
||||||
func (it *kvAccountIterator) Error() error { return it.it.Error() }
|
|
||||||
func (it *kvAccountIterator) Release() { it.it.Release() }
|
|
||||||
|
|
||||||
// kvStorageIterator wraps an ethdb.Iterator to iterate over storage snapshot
|
|
||||||
// entries for a specific account, implementing internal.StorageIterator.
|
|
||||||
type kvStorageIterator struct {
|
|
||||||
it ethdb.Iterator
|
|
||||||
hash common.Hash
|
|
||||||
}
|
|
||||||
|
|
||||||
func newKVStorageIterator(db ethdb.Iteratee, accountHash common.Hash) *kvStorageIterator {
|
|
||||||
it := rawdb.IterateStorageSnapshots(db, accountHash)
|
|
||||||
return &kvStorageIterator{it: it}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (it *kvStorageIterator) Next() bool {
|
|
||||||
if !it.it.Next() {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
key := it.it.Key()
|
|
||||||
copy(it.hash[:], key[len(rawdb.SnapshotStoragePrefix)+common.HashLength:])
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (it *kvStorageIterator) Hash() common.Hash { return it.hash }
|
|
||||||
func (it *kvStorageIterator) Slot() []byte { return it.it.Value() }
|
|
||||||
func (it *kvStorageIterator) Error() error { return it.it.Error() }
|
|
||||||
func (it *kvStorageIterator) Release() { it.it.Release() }
|
|
||||||
|
|
||||||
// rangeIterators bundles the per-partition account and storage iterators.
|
// rangeIterators bundles the per-partition account and storage iterators.
|
||||||
type rangeIterators struct {
|
type rangeIterators struct {
|
||||||
|
|
@ -152,77 +93,58 @@ func reopenFlatIterator(db ethdb.Database, old *internal.HoldableIterator, prefi
|
||||||
return openFlatIterator(db, prefix, next[len(prefix):], suffixLen)
|
return openFlatIterator(db, prefix, next[len(prefix):], suffixLen)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateStorageRoots walks flat-state accounts and updates each account's
|
// generatePartition walks accounts whose first nibble equals `partition`,
|
||||||
// Root to match the storage root computed from its flat storage slots.
|
// reconciling each account's Root with its flat storage and building
|
||||||
func updateStorageRoots(db ethdb.Database, cancel <-chan struct{}) error {
|
// both per-account storage subtries and the partition's slice of the
|
||||||
start := time.Now()
|
// account trie. Returns the raw (unstripped) partition root blob, or
|
||||||
threads := runtime.NumCPU()
|
// nil if the partition had no accounts at all.
|
||||||
var (
|
func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Database, scheme string, partition byte, rangeStart, rangeEnd common.Hash, scanned, updated *atomic.Int64) ([]byte, error) {
|
||||||
batchMu sync.Mutex
|
|
||||||
batch = db.NewBatch()
|
|
||||||
scanned atomic.Int64
|
|
||||||
updated atomic.Int64
|
|
||||||
)
|
|
||||||
eg, ctx := errgroup.WithContext(context.Background())
|
|
||||||
|
|
||||||
// Spawn one worker per hash-space partition. Each walker handles its
|
|
||||||
// [rangeStart, rangeEnd] slice independently. errgroup cancels ctx
|
|
||||||
// on the first error so peers exit.
|
|
||||||
for i, r := range hashRanges(threads) {
|
|
||||||
partition := i
|
|
||||||
rangeStart, rangeEnd := r[0], r[1]
|
|
||||||
eg.Go(func() error {
|
|
||||||
return updateStorageRootsInRange(ctx, cancel, db, partition, rangeStart, rangeEnd, &batchMu, batch, &scanned, &updated)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clean up the progress markers now that every partition has finished
|
|
||||||
// successfully.
|
|
||||||
for i := 0; i < threads; i++ {
|
|
||||||
batch.Delete(updateStorageRootsProgressKey(i))
|
|
||||||
}
|
|
||||||
if err := batch.Write(); err != nil {
|
|
||||||
return fmt.Errorf("final batch write: %w", err)
|
|
||||||
}
|
|
||||||
log.Info("Updated stale storage roots", "scanned", scanned.Load(), "updated", updated.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateStorageRootsInRange walks accounts whose hashes fall inside
|
|
||||||
// [rangeStart, rangeEnd] and fixes each account's Root to match its flat
|
|
||||||
// storage.
|
|
||||||
func updateStorageRootsInRange(ctx context.Context, cancel <-chan struct{}, db ethdb.Database, partition int, rangeStart, rangeEnd common.Hash, batchMu *sync.Mutex, batch ethdb.Batch, scanned, updated *atomic.Int64) error {
|
|
||||||
iters := openRangeIterators(db, rangeStart)
|
iters := openRangeIterators(db, rangeStart)
|
||||||
defer iters.release()
|
defer iters.release()
|
||||||
|
batch := db.NewBatch()
|
||||||
|
|
||||||
|
// Account-trie StackTrie for this partition. Persist every node except
|
||||||
|
// the root. assembleRoot() may need to strip the leading-nibble extension
|
||||||
|
// off the root, so we capture its bytes and return them instead.
|
||||||
|
var rootBlob []byte
|
||||||
|
acctTrie := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
|
if len(path) == 0 {
|
||||||
|
rootBlob = common.CopyBytes(blob)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rawdb.WriteTrieNode(batch, common.Hash{}, path, hash, blob, scheme)
|
||||||
|
})
|
||||||
|
|
||||||
// Iterate through all the accounts.
|
// Iterate through all the accounts.
|
||||||
for iters.acct.Next() {
|
for iters.acct.Next() {
|
||||||
select {
|
select {
|
||||||
case <-cancel:
|
case <-cancel:
|
||||||
return ErrCancelled
|
return nil, ErrCancelled
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil
|
return nil, ctx.Err()
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
key := iters.acct.Key()
|
key := iters.acct.Key()
|
||||||
var accountHash common.Hash
|
var accountHash common.Hash
|
||||||
copy(accountHash[:], key[len(rawdb.SnapshotAccountPrefix):])
|
copy(accountHash[:], key[len(rawdb.SnapshotAccountPrefix):])
|
||||||
if bytes.Compare(accountHash[:], rangeEnd[:]) > 0 {
|
if bytes.Compare(accountHash[:], rangeEnd[:]) > 0 {
|
||||||
return nil
|
break
|
||||||
}
|
}
|
||||||
scanned.Add(1)
|
scanned.Add(1)
|
||||||
account, err := types.FullAccount(iters.acct.Value())
|
account, err := types.FullAccount(iters.acct.Value())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("decode account %x: %w", accountHash, err)
|
return nil, fmt.Errorf("decode account %x: %w", accountHash, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build the account's storage trie from the flat storage snapshot.
|
||||||
|
// StackTrie's onTrieNode callback persists nodes as they finalize.
|
||||||
|
stoTrie := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
|
||||||
|
rawdb.WriteTrieNode(batch, accountHash, path, hash, blob, scheme)
|
||||||
|
})
|
||||||
|
|
||||||
// Compute the storage root by consuming matching slots from the
|
// Compute the storage root by consuming matching slots from the
|
||||||
// shared storage iterator. The inner loop terminates on Hold()
|
// shared storage iterator. The inner loop terminates on Hold()
|
||||||
// (slot belongs to a later account) or exhaustion.
|
// (slot belongs to a later account) or exhaustion.
|
||||||
t := trie.NewStackTrie(nil)
|
|
||||||
for iters.stor.Next() {
|
for iters.stor.Next() {
|
||||||
sk := iters.stor.Key()
|
sk := iters.stor.Key()
|
||||||
storAcc := sk[len(rawdb.SnapshotStoragePrefix) : len(rawdb.SnapshotStoragePrefix)+common.HashLength]
|
storAcc := sk[len(rawdb.SnapshotStoragePrefix) : len(rawdb.SnapshotStoragePrefix)+common.HashLength]
|
||||||
|
|
@ -247,43 +169,60 @@ func updateStorageRootsInRange(ctx context.Context, cancel <-chan struct{}, db e
|
||||||
|
|
||||||
// The slot belongs to this account so we add it to the StackTrie.
|
// The slot belongs to this account so we add it to the StackTrie.
|
||||||
slotHash := sk[len(rawdb.SnapshotStoragePrefix)+common.HashLength:]
|
slotHash := sk[len(rawdb.SnapshotStoragePrefix)+common.HashLength:]
|
||||||
if err := t.Update(slotHash, iters.stor.Value()); err != nil {
|
if err := stoTrie.Update(slotHash, iters.stor.Value()); err != nil {
|
||||||
return fmt.Errorf("stack trie update for %x: %w", accountHash, err)
|
return nil, fmt.Errorf("storage stack trie update for %x: %w", accountHash, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := iters.stor.Error(); err != nil {
|
if err := iters.stor.Error(); err != nil {
|
||||||
return fmt.Errorf("storage iterator: %w", err)
|
return nil, fmt.Errorf("storage iterator: %w", err)
|
||||||
}
|
}
|
||||||
computed := t.Hash()
|
computed := stoTrie.Hash()
|
||||||
|
|
||||||
// Update the account, progress marker, and (possibly) the batch.
|
// If account.Root was stale, rewrite the flat-state entry. Then feed
|
||||||
var (
|
// the account, now with the correct Root, into this partition's
|
||||||
flushed bool
|
// account trie.
|
||||||
flushErr error
|
|
||||||
)
|
|
||||||
batchMu.Lock()
|
|
||||||
if computed != account.Root {
|
if computed != account.Root {
|
||||||
account.Root = computed
|
account.Root = computed
|
||||||
rawdb.WriteAccountSnapshot(batch, accountHash, types.SlimAccountRLP(*account))
|
rawdb.WriteAccountSnapshot(batch, accountHash, types.SlimAccountRLP(*account))
|
||||||
updated.Add(1)
|
updated.Add(1)
|
||||||
}
|
}
|
||||||
batch.Put(updateStorageRootsProgressKey(partition), accountHash[:])
|
fullAccount, err := rlp.EncodeToBytes(account)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("encode account %x: %w", accountHash, err)
|
||||||
|
}
|
||||||
|
if err := acctTrie.Update(accountHash[:], fullAccount); err != nil {
|
||||||
|
return nil, fmt.Errorf("account stack trie update for %x: %w", accountHash, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Progress marker keeps the batch growing on a predictable
|
||||||
|
// rate. The size check drives flush + iterator reopen so
|
||||||
|
// pebble compactions aren't blocked by long-lived iterators.
|
||||||
|
rawdb.WriteGenerateTrieProgress(batch, partition, accountHash)
|
||||||
if batch.ValueSize() > ethdb.IdealBatchSize {
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
||||||
flushErr = batch.Write()
|
if err := batch.Write(); err != nil {
|
||||||
if flushErr == nil {
|
return nil, fmt.Errorf("flush batch: %w", err)
|
||||||
batch.Reset()
|
|
||||||
flushed = true
|
|
||||||
}
|
}
|
||||||
}
|
batch.Reset()
|
||||||
batchMu.Unlock()
|
|
||||||
if flushErr != nil {
|
|
||||||
return fmt.Errorf("flush batch: %w", flushErr)
|
|
||||||
}
|
|
||||||
if flushed {
|
|
||||||
iters.reopen()
|
iters.reopen()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return iters.acct.Error()
|
if err := iters.acct.Error(); err != nil {
|
||||||
|
return nil, fmt.Errorf("account iterator: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finalize the partition's account trie. For a non-empty partition
|
||||||
|
// this triggers the path=[] onTrieNode callback, populating
|
||||||
|
// rootBlob. An empty partition never emits any node and leaves
|
||||||
|
// rootBlob at nil.
|
||||||
|
acctTrie.Hash()
|
||||||
|
|
||||||
|
// Clear the progress marker since it's no longer needed once the
|
||||||
|
// partition's batch is flushed.
|
||||||
|
rawdb.DeleteGenerateTrieProgress(batch, partition)
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return nil, fmt.Errorf("final partition batch write: %w", err)
|
||||||
|
}
|
||||||
|
return rootBlob, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// hashRanges returns hash pairs [start, end] that evenly partition the
|
// hashRanges returns hash pairs [start, end] that evenly partition the
|
||||||
|
|
@ -299,7 +238,7 @@ func hashRanges(total int) [][2]common.Hash {
|
||||||
)
|
)
|
||||||
ranges := make([][2]common.Hash, total)
|
ranges := make([][2]common.Hash, total)
|
||||||
var next common.Hash
|
var next common.Hash
|
||||||
for i := 0; i < total; i++ {
|
for i := range total {
|
||||||
last := common.BigToHash(new(big.Int).Add(next.Big(), step))
|
last := common.BigToHash(new(big.Int).Add(next.Big(), step))
|
||||||
if i == total-1 {
|
if i == total-1 {
|
||||||
last = common.MaxHash
|
last = common.MaxHash
|
||||||
|
|
@ -311,31 +250,148 @@ func hashRanges(total int) [][2]common.Hash {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenerateTrie rebuilds all tries (storage + account) from flat snapshot
|
// GenerateTrie rebuilds all tries (storage + account) from flat snapshot
|
||||||
// data in the database. It first brings every account's Root into
|
// data in the database. The account hash space is partitioned into 16
|
||||||
// agreement with its flat storage, then builds tries using StackTrie with
|
// slices aligned with the first-nibble branching of the MPT root. Each
|
||||||
// streaming node writes, and verifies that the computed state root matches
|
// partition is processed by its own goroutine, which walks its slice,
|
||||||
// the expected root.
|
// reconciles stale account.Root fields with flat storage, builds the
|
||||||
|
// per-account storage tries and the partition's slice of the account
|
||||||
|
// trie. Once every partition has produced its subtree root, the top-level
|
||||||
|
// branch is assembled and its hash verified against the expected root.
|
||||||
|
//
|
||||||
|
// Resume: on entry, any partition that has a "done" marker from a
|
||||||
|
// previous run is skipped. Its subtree blob is read from the marker
|
||||||
|
// and handed to assembleRoot directly. On a mid-run crash, only the
|
||||||
|
// in-flight partition(s) are redone.
|
||||||
func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) error {
|
func GenerateTrie(db ethdb.Database, scheme string, root common.Hash, cancel <-chan struct{}) error {
|
||||||
if err := updateStorageRoots(db, cancel); err != nil {
|
start := time.Now()
|
||||||
|
var (
|
||||||
|
scanned atomic.Int64
|
||||||
|
updated atomic.Int64
|
||||||
|
)
|
||||||
|
|
||||||
|
// partitionBlobs[i] holds the raw (unstripped) StackTrie root node
|
||||||
|
// blob for partition i, or nil if the partition is empty.
|
||||||
|
var partitionBlobs [numPartitions][]byte
|
||||||
|
|
||||||
|
// For each partition, either skip (prior done marker found) or run
|
||||||
|
// it. Prior runs can leave the partition's raw root blob in the done
|
||||||
|
// marker. We recover it here so assembleRoot has everything it needs.
|
||||||
|
ranges := hashRanges(numPartitions)
|
||||||
|
eg, ctx := errgroup.WithContext(context.Background())
|
||||||
|
for i, r := range ranges {
|
||||||
|
partition := byte(i)
|
||||||
|
rangeStart, rangeEnd := r[0], r[1]
|
||||||
|
if blob, ok := rawdb.ReadGenerateTriePartitionDone(db, partition); ok {
|
||||||
|
partitionBlobs[partition] = blob
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
eg.Go(func() error {
|
||||||
|
blob, err := generatePartition(ctx, cancel, db, scheme, partition, rangeStart, rangeEnd, &scanned, &updated)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
partitionBlobs[partition] = blob
|
||||||
|
// Record completion only after the partition's batch has
|
||||||
|
// flushed inside generatePartition, so this marker appears
|
||||||
|
// on disk only when every write the partition did is durable.
|
||||||
|
rawdb.WriteGenerateTriePartitionDone(db, partition, blob)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
acctIt := newKVAccountIterator(db)
|
|
||||||
defer acctIt.Release()
|
|
||||||
got, err := internal.GenerateTrieRoot(db, scheme, acctIt, common.Hash{}, internal.StackTrieGenerate, func(dst ethdb.KeyValueWriter, accountHash, codeHash common.Hash, stat *internal.GenerateStats) (common.Hash, error) {
|
|
||||||
storageIt := newKVStorageIterator(db, accountHash)
|
|
||||||
defer storageIt.Release()
|
|
||||||
|
|
||||||
hash, err := internal.GenerateTrieRoot(dst, scheme, storageIt, accountHash, internal.StackTrieGenerate, nil, stat, false, cancel)
|
// Assemble the top-level root from the partition blobs, verify it
|
||||||
if err != nil {
|
// matches the expected root, and clear all partition markers on
|
||||||
return common.Hash{}, err
|
// success.
|
||||||
}
|
got, err := assembleRoot(db, scheme, partitionBlobs)
|
||||||
return hash, nil
|
|
||||||
}, internal.NewGenerateStats(), true, cancel)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("assemble root: %w", err)
|
||||||
}
|
}
|
||||||
if got != root {
|
if got != root {
|
||||||
return fmt.Errorf("state root mismatch: got %x, want %x", got, root)
|
return fmt.Errorf("state root mismatch: got %x, want %x", got, root)
|
||||||
}
|
}
|
||||||
|
batch := db.NewBatch()
|
||||||
|
for i := range numPartitions {
|
||||||
|
rawdb.DeleteGenerateTriePartitionDone(batch, byte(i))
|
||||||
|
}
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return fmt.Errorf("clear partition markers: %w", err)
|
||||||
|
}
|
||||||
|
log.Info("Generated state trie", "scanned", scanned.Load(), "updated", updated.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assembleRoot computes the canonical state root from the 16 raw
|
||||||
|
// partition root blobs and persists any newly-constructed nodes.
|
||||||
|
// The decision about whether to strip each partition's leading-nibble
|
||||||
|
// extension depends on how many partitions ended up populated:
|
||||||
|
//
|
||||||
|
// - 0 populated: the state is empty, the root is types.EmptyRootHash,
|
||||||
|
// nothing is written.
|
||||||
|
// - 1 populated: the state's canonical root is that partition's
|
||||||
|
// subtree directly, with its leading nibble still included. We
|
||||||
|
// need to persist the partition's raw root node since generatePartition
|
||||||
|
// deliberately didn't write it at path=[].
|
||||||
|
// - 2+ populated: strip each partition so the leading-nibble extension
|
||||||
|
// isn't double-traversed by the top-level branch, then pack the 16
|
||||||
|
// stripped references into a fullNode, encode, hash, and persist that
|
||||||
|
// branch as the state root.
|
||||||
|
func assembleRoot(db ethdb.Database, scheme string, partitionBlobs [numPartitions][]byte) (common.Hash, error) {
|
||||||
|
var (
|
||||||
|
populated int
|
||||||
|
onlySlot int
|
||||||
|
)
|
||||||
|
for i := range numPartitions {
|
||||||
|
if partitionBlobs[i] != nil {
|
||||||
|
populated++
|
||||||
|
onlySlot = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if populated == 0 {
|
||||||
|
return types.EmptyRootHash, nil
|
||||||
|
}
|
||||||
|
batch := db.NewBatch()
|
||||||
|
if populated == 1 {
|
||||||
|
// Persist the partition's raw root at path=[] (path scheme) or
|
||||||
|
// at its hash (hash scheme). That node is the state root.
|
||||||
|
blob := partitionBlobs[onlySlot]
|
||||||
|
rootHash := crypto.Keccak256Hash(blob)
|
||||||
|
rawdb.WriteTrieNode(batch, common.Hash{}, nil, rootHash, blob, scheme)
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return common.Hash{}, fmt.Errorf("write single-partition root: %w", err)
|
||||||
|
}
|
||||||
|
return rootHash, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// populated >= 2: strip each partition and assemble a 17-slot branch.
|
||||||
|
var children [17][]byte
|
||||||
|
for i := range numPartitions {
|
||||||
|
if partitionBlobs[i] == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stripped, strippedBlob, err := trie.StripPartitionRoot(partitionBlobs[i], byte(i))
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, fmt.Errorf("strip partition %d: %w", i, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remember that strip returns nil for the common case 1.
|
||||||
|
if strippedBlob != nil {
|
||||||
|
// Strip constructed a new node that is alonger extension or leaf
|
||||||
|
// partition (case 2/3). Persist it at path=[i] so path-scheme readers
|
||||||
|
// traversing slot i of the top branch can find it.
|
||||||
|
rawdb.WriteTrieNode(batch, common.Hash{}, []byte{byte(i)}, stripped, strippedBlob, scheme)
|
||||||
|
}
|
||||||
|
children[i] = stripped.Bytes()
|
||||||
|
}
|
||||||
|
rootBlob, rootHash, err := trie.AssembleBranch(children)
|
||||||
|
if err != nil {
|
||||||
|
return common.Hash{}, err
|
||||||
|
}
|
||||||
|
rawdb.WriteTrieNode(batch, common.Hash{}, nil, rootHash, rootBlob, scheme)
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return common.Hash{}, fmt.Errorf("write root branch: %w", err)
|
||||||
|
}
|
||||||
|
return rootHash, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,9 @@ package triedb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
|
@ -238,9 +240,8 @@ func TestGenerateTrieFixesStaleRoots(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestUpdateStorageRootsCancel verifies updateStorageRoots respects the
|
// TestGenerateTrieCancel verifies GenerateTrie respects the cancel channel.
|
||||||
// cancel channel.
|
func TestGenerateTrieCancel(t *testing.T) {
|
||||||
func TestUpdateStorageRootsCancel(t *testing.T) {
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
db := rawdb.NewMemoryDatabase()
|
db := rawdb.NewMemoryDatabase()
|
||||||
|
|
||||||
|
|
@ -256,7 +257,7 @@ func TestUpdateStorageRootsCancel(t *testing.T) {
|
||||||
|
|
||||||
cancel := make(chan struct{})
|
cancel := make(chan struct{})
|
||||||
close(cancel)
|
close(cancel)
|
||||||
if err := updateStorageRoots(db, cancel); err != ErrCancelled {
|
if err := GenerateTrie(db, rawdb.HashScheme, common.Hash{}, cancel); err != ErrCancelled {
|
||||||
t.Fatalf("expected ErrCancelled, got %v", err)
|
t.Fatalf("expected ErrCancelled, got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -302,3 +303,79 @@ func TestGenerateTrieOrphanStorage(t *testing.T) {
|
||||||
t.Fatalf("GenerateTrie with orphan storage failed: %v", err)
|
t.Fatalf("GenerateTrie with orphan storage failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestGenerateTriePartialResume proves that the resume path actually
|
||||||
|
// fires when a partition's done marker is present.
|
||||||
|
func TestGenerateTriePartialResume(t *testing.T) {
|
||||||
|
db := rawdb.NewMemoryDatabase()
|
||||||
|
|
||||||
|
// Build flat state. Empty storage keeps the test focused on the
|
||||||
|
// account-trie resume path.
|
||||||
|
const n = 200
|
||||||
|
accounts := make([]testAccount, 0, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
addr := common.BytesToAddress([]byte{byte(i >> 8), byte(i)})
|
||||||
|
hash := crypto.Keccak256Hash(addr[:])
|
||||||
|
acc := testAccount{
|
||||||
|
hash: hash,
|
||||||
|
account: types.StateAccount{
|
||||||
|
Nonce: uint64(i),
|
||||||
|
Balance: uint256.NewInt(uint64(i + 1)),
|
||||||
|
Root: types.EmptyRootHash,
|
||||||
|
CodeHash: types.EmptyCodeHash.Bytes(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
rawdb.WriteAccountSnapshot(db, acc.hash, types.SlimAccountRLP(acc.account))
|
||||||
|
accounts = append(accounts, acc)
|
||||||
|
}
|
||||||
|
expectedRoot := buildExpectedRoot(t, accounts)
|
||||||
|
|
||||||
|
// Step 2: run every partition once to populate trie nodes on disk
|
||||||
|
// and capture each partition's raw root blob.
|
||||||
|
var scanned, updated atomic.Int64
|
||||||
|
ranges := hashRanges(numPartitions)
|
||||||
|
blobs := make([][]byte, numPartitions)
|
||||||
|
for i, r := range ranges {
|
||||||
|
blob, err := generatePartition(context.Background(), nil, db, rawdb.HashScheme, byte(i), r[0], r[1], &scanned, &updated)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("pre-run partition %d: %v", i, err)
|
||||||
|
}
|
||||||
|
blobs[i] = blob
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 3: pre-seed done markers for even partitions only.
|
||||||
|
for i := 0; i < numPartitions; i++ {
|
||||||
|
if i%2 == 0 {
|
||||||
|
rawdb.WriteGenerateTriePartitionDone(db, byte(i), blobs[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 4: delete flat-state account snapshots for every account that
|
||||||
|
// lives in an even partition. After this, rerunning generatePartition
|
||||||
|
// for an even partition would find no accounts and produce a nil
|
||||||
|
// blob — so a correct final root requires the resume path.
|
||||||
|
deleted := 0
|
||||||
|
for _, a := range accounts {
|
||||||
|
if (a.hash[0]>>4)%2 == 0 {
|
||||||
|
rawdb.DeleteAccountSnapshot(db, a.hash)
|
||||||
|
deleted++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if deleted == 0 {
|
||||||
|
t.Fatal("test setup failure: no accounts fell in even partitions")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 5: run GenerateTrie. Success implies resume actually consulted
|
||||||
|
// the markers — without it, even partitions would yield nil blobs and
|
||||||
|
// the root check inside GenerateTrie would fail.
|
||||||
|
if err := GenerateTrie(db, rawdb.HashScheme, expectedRoot, nil); err != nil {
|
||||||
|
t.Fatalf("partial-resume GenerateTrie failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All markers cleared on success.
|
||||||
|
for i := 0; i < numPartitions; i++ {
|
||||||
|
if _, ok := rawdb.ReadGenerateTriePartitionDone(db, byte(i)); ok {
|
||||||
|
t.Errorf("partition %d marker not cleared after successful resume", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue