From 5e23a29b7384957ce93477d28e574f20f8042623 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 26 Mar 2026 12:16:27 +0800 Subject: [PATCH] core/state: integrate prefetching into merkle hasher --- core/state/database.go | 2 +- core/state/database_hasher.go | 1 + core/state/database_hasher_binary.go | 2 + core/state/database_hasher_merkle.go | 397 +++++++++++----- core/state/database_hasher_merkle_test.go | 541 ++++++++++++++++++++++ core/state/trie_prefetcher.go | 504 ++++++-------------- core/state/trie_prefetcher_test.go | 105 ++--- 7 files changed, 1001 insertions(+), 551 deletions(-) create mode 100644 core/state/database_hasher_merkle_test.go diff --git a/core/state/database.go b/core/state/database.go index 3ea8a68745..dfdae2fa06 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -224,7 +224,7 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) { // Hasher implements Database, returning a hasher associated with the specified // state root. func (db *CachingDB) Hasher(stateRoot common.Hash) (Hasher, error) { - return newMerkleHasher(stateRoot, db.triedb) + return newMerkleHasher(stateRoot, db.triedb, true, true) } // ReadersWithCacheStats creates a pair of state readers that share the same diff --git a/core/state/database_hasher.go b/core/state/database_hasher.go index e07f666f07..9610191b91 100644 --- a/core/state/database_hasher.go +++ b/core/state/database_hasher.go @@ -128,6 +128,7 @@ type Prover interface { // returns an empty state root. type noopHasher struct{} +func (n *noopHasher) Close() {} func (n *noopHasher) UpdateAccount([]common.Address, []AccountMut) error { return nil } func (n *noopHasher) UpdateStorage(common.Address, []common.Hash, []common.Hash) error { return nil diff --git a/core/state/database_hasher_binary.go b/core/state/database_hasher_binary.go index 6d983770e8..3d3d8587a7 100644 --- a/core/state/database_hasher_binary.go +++ b/core/state/database_hasher_binary.go @@ -117,6 +117,8 @@ func (h *binaryHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[commo return root, nodes, nil, nil } +func (h *binaryHasher) Close() {} + func (h *binaryHasher) Copy() Hasher { return &binaryHasher{ db: h.db, diff --git a/core/state/database_hasher_merkle.go b/core/state/database_hasher_merkle.go index 48d3d48855..28aaa513f3 100644 --- a/core/state/database_hasher_merkle.go +++ b/core/state/database_hasher_merkle.go @@ -29,159 +29,317 @@ import ( "github.com/ethereum/go-ethereum/triedb" ) +// wrapTrie pairs a StateTrie with an optional background prefetcher that +// preloads trie nodes ahead of mutation. +type wrapTrie struct { + *trie.StateTrie + prefetcher *prefetcher +} + +func newWrapTrie(id *trie.ID, db *triedb.Database, prefetch bool, prefetchRead bool) (*wrapTrie, error) { + t, err := trie.NewStateTrie(id, db) + if err != nil { + return nil, err + } + var p *prefetcher + if prefetch { + p = newPrefetcher(t, prefetchRead) + } + return &wrapTrie{StateTrie: t, prefetcher: p}, nil +} + +// term synchronously terminates the prefetcher (no-op if nil or already done). +// After termination the prefetcher reference is nilled so subsequent calls are +// a cheap pointer check. +func (tr *wrapTrie) term() { + if tr.prefetcher == nil { + return + } + tr.prefetcher.terminate() + tr.prefetcher = nil +} + +// The methods below shadow the embedded StateTrie so that any direct trie +// access auto-terminates the prefetcher first. This makes data-race freedom +// structural: callers never need to remember to call term() manually. + +func (tr *wrapTrie) UpdateAccount(address common.Address, acc *types.StateAccount, codeLen int) error { + tr.term() + return tr.StateTrie.UpdateAccount(address, acc, codeLen) +} + +func (tr *wrapTrie) DeleteAccount(address common.Address) error { + tr.term() + return tr.StateTrie.DeleteAccount(address) +} + +func (tr *wrapTrie) UpdateStorage(address common.Address, key, value []byte) error { + tr.term() + return tr.StateTrie.UpdateStorage(address, key, value) +} + +func (tr *wrapTrie) DeleteStorage(address common.Address, key []byte) error { + tr.term() + return tr.StateTrie.DeleteStorage(address, key) +} + +func (tr *wrapTrie) Hash() common.Hash { + tr.term() + return tr.StateTrie.Hash() +} + +func (tr *wrapTrie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) { + tr.term() + return tr.StateTrie.Commit(collectLeaf) +} + +func (tr *wrapTrie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error { + tr.term() + return tr.StateTrie.Prove(key, proofDb) +} + +func (tr *wrapTrie) copy() *wrapTrie { + tr.term() + return &wrapTrie{StateTrie: tr.StateTrie.Copy()} +} + +func (tr *wrapTrie) prefetchAccounts(addresses []common.Address, read bool) { + if tr.prefetcher == nil { + return + } + tr.prefetcher.scheduleAccounts(addresses, read) +} + +func (tr *wrapTrie) prefetchStorage(addr common.Address, keys []common.Hash, read bool) { + if tr.prefetcher == nil { + return + } + tr.prefetcher.scheduleSlots(addr, keys, read) +} + +// rootReader wraps the account trie for loading the storage root. It is +// essential to use an independent trie to prevent potential data races +// with the optional prefetcher. +type rootReader struct { + tr *trie.StateTrie +} + +func newRootReader(root common.Hash, db *triedb.Database) (*rootReader, error) { + t, err := trie.NewStateTrie(trie.StateTrieID(root), db) + if err != nil { + return nil, err + } + return &rootReader{tr: t}, nil +} + +func (r *rootReader) readStorageRoot(address common.Address) (common.Hash, error) { + acct, err := r.tr.GetAccount(address) + if err != nil { + return common.Hash{}, err + } + if acct == nil { + return types.EmptyRootHash, nil + } + return acct.Root, nil +} + +func (r *rootReader) copy() *rootReader { + return &rootReader{tr: r.tr.Copy()} +} + // merkleHasher is a Hasher implementation backed by the traditional two-layer // Merkle Patricia Trie (separate account trie and per-account storage tries). type merkleHasher struct { - db *triedb.Database - root common.Hash + db *triedb.Database + root common.Hash + reader *rootReader + prefetch bool + prefetchRead bool - accountTrie *trie.StateTrie - storageTries map[common.Address]*trie.StateTrie // lazily opened + acctTrie *wrapTrie + storageTries map[common.Address]*wrapTrie - // storageRoots tracks the storage root transition for every mutated - // account. Prev is recorded once (first touch) and Hash is updated - // on each UpdateAccount call. + // deletedTries preserves storage tries of accounts that were deleted + // during the block. Keyed by address; only the first deletion per + // address is recorded (the pre-block incarnation). + deletedTries map[common.Address]*wrapTrie + + // storageRoots tracks the storage root transition for each resolved + // account. Prev is captured on first touch; Hash is updated by + // UpdateStorage or set to EmptyRootHash on deletion. storageRoots map[common.Address]Hashes - lock sync.Mutex // guards storageTries (concurrent updateTrie) + storageLock sync.Mutex // guards storage trie fields } -func newMerkleHasher(root common.Hash, db *triedb.Database) (*merkleHasher, error) { - tr, err := trie.NewStateTrie(trie.StateTrieID(root), db) +func newMerkleHasher(root common.Hash, db *triedb.Database, prefetch bool, prefetchRead bool) (*merkleHasher, error) { + tr, err := newWrapTrie(trie.StateTrieID(root), db, prefetch, prefetchRead) + if err != nil { + return nil, err + } + r, err := newRootReader(root, db) if err != nil { return nil, err } return &merkleHasher{ db: db, root: root, - accountTrie: tr, - storageTries: make(map[common.Address]*trie.StateTrie), + prefetch: prefetch, + prefetchRead: prefetchRead, + reader: r, + acctTrie: tr, + storageTries: make(map[common.Address]*wrapTrie), + deletedTries: make(map[common.Address]*wrapTrie), storageRoots: make(map[common.Address]Hashes), }, nil } -// accountStorageRoot reads the storage root of account from the account trie. -func (h *merkleHasher) accountStorageRoot(addr common.Address) common.Hash { - if acc, _ := h.accountTrie.GetAccount(addr); acc != nil { - return acc.Root +// storageRoot returns the current tracked storage root for addr. On first +// access for a given address the root is read from the account trie and +// recorded as the Prev value for the commit-time transition report. +func (h *merkleHasher) storageRoot(addr common.Address) (common.Hash, error) { + if hashes, ok := h.storageRoots[addr]; ok { + return hashes.Hash, nil } - return types.EmptyRootHash + root, err := h.reader.readStorageRoot(addr) + if err != nil { + return common.Hash{}, err + } + h.storageRoots[addr] = Hashes{Prev: root, Hash: root} + return root, nil } -// recordOrigin records the original (pre-mutation) storage root for addr. -// Only the first call per address has any effect. -func (h *merkleHasher) recordOrigin(addr common.Address) { - if _, ok := h.storageRoots[addr]; !ok { - root := h.accountStorageRoot(addr) - h.storageRoots[addr] = Hashes{ - Prev: root, - Hash: root, - } - } -} +// openStorageTrie returns the cached storage trie for addr, or opens one from +// the database if not already cached. +func (h *merkleHasher) openStorageTrie(address common.Address, prefetch bool) (*wrapTrie, error) { + h.storageLock.Lock() + defer h.storageLock.Unlock() -// openStorageTrie returns the cached storage trie for the given address, -// or opens one from the database if not already cached. -func (h *merkleHasher) openStorageTrie(address common.Address) (*trie.StateTrie, error) { - if st, ok := h.storageTries[address]; ok { - return st, nil + if tr, ok := h.storageTries[address]; ok { + return tr, nil } - // Record the original storage trie root if it has not already been tracked - // when the storage trie is loaded. - h.recordOrigin(address) - - id := trie.StorageTrieID(h.root, crypto.Keccak256Hash(address.Bytes()), h.accountStorageRoot(address)) - st, err := trie.NewStateTrie(id, h.db) + root, err := h.storageRoot(address) if err != nil { return nil, err } - h.storageTries[address] = st - return st, nil + id := trie.StorageTrieID(h.root, crypto.Keccak256Hash(address.Bytes()), root) + tr, err := newWrapTrie(id, h.db, h.prefetch && prefetch, h.prefetchRead) + if err != nil { + return nil, err + } + h.storageTries[address] = tr + return tr, nil } -func (h *merkleHasher) UpdateStorage(address common.Address, keys []common.Hash, values []common.Hash) error { - h.lock.Lock() - st, err := h.openStorageTrie(address) +func (h *merkleHasher) deleteAccount(addr common.Address) error { + // Deletion: capture the original storage root before modifying the trie. + _, err := h.storageRoot(addr) if err != nil { - h.lock.Unlock() return err } - h.lock.Unlock() + h.storageRoots[addr] = Hashes{ + Prev: h.storageRoots[addr].Prev, + Hash: types.EmptyRootHash, + } + // Preserve the first deleted storage trie per address for + // witness collection. + if tr, ok := h.storageTries[addr]; ok && h.deletedTries[addr] == nil { + h.deletedTries[addr] = tr + } + delete(h.storageTries, addr) - for i, key := range keys { - if values[i] == (common.Hash{}) { - if err := st.DeleteStorage(address, key[:]); err != nil { - return err - } + return h.acctTrie.DeleteAccount(addr) +} + +func (h *merkleHasher) updateAccount(addr common.Address, account AccountMut) error { + root, err := h.storageRoot(addr) + if err != nil { + return err + } + data := &types.StateAccount{ + Nonce: account.Account.Nonce, + Balance: account.Account.Balance, + Root: root, + CodeHash: account.Account.CodeHash, + } + return h.acctTrie.UpdateAccount(addr, data, 0) +} + +// UpdateAccount implements Hasher. +func (h *merkleHasher) UpdateAccount(addresses []common.Address, accounts []AccountMut) error { + for i, addr := range addresses { + var err error + if accounts[i].Account == nil { + err = h.deleteAccount(addr) } else { - if err := st.UpdateStorage(address, key[:], common.TrimLeftZeroes(values[i][:])); err != nil { - return err - } + err = h.updateAccount(addr, accounts[i]) + } + if err != nil { + return err } } return nil } -func (h *merkleHasher) UpdateAccount(addresses []common.Address, accounts []AccountMut) error { - for i, addr := range addresses { - h.recordOrigin(addr) - acct := accounts[i] - - // Deletion: remove from account trie and evict any cached - // storage trie so a re-created account starts fresh. - if acct.Account == nil { - if err := h.accountTrie.DeleteAccount(addr); err != nil { +// UpdateStorage implements Hasher. +func (h *merkleHasher) UpdateStorage(address common.Address, keys []common.Hash, values []common.Hash) error { + tr, err := h.openStorageTrie(address, false) + if err != nil { + return err + } + for i, key := range keys { + if values[i] == (common.Hash{}) { + if err := tr.DeleteStorage(address, key[:]); err != nil { return err } - delete(h.storageTries, addr) - - h.storageRoots[addr] = Hashes{ - Prev: h.storageRoots[addr].Prev, - Hash: types.EmptyRootHash, + } else { + if err := tr.UpdateStorage(address, key[:], common.TrimLeftZeroes(values[i][:])); err != nil { + return err } - continue - } - // Determine storage root from the cached trie (if storage was - // modified) or from the account trie (unchanged storage). - storageRoot := h.accountStorageRoot(addr) - if st, ok := h.storageTries[addr]; ok { - storageRoot = st.Hash() - } - sa := &types.StateAccount{ - Nonce: acct.Account.Nonce, - Balance: acct.Account.Balance, - Root: storageRoot, - CodeHash: acct.Account.CodeHash, - } - if err := h.accountTrie.UpdateAccount(addr, sa, 0); err != nil { - return err - } - h.storageRoots[addr] = Hashes{ - Prev: h.storageRoots[addr].Prev, - Hash: storageRoot, } } + // Hash outside the lock to allow full parallelism across accounts. + hash := tr.Hash() + + h.storageLock.Lock() + h.storageRoots[address] = Hashes{ + Prev: h.storageRoots[address].Prev, + Hash: hash, + } + h.storageLock.Unlock() return nil } func (h *merkleHasher) Hash() common.Hash { - return h.accountTrie.Hash() + return h.acctTrie.Hash() +} + +// Close terminates all prefetcher goroutines. Safe to call multiple times. +func (h *merkleHasher) Close() { + h.acctTrie.term() + for _, tr := range h.storageTries { + tr.term() + } + for _, tr := range h.deletedTries { + tr.term() + } } func (h *merkleHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[common.Address]Hashes, error) { - nodes := trienode.NewMergedNodeSet() + // Explicitly terminate all resolved tries. Some of them may not be + // terminated due to read-only prefetching. This is essential to + // prevent goroutine leaks. + h.Close() - // Commit all dirty storage tries. - for _, st := range h.storageTries { - if _, set := st.Commit(false); set != nil { + nodes := trienode.NewMergedNodeSet() + for _, tr := range h.storageTries { + if _, set := tr.Commit(false); set != nil { if err := nodes.Merge(set); err != nil { return common.Hash{}, nil, nil, err } } } - // Commit the account trie. collectLeaf must be true so that hashdb - // can link account trie leaves to their storage trie roots. - root, set := h.accountTrie.Commit(true) + root, set := h.acctTrie.Commit(true) if set != nil { if err := nodes.Merge(set); err != nil { return common.Hash{}, nil, nil, err @@ -194,28 +352,53 @@ func (h *merkleHasher) Copy() Hasher { cpy := &merkleHasher{ db: h.db, root: h.root, - accountTrie: h.accountTrie.Copy(), - storageTries: make(map[common.Address]*trie.StateTrie, len(h.storageTries)), + reader: h.reader.copy(), + prefetch: false, + acctTrie: h.acctTrie.copy(), + storageTries: make(map[common.Address]*wrapTrie, len(h.storageTries)), + deletedTries: make(map[common.Address]*wrapTrie, len(h.deletedTries)), storageRoots: maps.Clone(h.storageRoots), } - for addr, st := range h.storageTries { - cpy.storageTries[addr] = st.Copy() + for addr, tr := range h.storageTries { + cpy.storageTries[addr] = tr.copy() + } + for addr, tr := range h.deletedTries { + cpy.deletedTries[addr] = tr.copy() } return cpy } -// ProveAccount implements Prover by constructing a Merkle proof for the -// given account against the current account trie. +// ProveAccount implements Prover. func (h *merkleHasher) ProveAccount(addr common.Address, proofDb ethdb.KeyValueWriter) error { - return h.accountTrie.Prove(crypto.Keccak256(addr.Bytes()), proofDb) + return h.acctTrie.Prove(crypto.Keccak256(addr.Bytes()), proofDb) } -// ProveStorage implements Prover by constructing a Merkle proof for the given -// storage slot. The storage trie is opened lazily if not already cached. +// ProveStorage implements Prover. func (h *merkleHasher) ProveStorage(addr common.Address, key common.Hash, proofDb ethdb.KeyValueWriter) error { - st, err := h.openStorageTrie(addr) + tr, err := h.openStorageTrie(addr, false) if err != nil { return err } - return st.Prove(crypto.Keccak256(key.Bytes()), proofDb) + return tr.Prove(crypto.Keccak256(key.Bytes()), proofDb) +} + +// PrefetchAccount implements Prefetcher, preloading the nodes of specific accounts. +func (h *merkleHasher) PrefetchAccount(addresses []common.Address, read bool) { + if !h.prefetch { + return + } + h.acctTrie.prefetchAccounts(addresses, read) +} + +// PrefetchStorage implements Prefetcher. The storage trie is opened eagerly +// so the prefetcher can begin loading nodes in the background. +func (h *merkleHasher) PrefetchStorage(addr common.Address, keys []common.Hash, read bool) { + if !h.prefetch { + return + } + tr, err := h.openStorageTrie(addr, true) + if err != nil { + return + } + tr.prefetchStorage(addr, keys, read) } diff --git a/core/state/database_hasher_merkle_test.go b/core/state/database_hasher_merkle_test.go new file mode 100644 index 0000000000..559420e42c --- /dev/null +++ b/core/state/database_hasher_merkle_test.go @@ -0,0 +1,541 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package state + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/triedb" + "github.com/holiman/uint256" +) + +var ( + hasherAddr1 = common.HexToAddress("0x1111111111111111111111111111111111111111") + hasherAddr2 = common.HexToAddress("0x2222222222222222222222222222222222222222") + hasherAddr3 = common.HexToAddress("0x3333333333333333333333333333333333333333") + + hasherSlot1 = common.HexToHash("0x01") + hasherSlot2 = common.HexToHash("0x02") + hasherSlot3 = common.HexToHash("0x03") + + hasherVal1 = common.HexToHash("0xaa") + hasherVal2 = common.HexToHash("0xbb") + hasherVal3 = common.HexToHash("0xcc") +) + +// hasherTestConfig captures the prefetch flags varied across subtests. +type hasherTestConfig struct { + name string + prefetch bool + prefetchRead bool +} + +// hasherTestConfigs enumerates the interesting (prefetch, prefetchRead) combinations: +// - no prefetch at all +// - prefetch writes only (read prefetch requests are dropped) +// - prefetch reads and writes +var hasherTestConfigs = []hasherTestConfig{ + {"noPrefetch", false, false}, + {"prefetchWriteOnly", true, false}, + {"prefetchAll", true, true}, +} + +func hasherAccount(nonce uint64, balance uint64) AccountMut { + return AccountMut{ + Account: &Account{ + Nonce: nonce, + Balance: uint256.NewInt(balance), + CodeHash: types.EmptyCodeHash.Bytes(), + }, + } +} + +func hasherDeleteAccount() AccountMut { + return AccountMut{Account: nil} +} + +// newTestHasher creates a merkleHasher backed by an in-memory database. +func newTestHasher(t *testing.T, db *triedb.Database, root common.Hash, cfg hasherTestConfig) *merkleHasher { + t.Helper() + + h, err := newMerkleHasher(root, db, cfg.prefetch, cfg.prefetchRead) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { h.Close() }) + return h +} + +// commitAndReopen commits the hasher's state and reopens a fresh hasher from +// the committed root. This simulates a block boundary. +func commitAndReopen(t *testing.T, h *merkleHasher, cfg hasherTestConfig) *merkleHasher { + t.Helper() + + root, nodes, _, err := h.Commit() + if err != nil { + t.Fatal(err) + } + if nodes != nil { + if err := h.db.Update(root, h.root, 0, nodes, nil); err != nil { + t.Fatal(err) + } + if err := h.db.Commit(root, false); err != nil { + t.Fatal(err) + } + } + h2, err := newMerkleHasher(root, h.db, cfg.prefetch, cfg.prefetchRead) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { h2.Close() }) + return h2 +} + +// makeBaseState creates a non-empty state as the starting point for tests. +// The base contains: +// - addr1: nonce=1, balance=100, storage={slot1: val1, slot2: val2} +// - addr2: nonce=2, balance=200, no storage +// +// The state is committed and flushed so the hasher returned opens from disk, +// exercising rootReader and existing-trie code paths. +func makeBaseState(t *testing.T, cfg hasherTestConfig) *merkleHasher { + t.Helper() + + noPrefetch := hasherTestConfig{"base", false, false} + db := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil) + h := newTestHasher(t, db, types.EmptyRootHash, noPrefetch) + + if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, []common.Hash{hasherVal1, hasherVal2}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount( + []common.Address{hasherAddr1, hasherAddr2}, + []AccountMut{hasherAccount(1, 100), hasherAccount(2, 200)}, + ); err != nil { + t.Fatal(err) + } + return commitAndReopen(t, h, cfg) +} + +// TestMerkleHasherBasic verifies that mutating storage and accounts on top of +// a non-empty base state produces a deterministic, non-empty root and that the +// root survives a commit+reopen cycle. +func TestMerkleHasherBasic(t *testing.T) { + for _, cfg := range hasherTestConfigs { + t.Run(cfg.name, func(t *testing.T) { + h := makeBaseState(t, cfg) + + if cfg.prefetch { + h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot3}, false) + h.PrefetchAccount([]common.Address{hasherAddr1, hasherAddr3}, false) + } + // Add slot3 to addr1 and create addr3. + if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount( + []common.Address{hasherAddr1, hasherAddr3}, + []AccountMut{hasherAccount(1, 100), hasherAccount(3, 300)}, + ); err != nil { + t.Fatal(err) + } + root := h.Hash() + if root == types.EmptyRootHash { + t.Fatal("expected non-empty root after mutations") + } + h2 := commitAndReopen(t, h, cfg) + if h2.Hash() != root { + t.Fatalf("root mismatch after reopen: got %x, want %x", h2.Hash(), root) + } + }) + } +} + +// TestMerkleHasherPrefetchReadOnly verifies that read-only prefetching (for +// accounts and storage that are never subsequently mutated) does not corrupt +// state and does not leak goroutines. Both prefetchRead=true (requests are +// processed) and prefetchRead=false (requests are dropped) are tested. +func TestMerkleHasherPrefetchReadOnly(t *testing.T) { + for _, prefetchRead := range []bool{false, true} { + name := "readDropped" + if prefetchRead { + name = "readProcessed" + } + t.Run(name, func(t *testing.T) { + cfg := hasherTestConfig{name, true, prefetchRead} + h := makeBaseState(t, cfg) + rootBefore := h.Hash() + + // Prefetch addr1's account and storage (read-only). Whether + // these are actually processed depends on prefetchRead. + h.PrefetchAccount([]common.Address{hasherAddr1, hasherAddr2}, true) + h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, true) + + // Only mutate addr2 (no storage) — addr1's prefetched tries + // are never accessed through a shadow method. + if err := h.UpdateAccount( + []common.Address{hasherAddr2}, + []AccountMut{hasherAccount(2, 300)}, + ); err != nil { + t.Fatal(err) + } + root := h.Hash() + if root == rootBefore { + t.Fatal("expected root to change after balance update") + } + h2 := commitAndReopen(t, h, hasherTestConfig{"verify", false, false}) + if h2.Hash() != root { + t.Fatalf("root mismatch: got %x, want %x", h2.Hash(), root) + } + }) + } +} + +// TestMerkleHasherDeleteAccount verifies that deleting an account with storage +// produces an empty storage root in the commit result, with Prev reflecting +// the original non-empty root. +func TestMerkleHasherDeleteAccount(t *testing.T) { + for _, cfg := range hasherTestConfigs { + t.Run(cfg.name, func(t *testing.T) { + h := makeBaseState(t, cfg) + + if cfg.prefetch { + h.PrefetchAccount([]common.Address{hasherAddr1}, false) + h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, false) + } + // Delete addr1 (which has storage slots 1,2). + if err := h.UpdateAccount( + []common.Address{hasherAddr1}, + []AccountMut{hasherDeleteAccount()}, + ); err != nil { + t.Fatal(err) + } + _, _, storageRoots, err := h.Commit() + if err != nil { + t.Fatal(err) + } + sr, ok := storageRoots[hasherAddr1] + if !ok { + t.Fatal("deleted account missing from storageRoots") + } + if sr.Hash != types.EmptyRootHash { + t.Fatalf("deleted account storage root: got %x, want EmptyRootHash", sr.Hash) + } + if sr.Prev == types.EmptyRootHash { + t.Fatal("deleted account Prev should be non-empty (had storage)") + } + }) + } +} + +// TestMerkleHasherDeleteRecreate verifies that deleting an account and +// recreating it with different storage in the same block produces a correct +// root that survives a commit+reopen cycle. The storageRoots report must show +// the original Prev and a new Hash. +func TestMerkleHasherDeleteRecreate(t *testing.T) { + for _, cfg := range hasherTestConfigs { + t.Run(cfg.name, func(t *testing.T) { + h := makeBaseState(t, cfg) + + if cfg.prefetch { + h.PrefetchAccount([]common.Address{hasherAddr1}, false) + h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, false) + } + // Delete addr1. + if err := h.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherDeleteAccount()}); err != nil { + t.Fatal(err) + } + // Recreate with slot3 only. + if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherAccount(10, 500)}); err != nil { + t.Fatal(err) + } + root := h.Hash() + if root == types.EmptyRootHash { + t.Fatal("expected non-empty root after recreate") + } + h2 := commitAndReopen(t, h, hasherTestConfig{"verify", false, false}) + + sr := h.storageRoots[hasherAddr1] + if sr.Hash == types.EmptyRootHash { + t.Fatal("recreated account should have non-empty storage root") + } + if sr.Prev == types.EmptyRootHash { + t.Fatal("Prev should reflect the pre-deletion storage root") + } + if sr.Hash == sr.Prev { + t.Fatal("Hash and Prev should differ after delete+recreate with different slots") + } + if h2.Hash() != root { + t.Fatalf("root mismatch after reopen: got %x, want %x", h2.Hash(), root) + } + }) + } +} + +// TestMerkleHasherPrefetchDeterminism verifies that the resulting root is +// identical across all prefetch configurations for the same set of mutations. +func TestMerkleHasherPrefetchDeterminism(t *testing.T) { + var roots []common.Hash + for _, cfg := range hasherTestConfigs { + h := makeBaseState(t, cfg) + + if cfg.prefetch { + h.PrefetchAccount([]common.Address{hasherAddr1, hasherAddr3}, false) + h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot3}, false) + h.PrefetchStorage(hasherAddr3, []common.Hash{hasherSlot1}, false) + } + // Add slot3 to addr1, create addr3 with slot1. + if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil { + t.Fatal(err) + } + if err := h.UpdateStorage(hasherAddr3, []common.Hash{hasherSlot1}, []common.Hash{hasherVal1}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount( + []common.Address{hasherAddr1, hasherAddr3}, + []AccountMut{hasherAccount(1, 100), hasherAccount(3, 300)}, + ); err != nil { + t.Fatal(err) + } + roots = append(roots, h.Hash()) + } + for i := 1; i < len(roots); i++ { + if roots[i] != roots[0] { + t.Fatalf("root diverged: config[0]=%x config[%d]=%x", roots[0], i, roots[i]) + } + } +} + +// TestMerkleHasherCommitStorageRoots exhaustively checks the Prev/Hash pairs +// returned by Commit for every interesting mutation pattern: +// +// (1) delete account with non-empty storage +// (2) delete account with empty storage +// (3) delete + recreate with new non-empty storage +// (4) delete + recreate without storage (empty→empty after recreate) +// (5) delete + recreate: originally empty storage, recreated with storage +// (6) mutate account only, no storage (empty storage throughout) +// (7) mutate account only, non-empty storage unchanged +// (8) mutate account with modified storage +func TestMerkleHasherCommitStorageRoots(t *testing.T) { + var ( + // Addresses for each case — distinct so they don't interfere. + addrDeleteNonEmpty = common.HexToAddress("0xaa01") // (1) + addrDeleteEmpty = common.HexToAddress("0xaa02") // (2) + addrRecreateStorage = common.HexToAddress("0xaa03") // (3) + addrRecreateNoStore = common.HexToAddress("0xaa04") // (4) + addrRecreateFromNone = common.HexToAddress("0xaa05") // (5) + addrMutateNoStorage = common.HexToAddress("0xaa06") // (6) + addrMutateKeepStore = common.HexToAddress("0xaa07") // (7) + addrMutateModStore = common.HexToAddress("0xaa08") // (8) + ) + for _, cfg := range hasherTestConfigs { + t.Run(cfg.name, func(t *testing.T) { + // ---------- base state (committed to disk) ---------- + noPrefetch := hasherTestConfig{"base", false, false} + db := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil) + base := newTestHasher(t, db, types.EmptyRootHash, noPrefetch) + + // Accounts with storage. + for _, addr := range []common.Address{addrDeleteNonEmpty, addrRecreateStorage, addrRecreateNoStore, addrMutateKeepStore, addrMutateModStore} { + if err := base.UpdateStorage(addr, []common.Hash{hasherSlot1}, []common.Hash{hasherVal1}); err != nil { + t.Fatal(err) + } + } + // All accounts (some with storage above, some without). + allAddrs := []common.Address{ + addrDeleteNonEmpty, addrDeleteEmpty, + addrRecreateStorage, addrRecreateNoStore, addrRecreateFromNone, + addrMutateNoStorage, addrMutateKeepStore, addrMutateModStore, + } + allAccounts := make([]AccountMut, len(allAddrs)) + for i := range allAccounts { + allAccounts[i] = hasherAccount(1, 100) + } + if err := base.UpdateAccount(allAddrs, allAccounts); err != nil { + t.Fatal(err) + } + h := commitAndReopen(t, base, cfg) + + // ---------- block mutations ---------- + + // (1) Delete account with non-empty storage. + // (2) Delete account with empty storage. + if err := h.UpdateAccount( + []common.Address{addrDeleteNonEmpty, addrDeleteEmpty}, + []AccountMut{hasherDeleteAccount(), hasherDeleteAccount()}, + ); err != nil { + t.Fatal(err) + } + // (3) Delete + recreate with new storage. + if err := h.UpdateAccount([]common.Address{addrRecreateStorage}, []AccountMut{hasherDeleteAccount()}); err != nil { + t.Fatal(err) + } + if err := h.UpdateStorage(addrRecreateStorage, []common.Hash{hasherSlot2}, []common.Hash{hasherVal2}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount([]common.Address{addrRecreateStorage}, []AccountMut{hasherAccount(2, 200)}); err != nil { + t.Fatal(err) + } + // (4) Delete + recreate without storage (had storage before). + if err := h.UpdateAccount([]common.Address{addrRecreateNoStore}, []AccountMut{hasherDeleteAccount()}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount([]common.Address{addrRecreateNoStore}, []AccountMut{hasherAccount(2, 200)}); err != nil { + t.Fatal(err) + } + // (5) Delete + recreate: originally no storage, recreated with storage. + if err := h.UpdateAccount([]common.Address{addrRecreateFromNone}, []AccountMut{hasherDeleteAccount()}); err != nil { + t.Fatal(err) + } + if err := h.UpdateStorage(addrRecreateFromNone, []common.Hash{hasherSlot1}, []common.Hash{hasherVal3}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount([]common.Address{addrRecreateFromNone}, []AccountMut{hasherAccount(2, 200)}); err != nil { + t.Fatal(err) + } + // (6) Mutate account only, no storage. + if err := h.UpdateAccount([]common.Address{addrMutateNoStorage}, []AccountMut{hasherAccount(2, 999)}); err != nil { + t.Fatal(err) + } + // (7) Mutate account, non-empty storage unchanged. + if err := h.UpdateAccount([]common.Address{addrMutateKeepStore}, []AccountMut{hasherAccount(2, 888)}); err != nil { + t.Fatal(err) + } + // (8) Mutate account with modified storage. + if err := h.UpdateStorage(addrMutateModStore, []common.Hash{hasherSlot1}, []common.Hash{hasherVal2}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount([]common.Address{addrMutateModStore}, []AccountMut{hasherAccount(2, 777)}); err != nil { + t.Fatal(err) + } + _, _, roots, err := h.Commit() + if err != nil { + t.Fatal(err) + } + empty := types.EmptyRootHash + + // (1) Deleted, had storage: Prev=non-empty, Hash=empty. + sr := roots[addrDeleteNonEmpty] + if sr.Prev == empty { + t.Fatal("(1) Prev should be non-empty for deleted account that had storage") + } + if sr.Hash != empty { + t.Fatal("(1) Hash should be EmptyRootHash after deletion") + } + // (2) Deleted, had no storage: Prev=empty, Hash=empty. + sr = roots[addrDeleteEmpty] + if sr.Prev != empty || sr.Hash != empty { + t.Fatalf("(2) expected both EmptyRootHash, got Prev=%x Hash=%x", sr.Prev, sr.Hash) + } + // (3) Delete+recreate with new storage: Prev=non-empty(original), Hash=non-empty(new), differ. + sr = roots[addrRecreateStorage] + if sr.Prev == empty { + t.Fatal("(3) Prev should be non-empty (had storage before deletion)") + } + if sr.Hash == empty { + t.Fatal("(3) Hash should be non-empty (recreated with storage)") + } + if sr.Hash == sr.Prev { + t.Fatal("(3) Hash and Prev should differ (different storage contents)") + } + // (4) Delete+recreate without storage (originally had storage): Prev=non-empty, Hash=empty. + sr = roots[addrRecreateNoStore] + if sr.Prev == empty { + t.Fatal("(4) Prev should be non-empty (had storage before deletion)") + } + if sr.Hash != empty { + t.Fatal("(4) Hash should be EmptyRootHash (recreated without storage)") + } + // (5) Delete+recreate: originally no storage, recreated with storage: Prev=empty, Hash=non-empty. + sr = roots[addrRecreateFromNone] + if sr.Prev != empty { + t.Fatal("(5) Prev should be EmptyRootHash (no storage before deletion)") + } + if sr.Hash == empty { + t.Fatal("(5) Hash should be non-empty (recreated with storage)") + } + // (6) Mutate account only, no storage: Prev=empty, Hash=empty. + sr = roots[addrMutateNoStorage] + if sr.Prev != empty || sr.Hash != empty { + t.Fatalf("(6) expected both EmptyRootHash, got Prev=%x Hash=%x", sr.Prev, sr.Hash) + } + // (7) Mutate account, storage unchanged: Prev=non-empty, Hash=non-empty, Prev==Hash. + sr = roots[addrMutateKeepStore] + if sr.Prev == empty { + t.Fatal("(7) Prev should be non-empty (has storage)") + } + if sr.Hash == empty { + t.Fatal("(7) Hash should be non-empty (storage unchanged)") + } + if sr.Prev != sr.Hash { + t.Fatal("(7) Prev and Hash should be equal (storage was not modified)") + } + // (8) Mutate account with modified storage: Prev=non-empty, Hash=non-empty, differ. + sr = roots[addrMutateModStore] + if sr.Prev == empty { + t.Fatal("(8) Prev should be non-empty (had storage)") + } + if sr.Hash == empty { + t.Fatal("(8) Hash should be non-empty (storage modified, not cleared)") + } + if sr.Prev == sr.Hash { + t.Fatal("(8) Prev and Hash should differ (storage was modified)") + } + }) + } +} + +// TestMerkleHasherCopy verifies that Copy produces an independent snapshot: +// mutations on the copy must not affect the original's hash. +func TestMerkleHasherCopy(t *testing.T) { + cfg := hasherTestConfig{"prefetchAll", true, true} + h := makeBaseState(t, cfg) + + h.PrefetchAccount([]common.Address{hasherAddr1}, false) + h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot3}, false) + if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil { + t.Fatal(err) + } + if err := h.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherAccount(1, 100)}); err != nil { + t.Fatal(err) + } + origRoot := h.Hash() + + cpy := h.Copy() + defer cpy.(*merkleHasher).Close() + + // Mutate the copy: delete slot3, add slot2 with new value. + if err := cpy.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3, hasherSlot2}, []common.Hash{{}, hasherVal3}); err != nil { + t.Fatal(err) + } + if err := cpy.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherAccount(1, 100)}); err != nil { + t.Fatal(err) + } + if cpy.Hash() == origRoot { + t.Fatal("copy should diverge after mutation") + } + if h.Hash() != origRoot { + t.Fatal("original root changed after mutating copy") + } +} diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index cb64e0fb77..69ebf599f2 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -22,471 +22,225 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/trie" ) -//lint:file-ignore U1000 this file intentionally keeps unused helpers for future use +var errTerminated = errors.New("fetcher is already terminated") -var ( - // triePrefetchMetricsPrefix is the prefix under which to publish the metrics. - triePrefetchMetricsPrefix = "trie/prefetch/" +type slotKey struct { + addr common.Address + slot common.Hash +} - // errTerminated is returned if a fetcher is attempted to be operated after it - // has already terminated. - errTerminated = errors.New("fetcher is already terminated") +type taskKind uint8 + +const ( + kindAccount taskKind = iota + kindStorage ) -type trieOpener func(id trie.ID, addr common.Address) (Trie, error) // Define the handler to open the trie for pulling +type prefetchTask struct { + read bool + kind taskKind -// triePrefetcher is an active prefetcher, which receives accounts or storage -// items and does trie-loading of them. The goal is to get as much useful content -// into the caches as possible. -// -// Note, the prefetcher's API is not thread safe. -type triePrefetcher struct { - root common.Hash // Root hash of the account trie for metrics - fetchers map[trie.ID]*subfetcher // Subfetchers for each trie - term chan struct{} // Channel to signal interruption - noreads bool // Whether to ignore state-read-only prefetch requests - opener trieOpener // Handler to open the trie for pulling - - deliveryMissMeter *metrics.Meter - - accountLoadReadMeter *metrics.Meter - accountLoadWriteMeter *metrics.Meter - accountDupReadMeter *metrics.Meter - accountDupWriteMeter *metrics.Meter - accountDupCrossMeter *metrics.Meter - accountWasteMeter *metrics.Meter - - storageLoadReadMeter *metrics.Meter - storageLoadWriteMeter *metrics.Meter - storageDupReadMeter *metrics.Meter - storageDupWriteMeter *metrics.Meter - storageDupCrossMeter *metrics.Meter - storageWasteMeter *metrics.Meter + accounts []common.Address // kindAccount: addresses to prefetch + account common.Address // kindStorage: owner address + slots []common.Hash // kindStorage: slot keys to prefetch } -func newTriePrefetcher(opener trieOpener, root common.Hash, namespace string, noreads bool) *triePrefetcher { - prefix := triePrefetchMetricsPrefix + namespace - return &triePrefetcher{ - root: root, - fetchers: make(map[trie.ID]*subfetcher), // Active prefetchers use the fetchers map - term: make(chan struct{}), - noreads: noreads, - opener: opener, - - deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil), - - accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil), - accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil), - accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil), - accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil), - accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil), - accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil), - - storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil), - storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil), - storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil), - storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil), - storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil), - storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), - } -} - -// terminate iterates over all the subfetchers and issues a termination request -// to all of them. Depending on the async parameter, the method will either block -// until all subfetchers spin down, or return immediately. -func (p *triePrefetcher) terminate(async bool) { - // Short circuit if the fetcher is already closed - select { - case <-p.term: - return - default: - } - // Terminate all sub-fetchers, sync or async, depending on the request - for _, fetcher := range p.fetchers { - fetcher.terminate(async) - } - close(p.term) -} - -// report aggregates the pre-fetching and usage metrics and reports them. -// nolint:unused -func (p *triePrefetcher) report() { - if !metrics.Enabled() { - return - } - for _, fetcher := range p.fetchers { - fetcher.wait() // ensure the fetcher's idle before poking in its internals - - if fetcher.id.Owner == (common.Hash{}) { - p.accountLoadReadMeter.Mark(int64(len(fetcher.seenReadAddr))) - p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWriteAddr))) - - p.accountDupReadMeter.Mark(int64(fetcher.dupsRead)) - p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite)) - p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross)) - - for _, key := range fetcher.usedAddr { - delete(fetcher.seenReadAddr, key) - delete(fetcher.seenWriteAddr, key) - } - p.accountWasteMeter.Mark(int64(len(fetcher.seenReadAddr) + len(fetcher.seenWriteAddr))) - } else { - p.storageLoadReadMeter.Mark(int64(len(fetcher.seenReadSlot))) - p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWriteSlot))) - - p.storageDupReadMeter.Mark(int64(fetcher.dupsRead)) - p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite)) - p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross)) - - for _, key := range fetcher.usedSlot { - delete(fetcher.seenReadSlot, key) - delete(fetcher.seenWriteSlot, key) - } - p.storageWasteMeter.Mark(int64(len(fetcher.seenReadSlot) + len(fetcher.seenWriteSlot))) - } - } -} - -// prefetchAccounts schedules a batch of accounts to prefetch. -func (p *triePrefetcher) prefetchAccounts(id trie.ID, addrs []common.Address, read bool) error { - // If the state item is only being read, but reads are disabled, return - if read && p.noreads { - return nil - } - // Ensure the subfetcher is still alive - select { - case <-p.term: - return errTerminated - default: - } - fetcher := p.fetchers[id] - if fetcher == nil { - fetcher = newSubfetcher(p.opener, id, common.Address{}) - p.fetchers[id] = fetcher - } - return fetcher.scheduleAccounts(addrs, read) -} - -// prefetchStorage schedules a batch of storage slots to prefetch. -func (p *triePrefetcher) prefetchStorage(id trie.ID, addr common.Address, slots []common.Hash, read bool) error { - // If the state item is only being read, but reads are disabled, return - if read && p.noreads { - return nil - } - // Ensure the subfetcher is still alive - select { - case <-p.term: - return errTerminated - default: - } - fetcher := p.fetchers[id] - if fetcher == nil { - fetcher = newSubfetcher(p.opener, id, addr) - p.fetchers[id] = fetcher - } - return fetcher.scheduleSlots(addr, slots, read) -} - -// trie returns the trie matching the root hash, blocking until the fetcher of -// the given trie terminates. If no fetcher exists for the request, nil will be -// returned. -func (p *triePrefetcher) trie(id trie.ID) Trie { - // Bail if no trie was prefetched for this root - fetcher := p.fetchers[id] - if fetcher == nil { - log.Error("Prefetcher missed to load trie", "owner", id.Owner, "root", id.Root) - p.deliveryMissMeter.Mark(1) - return nil - } - // Subfetcher exists, retrieve its trie - return fetcher.peek() -} - -// used marks a batch of state items used to allow creating statistics as to -// how useful or wasteful the fetcher is. -// nolint:unused -func (p *triePrefetcher) used(id trie.ID, usedAddr []common.Address, usedSlot []common.Hash) { - if fetcher := p.fetchers[id]; fetcher != nil { - fetcher.wait() // ensure the fetcher's idle before poking in its internals - - fetcher.usedAddr = append(fetcher.usedAddr, usedAddr...) - fetcher.usedSlot = append(fetcher.usedSlot, usedSlot...) - } -} - -// subfetcher is a trie fetcher goroutine responsible for pulling entries for a -// single trie. It is spawned when a new root is encountered and lives until the -// main prefetcher is paused and either all requested items are processed or if -// the trie being worked on is retrieved from the prefetcher. -type subfetcher struct { - id trie.ID // The identifier of the trie being populated - addr common.Address // Address of the account that the trie belongs to - trie Trie // Trie being populated with nodes - opener trieOpener // Handler to open the trie for pulling - - tasks []*subfetcherTask // Items queued up for retrieval - lock sync.Mutex // Lock protecting the task queue +// prefetcher is a background goroutine that preloads trie nodes for a single +// trie. It deduplicates requests and stops when explicitly terminated. +type prefetcher struct { + prefetchRead bool // Whether the state read will trigger preloading + trie Trie // Trie being populated with nodes + tasks []*prefetchTask // Items queued up for retrieval + lock sync.Mutex // Lock protecting the task queue wake chan struct{} // Wake channel if a new task is scheduled stop chan struct{} // Channel to interrupt processing term chan struct{} // Channel to signal interruption - seenReadAddr map[common.Address]struct{} // Tracks the accounts already loaded via read operations - seenWriteAddr map[common.Address]struct{} // Tracks the accounts already loaded via write operations - seenReadSlot map[common.Hash]struct{} // Tracks the storage already loaded via read operations - seenWriteSlot map[common.Hash]struct{} // Tracks the storage already loaded via write operations - - dupsRead int // Number of duplicate preload tasks via reads only - dupsWrite int // Number of duplicate preload tasks via writes only - dupsCross int // Number of duplicate preload tasks via read-write-crosses - - usedAddr []common.Address // Tracks the accounts used in the end - usedSlot []common.Hash // Tracks the storage used in the end + seenReadAddr map[common.Address]struct{} // Dedup: accounts loaded via reads + seenWriteAddr map[common.Address]struct{} // Dedup: accounts loaded via writes + seenReadSlot map[slotKey]struct{} // Dedup: slots loaded via reads + seenWriteSlot map[slotKey]struct{} // Dedup: slots loaded via writes } -type subfetcherTaskKind uint8 - -const ( - kindAccount subfetcherTaskKind = iota - kindStorage -) - -type subfetcherTask struct { - read bool - kind subfetcherTaskKind - - // The list of accounts being pulling in kindAccount type - accounts []common.Address - - // The list of storage keys being pulling in kindStorage type - account common.Address - slots []common.Hash -} - -// newSubfetcher creates a goroutine to prefetch state items belonging to a -// particular root hash. -func newSubfetcher(opener trieOpener, id trie.ID, addr common.Address) *subfetcher { - sf := &subfetcher{ - id: id, - addr: addr, - opener: opener, +// newPrefetcher creates a background goroutine to prefetch state items from the +// given trie. +func newPrefetcher(tr Trie, prefetchRead bool) *prefetcher { + p := &prefetcher{ + prefetchRead: prefetchRead, + trie: tr, wake: make(chan struct{}, 1), stop: make(chan struct{}), term: make(chan struct{}), seenReadAddr: make(map[common.Address]struct{}), seenWriteAddr: make(map[common.Address]struct{}), - seenReadSlot: make(map[common.Hash]struct{}), - seenWriteSlot: make(map[common.Hash]struct{}), + seenReadSlot: make(map[slotKey]struct{}), + seenWriteSlot: make(map[slotKey]struct{}), } - go sf.loop() - return sf + go p.loop() + return p } -// scheduleAccounts adds a batch of accounts to the queue to prefetch. -func (sf *subfetcher) scheduleAccounts(addrs []common.Address, read bool) error { - // Ensure the subfetcher is still alive +// scheduleAccounts adds a batch of accounts to the prefetch queue. +func (p *prefetcher) scheduleAccounts(addrs []common.Address, read bool) error { select { - case <-sf.term: + case <-p.term: return errTerminated default: } - // Append the tasks to the current queue - sf.lock.Lock() - sf.tasks = append(sf.tasks, &subfetcherTask{ + if !p.prefetchRead && read { + return nil + } + p.lock.Lock() + p.tasks = append(p.tasks, &prefetchTask{ read: read, kind: kindAccount, accounts: addrs, }) - sf.lock.Unlock() + p.lock.Unlock() - // Notify the background thread to execute scheduled tasks select { - case sf.wake <- struct{}{}: - // Wake signal sent + case p.wake <- struct{}{}: default: - // Wake signal not sent as a previous one is already queued } return nil } -// scheduleSlots adds a batch of storage slots to the queue to prefetch. -func (sf *subfetcher) scheduleSlots(addr common.Address, slots []common.Hash, read bool) error { - // Ensure the subfetcher is still alive +// scheduleSlots adds a batch of storage slots to the prefetch queue. +func (p *prefetcher) scheduleSlots(addr common.Address, slots []common.Hash, read bool) error { select { - case <-sf.term: + case <-p.term: return errTerminated default: } - // Append the tasks to the current queue - sf.lock.Lock() - sf.tasks = append(sf.tasks, &subfetcherTask{ + if !p.prefetchRead && read { + return nil + } + p.lock.Lock() + p.tasks = append(p.tasks, &prefetchTask{ read: read, kind: kindStorage, account: addr, slots: slots, }) - sf.lock.Unlock() + p.lock.Unlock() - // Notify the background thread to execute scheduled tasks select { - case sf.wake <- struct{}{}: - // Wake signal sent + case p.wake <- struct{}{}: default: - // Wake signal not sent as a previous one is already queued } return nil } -// wait blocks until the subfetcher terminates. This method is used to block on -// an async termination before accessing internal fields from the fetcher. -func (sf *subfetcher) wait() { - <-sf.term -} - -// peek retrieves the fetcher's trie, populated with any pre-fetched data. The -// returned trie will be a shallow copy, so modifying it will break subsequent -// peeks for the original data. The method will block until all the scheduled -// data has been loaded and the fethcer terminated. -func (sf *subfetcher) peek() Trie { - // Block until the fetcher terminates, then retrieve the trie - sf.wait() - return sf.trie -} - -// terminate requests the subfetcher to stop accepting new tasks and spin down -// as soon as everything is loaded. Depending on the async parameter, the method -// will either block until all disk loads finish or return immediately. -func (sf *subfetcher) terminate(async bool) { +// terminate requests the prefetcher to stop and optionally waits for it. +func (p *prefetcher) terminate() { select { - case <-sf.stop: + case <-p.stop: default: - close(sf.stop) + close(p.stop) } - if async { - return - } - <-sf.term + <-p.term } -// openTrie resolves the target trie from database for prefetching. -func (sf *subfetcher) openTrie() error { - tr, err := sf.opener(sf.id, sf.addr) - if err != nil { - log.Warn("Trie prefetcher failed opening trie", "id", sf.id, "err", err) - return err - } - sf.trie = tr - return nil -} +// loop processes prefetch tasks until terminated. +func (p *prefetcher) loop() { + defer close(p.term) -// loop loads newly-scheduled trie tasks as they are received and loads them, stopping -// when requested. -func (sf *subfetcher) loop() { - // No matter how the loop stops, signal anyone waiting that it's terminated - defer close(sf.term) - - if err := sf.openTrie(); err != nil { - return - } for { select { - case <-sf.wake: - // Execute all remaining tasks in a single run - sf.lock.Lock() - tasks := sf.tasks - sf.tasks = nil - sf.lock.Unlock() + case <-p.wake: + p.lock.Lock() + tasks := p.tasks + p.tasks = nil + p.lock.Unlock() var ( - // Account tasks - addresses []common.Address - - // Slot tasks + addrs []common.Address slots = make(map[common.Address][][]byte) ) for _, task := range tasks { if task.kind == kindAccount { for _, addr := range task.accounts { - if task.read { - if _, ok := sf.seenReadAddr[addr]; ok { - sf.dupsRead++ - continue - } - if _, ok := sf.seenWriteAddr[addr]; ok { - sf.dupsCross++ - continue - } - sf.seenReadAddr[addr] = struct{}{} - } else { - if _, ok := sf.seenReadAddr[addr]; ok { - sf.dupsCross++ - continue - } - if _, ok := sf.seenWriteAddr[addr]; ok { - sf.dupsWrite++ - continue - } - sf.seenWriteAddr[addr] = struct{}{} + if p.dedupAddr(addr, task.read) { + continue } - addresses = append(addresses, addr) + addrs = append(addrs, addr) } } else { - var keys [][]byte for _, slot := range task.slots { - if task.read { - if _, ok := sf.seenReadSlot[slot]; ok { - sf.dupsRead++ - continue - } - if _, ok := sf.seenWriteSlot[slot]; ok { - sf.dupsCross++ - continue - } - sf.seenReadSlot[slot] = struct{}{} - } else { - if _, ok := sf.seenReadSlot[slot]; ok { - sf.dupsCross++ - continue - } - if _, ok := sf.seenWriteSlot[slot]; ok { - sf.dupsWrite++ - continue - } - sf.seenWriteSlot[slot] = struct{}{} + if p.dedupSlot(task.account, slot, task.read) { + continue } - keys = append(keys, slot.Bytes()) + slots[task.account] = append(slots[task.account], slot.Bytes()) } - slots[task.account] = append(slots[task.account], keys...) } } - if len(addresses) != 0 { - if err := sf.trie.PrefetchAccount(addresses); err != nil { + if len(addrs) > 0 { + if err := p.trie.PrefetchAccount(addrs); err != nil { log.Error("Failed to prefetch accounts", "err", err) } } for addr, keys := range slots { - if err := sf.trie.PrefetchStorage(addr, keys); err != nil { + if err := p.trie.PrefetchStorage(addr, keys); err != nil { log.Error("Failed to prefetch storage", "err", err) } } - case <-sf.stop: - // Termination is requested, abort if no more tasks are pending. If - // there are some, exhaust them first. - sf.lock.Lock() - done := sf.tasks == nil - sf.lock.Unlock() + case <-p.stop: + p.lock.Lock() + done := p.tasks == nil + p.lock.Unlock() if done { return } - // Some tasks are pending, loop and pick them up (that wake branch - // will be selected eventually, whilst stop remains closed to this - // branch will also run afterwards). } } } + +// dedupAddr returns true if addr was already seen for this read/write category. +func (p *prefetcher) dedupAddr(addr common.Address, read bool) bool { + if read { + if _, ok := p.seenReadAddr[addr]; ok { + return true + } + if _, ok := p.seenWriteAddr[addr]; ok { + return true + } + p.seenReadAddr[addr] = struct{}{} + } else { + if _, ok := p.seenReadAddr[addr]; ok { + return true + } + if _, ok := p.seenWriteAddr[addr]; ok { + return true + } + p.seenWriteAddr[addr] = struct{}{} + } + return false +} + +// dedupSlot returns true if slot was already seen for this read/write category. +func (p *prefetcher) dedupSlot(addr common.Address, slot common.Hash, read bool) bool { + key := slotKey{addr: addr, slot: slot} + if read { + if _, ok := p.seenReadSlot[key]; ok { + return true + } + if _, ok := p.seenWriteSlot[key]; ok { + return true + } + p.seenReadSlot[key] = struct{}{} + } else { + if _, ok := p.seenReadSlot[key]; ok { + return true + } + if _, ok := p.seenWriteSlot[key]; ok { + return true + } + p.seenWriteSlot[key] = struct{}{} + } + return false +} diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index efec28ce88..556c48ed4c 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -21,102 +21,71 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/tracing" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/internal/testrand" "github.com/ethereum/go-ethereum/trie" - "github.com/ethereum/go-ethereum/triedb" "github.com/holiman/uint256" ) func filledStateDB() *StateDB { state, _ := New(types.EmptyRootHash, NewDatabaseForTesting()) - // Create an account and check if the retrieved balance is correct addr := common.HexToAddress("0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe") skey := common.HexToHash("aaa") sval := common.HexToHash("bbb") - state.SetBalance(addr, uint256.NewInt(42), tracing.BalanceChangeUnspecified) // Change the account trie - state.SetCode(addr, []byte("hello"), tracing.CodeChangeUnspecified) // Change an external metadata - state.SetState(addr, skey, sval) // Change the storage trie + state.SetBalance(addr, uint256.NewInt(42), tracing.BalanceChangeUnspecified) + state.SetCode(addr, []byte("hello"), tracing.CodeChangeUnspecified) + state.SetState(addr, skey, sval) for i := 0; i < 100; i++ { sk := common.BigToHash(big.NewInt(int64(i))) - state.SetState(addr, sk, sk) // Change the storage trie + state.SetState(addr, sk, sk) } return state } -func TestUseAfterTerminate(t *testing.T) { +func TestSubfetcherUseAfterTerminate(t *testing.T) { db := filledStateDB() - opener := func(id trie.ID, addr common.Address) (Trie, error) { - if db.db.TrieDB().IsVerkle() { - return db.db.OpenTrie(id.StateRoot) - } - if id.Owner != (common.Hash{}) { - return db.db.OpenStorageTrie(id.StateRoot, addr, id.Root, nil) - } - return db.db.OpenTrie(id.StateRoot) + // Open a trie and create a subfetcher for it. + id := trie.StateTrieID(db.originalRoot) + tr, err := trie.NewStateTrie(id, db.db.TrieDB()) + if err != nil { + t.Fatalf("Failed to open trie: %v", err) } - prefetcher := newTriePrefetcher(opener, db.originalRoot, "", true) + sf := newPrefetcher(tr, false) addr := common.HexToAddress("0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe") + // Scheduling before termination should succeed. + if err := sf.scheduleAccounts([]common.Address{addr}, false); err != nil { + t.Fatalf("Schedule failed before terminate: %v", err) + } + // Terminate synchronously — waits for pending tasks. + sf.terminate() + + // Scheduling after termination should fail. + if err := sf.scheduleAccounts([]common.Address{addr}, false); err == nil { + t.Fatal("Schedule succeeded after terminate") + } +} + +func TestWrapTriePrefetch(t *testing.T) { + db := filledStateDB() + + // Create a wrapTrie with prefetching enabled. id := trie.StateTrieID(db.originalRoot) - if err := prefetcher.prefetchAccounts(*id, []common.Address{addr}, false); err != nil { - t.Errorf("Prefetch failed before terminate: %v", err) - } - prefetcher.terminate(false) - - if err := prefetcher.prefetchAccounts(*id, []common.Address{addr}, false); err == nil { - t.Errorf("Prefetch succeeded after terminate: %v", err) - } - if tr := prefetcher.trie(*id); tr == nil { - t.Errorf("Prefetcher returned nil trie after terminate") - } -} - -func TestVerklePrefetcher(t *testing.T) { - disk := rawdb.NewMemoryDatabase() - db := triedb.NewDatabase(disk, triedb.VerkleDefaults) - sdb := NewDatabase(db, nil) - - state, err := New(types.EmptyRootHash, sdb) + tr, err := newWrapTrie(id, db.db.TrieDB(), true, true) if err != nil { - t.Fatalf("failed to initialize state: %v", err) + t.Fatalf("Failed to create wrapTrie: %v", err) } - // Create an account and check if the retrieved balance is correct - addr := testrand.Address() - skey := testrand.Hash() - sval := testrand.Hash() + addr := common.HexToAddress("0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe") - state.SetBalance(addr, uint256.NewInt(42), tracing.BalanceChangeUnspecified) // Change the account trie - state.SetCode(addr, []byte("hello"), tracing.CodeChangeUnspecified) // Change an external metadata - state.SetState(addr, skey, sval) // Change the storage trie - root, _ := state.Commit(0, true, false) + // Schedule some prefetch work. + tr.prefetchAccounts([]common.Address{addr}, false) - state, _ = New(root, sdb) - - opener := func(id trie.ID, addr common.Address) (Trie, error) { - return sdb.OpenTrie(id.StateRoot) - } - fetcher := newTriePrefetcher(opener, root, "", false) - - // Read account - id := trie.StateTrieID(root) - fetcher.prefetchAccounts(*id, []common.Address{addr}, false) - - // Read storage slot - fetcher.prefetchStorage(*id, addr, []common.Hash{skey}, false) - - fetcher.terminate(false) - accountTrie := fetcher.trie(*id) - storageTrie := fetcher.trie(*id) - - rootA := accountTrie.Hash() - rootB := storageTrie.Hash() - if rootA != rootB { - t.Fatal("Two different tries are retrieved") + // Terminate and verify the trie is usable. + tr.term() + if tr.Hash() == (common.Hash{}) { + t.Fatal("wrapTrie hash is zero after prefetch") } }