core/state: integrate prefetching into merkle hasher

This commit is contained in:
Gary Rong 2026-03-26 12:16:27 +08:00 committed by CPerezz
parent 91298c8655
commit 5e23a29b73
No known key found for this signature in database
GPG key ID: 62045F34B97177DD
7 changed files with 1001 additions and 551 deletions

View file

@ -224,7 +224,7 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
// Hasher implements Database, returning a hasher associated with the specified
// state root.
func (db *CachingDB) Hasher(stateRoot common.Hash) (Hasher, error) {
return newMerkleHasher(stateRoot, db.triedb)
return newMerkleHasher(stateRoot, db.triedb, true, true)
}
// ReadersWithCacheStats creates a pair of state readers that share the same

View file

@ -128,6 +128,7 @@ type Prover interface {
// returns an empty state root.
type noopHasher struct{}
func (n *noopHasher) Close() {}
func (n *noopHasher) UpdateAccount([]common.Address, []AccountMut) error { return nil }
func (n *noopHasher) UpdateStorage(common.Address, []common.Hash, []common.Hash) error {
return nil

View file

@ -117,6 +117,8 @@ func (h *binaryHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[commo
return root, nodes, nil, nil
}
func (h *binaryHasher) Close() {}
func (h *binaryHasher) Copy() Hasher {
return &binaryHasher{
db: h.db,

View file

@ -29,159 +29,317 @@ import (
"github.com/ethereum/go-ethereum/triedb"
)
// wrapTrie pairs a StateTrie with an optional background prefetcher that
// preloads trie nodes ahead of mutation.
type wrapTrie struct {
*trie.StateTrie
prefetcher *prefetcher
}
func newWrapTrie(id *trie.ID, db *triedb.Database, prefetch bool, prefetchRead bool) (*wrapTrie, error) {
t, err := trie.NewStateTrie(id, db)
if err != nil {
return nil, err
}
var p *prefetcher
if prefetch {
p = newPrefetcher(t, prefetchRead)
}
return &wrapTrie{StateTrie: t, prefetcher: p}, nil
}
// term synchronously terminates the prefetcher (no-op if nil or already done).
// After termination the prefetcher reference is nilled so subsequent calls are
// a cheap pointer check.
func (tr *wrapTrie) term() {
if tr.prefetcher == nil {
return
}
tr.prefetcher.terminate()
tr.prefetcher = nil
}
// The methods below shadow the embedded StateTrie so that any direct trie
// access auto-terminates the prefetcher first. This makes data-race freedom
// structural: callers never need to remember to call term() manually.
func (tr *wrapTrie) UpdateAccount(address common.Address, acc *types.StateAccount, codeLen int) error {
tr.term()
return tr.StateTrie.UpdateAccount(address, acc, codeLen)
}
func (tr *wrapTrie) DeleteAccount(address common.Address) error {
tr.term()
return tr.StateTrie.DeleteAccount(address)
}
func (tr *wrapTrie) UpdateStorage(address common.Address, key, value []byte) error {
tr.term()
return tr.StateTrie.UpdateStorage(address, key, value)
}
func (tr *wrapTrie) DeleteStorage(address common.Address, key []byte) error {
tr.term()
return tr.StateTrie.DeleteStorage(address, key)
}
func (tr *wrapTrie) Hash() common.Hash {
tr.term()
return tr.StateTrie.Hash()
}
func (tr *wrapTrie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
tr.term()
return tr.StateTrie.Commit(collectLeaf)
}
func (tr *wrapTrie) Prove(key []byte, proofDb ethdb.KeyValueWriter) error {
tr.term()
return tr.StateTrie.Prove(key, proofDb)
}
func (tr *wrapTrie) copy() *wrapTrie {
tr.term()
return &wrapTrie{StateTrie: tr.StateTrie.Copy()}
}
func (tr *wrapTrie) prefetchAccounts(addresses []common.Address, read bool) {
if tr.prefetcher == nil {
return
}
tr.prefetcher.scheduleAccounts(addresses, read)
}
func (tr *wrapTrie) prefetchStorage(addr common.Address, keys []common.Hash, read bool) {
if tr.prefetcher == nil {
return
}
tr.prefetcher.scheduleSlots(addr, keys, read)
}
// rootReader wraps the account trie for loading the storage root. It is
// essential to use an independent trie to prevent potential data races
// with the optional prefetcher.
type rootReader struct {
tr *trie.StateTrie
}
func newRootReader(root common.Hash, db *triedb.Database) (*rootReader, error) {
t, err := trie.NewStateTrie(trie.StateTrieID(root), db)
if err != nil {
return nil, err
}
return &rootReader{tr: t}, nil
}
func (r *rootReader) readStorageRoot(address common.Address) (common.Hash, error) {
acct, err := r.tr.GetAccount(address)
if err != nil {
return common.Hash{}, err
}
if acct == nil {
return types.EmptyRootHash, nil
}
return acct.Root, nil
}
func (r *rootReader) copy() *rootReader {
return &rootReader{tr: r.tr.Copy()}
}
// merkleHasher is a Hasher implementation backed by the traditional two-layer
// Merkle Patricia Trie (separate account trie and per-account storage tries).
type merkleHasher struct {
db *triedb.Database
root common.Hash
db *triedb.Database
root common.Hash
reader *rootReader
prefetch bool
prefetchRead bool
accountTrie *trie.StateTrie
storageTries map[common.Address]*trie.StateTrie // lazily opened
acctTrie *wrapTrie
storageTries map[common.Address]*wrapTrie
// storageRoots tracks the storage root transition for every mutated
// account. Prev is recorded once (first touch) and Hash is updated
// on each UpdateAccount call.
// deletedTries preserves storage tries of accounts that were deleted
// during the block. Keyed by address; only the first deletion per
// address is recorded (the pre-block incarnation).
deletedTries map[common.Address]*wrapTrie
// storageRoots tracks the storage root transition for each resolved
// account. Prev is captured on first touch; Hash is updated by
// UpdateStorage or set to EmptyRootHash on deletion.
storageRoots map[common.Address]Hashes
lock sync.Mutex // guards storageTries (concurrent updateTrie)
storageLock sync.Mutex // guards storage trie fields
}
func newMerkleHasher(root common.Hash, db *triedb.Database) (*merkleHasher, error) {
tr, err := trie.NewStateTrie(trie.StateTrieID(root), db)
func newMerkleHasher(root common.Hash, db *triedb.Database, prefetch bool, prefetchRead bool) (*merkleHasher, error) {
tr, err := newWrapTrie(trie.StateTrieID(root), db, prefetch, prefetchRead)
if err != nil {
return nil, err
}
r, err := newRootReader(root, db)
if err != nil {
return nil, err
}
return &merkleHasher{
db: db,
root: root,
accountTrie: tr,
storageTries: make(map[common.Address]*trie.StateTrie),
prefetch: prefetch,
prefetchRead: prefetchRead,
reader: r,
acctTrie: tr,
storageTries: make(map[common.Address]*wrapTrie),
deletedTries: make(map[common.Address]*wrapTrie),
storageRoots: make(map[common.Address]Hashes),
}, nil
}
// accountStorageRoot reads the storage root of account from the account trie.
func (h *merkleHasher) accountStorageRoot(addr common.Address) common.Hash {
if acc, _ := h.accountTrie.GetAccount(addr); acc != nil {
return acc.Root
// storageRoot returns the current tracked storage root for addr. On first
// access for a given address the root is read from the account trie and
// recorded as the Prev value for the commit-time transition report.
func (h *merkleHasher) storageRoot(addr common.Address) (common.Hash, error) {
if hashes, ok := h.storageRoots[addr]; ok {
return hashes.Hash, nil
}
return types.EmptyRootHash
root, err := h.reader.readStorageRoot(addr)
if err != nil {
return common.Hash{}, err
}
h.storageRoots[addr] = Hashes{Prev: root, Hash: root}
return root, nil
}
// recordOrigin records the original (pre-mutation) storage root for addr.
// Only the first call per address has any effect.
func (h *merkleHasher) recordOrigin(addr common.Address) {
if _, ok := h.storageRoots[addr]; !ok {
root := h.accountStorageRoot(addr)
h.storageRoots[addr] = Hashes{
Prev: root,
Hash: root,
}
}
}
// openStorageTrie returns the cached storage trie for addr, or opens one from
// the database if not already cached.
func (h *merkleHasher) openStorageTrie(address common.Address, prefetch bool) (*wrapTrie, error) {
h.storageLock.Lock()
defer h.storageLock.Unlock()
// openStorageTrie returns the cached storage trie for the given address,
// or opens one from the database if not already cached.
func (h *merkleHasher) openStorageTrie(address common.Address) (*trie.StateTrie, error) {
if st, ok := h.storageTries[address]; ok {
return st, nil
if tr, ok := h.storageTries[address]; ok {
return tr, nil
}
// Record the original storage trie root if it has not already been tracked
// when the storage trie is loaded.
h.recordOrigin(address)
id := trie.StorageTrieID(h.root, crypto.Keccak256Hash(address.Bytes()), h.accountStorageRoot(address))
st, err := trie.NewStateTrie(id, h.db)
root, err := h.storageRoot(address)
if err != nil {
return nil, err
}
h.storageTries[address] = st
return st, nil
id := trie.StorageTrieID(h.root, crypto.Keccak256Hash(address.Bytes()), root)
tr, err := newWrapTrie(id, h.db, h.prefetch && prefetch, h.prefetchRead)
if err != nil {
return nil, err
}
h.storageTries[address] = tr
return tr, nil
}
func (h *merkleHasher) UpdateStorage(address common.Address, keys []common.Hash, values []common.Hash) error {
h.lock.Lock()
st, err := h.openStorageTrie(address)
func (h *merkleHasher) deleteAccount(addr common.Address) error {
// Deletion: capture the original storage root before modifying the trie.
_, err := h.storageRoot(addr)
if err != nil {
h.lock.Unlock()
return err
}
h.lock.Unlock()
h.storageRoots[addr] = Hashes{
Prev: h.storageRoots[addr].Prev,
Hash: types.EmptyRootHash,
}
// Preserve the first deleted storage trie per address for
// witness collection.
if tr, ok := h.storageTries[addr]; ok && h.deletedTries[addr] == nil {
h.deletedTries[addr] = tr
}
delete(h.storageTries, addr)
for i, key := range keys {
if values[i] == (common.Hash{}) {
if err := st.DeleteStorage(address, key[:]); err != nil {
return err
}
return h.acctTrie.DeleteAccount(addr)
}
func (h *merkleHasher) updateAccount(addr common.Address, account AccountMut) error {
root, err := h.storageRoot(addr)
if err != nil {
return err
}
data := &types.StateAccount{
Nonce: account.Account.Nonce,
Balance: account.Account.Balance,
Root: root,
CodeHash: account.Account.CodeHash,
}
return h.acctTrie.UpdateAccount(addr, data, 0)
}
// UpdateAccount implements Hasher.
func (h *merkleHasher) UpdateAccount(addresses []common.Address, accounts []AccountMut) error {
for i, addr := range addresses {
var err error
if accounts[i].Account == nil {
err = h.deleteAccount(addr)
} else {
if err := st.UpdateStorage(address, key[:], common.TrimLeftZeroes(values[i][:])); err != nil {
return err
}
err = h.updateAccount(addr, accounts[i])
}
if err != nil {
return err
}
}
return nil
}
func (h *merkleHasher) UpdateAccount(addresses []common.Address, accounts []AccountMut) error {
for i, addr := range addresses {
h.recordOrigin(addr)
acct := accounts[i]
// Deletion: remove from account trie and evict any cached
// storage trie so a re-created account starts fresh.
if acct.Account == nil {
if err := h.accountTrie.DeleteAccount(addr); err != nil {
// UpdateStorage implements Hasher.
func (h *merkleHasher) UpdateStorage(address common.Address, keys []common.Hash, values []common.Hash) error {
tr, err := h.openStorageTrie(address, false)
if err != nil {
return err
}
for i, key := range keys {
if values[i] == (common.Hash{}) {
if err := tr.DeleteStorage(address, key[:]); err != nil {
return err
}
delete(h.storageTries, addr)
h.storageRoots[addr] = Hashes{
Prev: h.storageRoots[addr].Prev,
Hash: types.EmptyRootHash,
} else {
if err := tr.UpdateStorage(address, key[:], common.TrimLeftZeroes(values[i][:])); err != nil {
return err
}
continue
}
// Determine storage root from the cached trie (if storage was
// modified) or from the account trie (unchanged storage).
storageRoot := h.accountStorageRoot(addr)
if st, ok := h.storageTries[addr]; ok {
storageRoot = st.Hash()
}
sa := &types.StateAccount{
Nonce: acct.Account.Nonce,
Balance: acct.Account.Balance,
Root: storageRoot,
CodeHash: acct.Account.CodeHash,
}
if err := h.accountTrie.UpdateAccount(addr, sa, 0); err != nil {
return err
}
h.storageRoots[addr] = Hashes{
Prev: h.storageRoots[addr].Prev,
Hash: storageRoot,
}
}
// Hash outside the lock to allow full parallelism across accounts.
hash := tr.Hash()
h.storageLock.Lock()
h.storageRoots[address] = Hashes{
Prev: h.storageRoots[address].Prev,
Hash: hash,
}
h.storageLock.Unlock()
return nil
}
func (h *merkleHasher) Hash() common.Hash {
return h.accountTrie.Hash()
return h.acctTrie.Hash()
}
// Close terminates all prefetcher goroutines. Safe to call multiple times.
func (h *merkleHasher) Close() {
h.acctTrie.term()
for _, tr := range h.storageTries {
tr.term()
}
for _, tr := range h.deletedTries {
tr.term()
}
}
func (h *merkleHasher) Commit() (common.Hash, *trienode.MergedNodeSet, map[common.Address]Hashes, error) {
nodes := trienode.NewMergedNodeSet()
// Explicitly terminate all resolved tries. Some of them may not be
// terminated due to read-only prefetching. This is essential to
// prevent goroutine leaks.
h.Close()
// Commit all dirty storage tries.
for _, st := range h.storageTries {
if _, set := st.Commit(false); set != nil {
nodes := trienode.NewMergedNodeSet()
for _, tr := range h.storageTries {
if _, set := tr.Commit(false); set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, nil, nil, err
}
}
}
// Commit the account trie. collectLeaf must be true so that hashdb
// can link account trie leaves to their storage trie roots.
root, set := h.accountTrie.Commit(true)
root, set := h.acctTrie.Commit(true)
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, nil, nil, err
@ -194,28 +352,53 @@ func (h *merkleHasher) Copy() Hasher {
cpy := &merkleHasher{
db: h.db,
root: h.root,
accountTrie: h.accountTrie.Copy(),
storageTries: make(map[common.Address]*trie.StateTrie, len(h.storageTries)),
reader: h.reader.copy(),
prefetch: false,
acctTrie: h.acctTrie.copy(),
storageTries: make(map[common.Address]*wrapTrie, len(h.storageTries)),
deletedTries: make(map[common.Address]*wrapTrie, len(h.deletedTries)),
storageRoots: maps.Clone(h.storageRoots),
}
for addr, st := range h.storageTries {
cpy.storageTries[addr] = st.Copy()
for addr, tr := range h.storageTries {
cpy.storageTries[addr] = tr.copy()
}
for addr, tr := range h.deletedTries {
cpy.deletedTries[addr] = tr.copy()
}
return cpy
}
// ProveAccount implements Prover by constructing a Merkle proof for the
// given account against the current account trie.
// ProveAccount implements Prover.
func (h *merkleHasher) ProveAccount(addr common.Address, proofDb ethdb.KeyValueWriter) error {
return h.accountTrie.Prove(crypto.Keccak256(addr.Bytes()), proofDb)
return h.acctTrie.Prove(crypto.Keccak256(addr.Bytes()), proofDb)
}
// ProveStorage implements Prover by constructing a Merkle proof for the given
// storage slot. The storage trie is opened lazily if not already cached.
// ProveStorage implements Prover.
func (h *merkleHasher) ProveStorage(addr common.Address, key common.Hash, proofDb ethdb.KeyValueWriter) error {
st, err := h.openStorageTrie(addr)
tr, err := h.openStorageTrie(addr, false)
if err != nil {
return err
}
return st.Prove(crypto.Keccak256(key.Bytes()), proofDb)
return tr.Prove(crypto.Keccak256(key.Bytes()), proofDb)
}
// PrefetchAccount implements Prefetcher, preloading the nodes of specific accounts.
func (h *merkleHasher) PrefetchAccount(addresses []common.Address, read bool) {
if !h.prefetch {
return
}
h.acctTrie.prefetchAccounts(addresses, read)
}
// PrefetchStorage implements Prefetcher. The storage trie is opened eagerly
// so the prefetcher can begin loading nodes in the background.
func (h *merkleHasher) PrefetchStorage(addr common.Address, keys []common.Hash, read bool) {
if !h.prefetch {
return
}
tr, err := h.openStorageTrie(addr, true)
if err != nil {
return
}
tr.prefetchStorage(addr, keys, read)
}

View file

@ -0,0 +1,541 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/triedb"
"github.com/holiman/uint256"
)
var (
hasherAddr1 = common.HexToAddress("0x1111111111111111111111111111111111111111")
hasherAddr2 = common.HexToAddress("0x2222222222222222222222222222222222222222")
hasherAddr3 = common.HexToAddress("0x3333333333333333333333333333333333333333")
hasherSlot1 = common.HexToHash("0x01")
hasherSlot2 = common.HexToHash("0x02")
hasherSlot3 = common.HexToHash("0x03")
hasherVal1 = common.HexToHash("0xaa")
hasherVal2 = common.HexToHash("0xbb")
hasherVal3 = common.HexToHash("0xcc")
)
// hasherTestConfig captures the prefetch flags varied across subtests.
type hasherTestConfig struct {
name string
prefetch bool
prefetchRead bool
}
// hasherTestConfigs enumerates the interesting (prefetch, prefetchRead) combinations:
// - no prefetch at all
// - prefetch writes only (read prefetch requests are dropped)
// - prefetch reads and writes
var hasherTestConfigs = []hasherTestConfig{
{"noPrefetch", false, false},
{"prefetchWriteOnly", true, false},
{"prefetchAll", true, true},
}
func hasherAccount(nonce uint64, balance uint64) AccountMut {
return AccountMut{
Account: &Account{
Nonce: nonce,
Balance: uint256.NewInt(balance),
CodeHash: types.EmptyCodeHash.Bytes(),
},
}
}
func hasherDeleteAccount() AccountMut {
return AccountMut{Account: nil}
}
// newTestHasher creates a merkleHasher backed by an in-memory database.
func newTestHasher(t *testing.T, db *triedb.Database, root common.Hash, cfg hasherTestConfig) *merkleHasher {
t.Helper()
h, err := newMerkleHasher(root, db, cfg.prefetch, cfg.prefetchRead)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { h.Close() })
return h
}
// commitAndReopen commits the hasher's state and reopens a fresh hasher from
// the committed root. This simulates a block boundary.
func commitAndReopen(t *testing.T, h *merkleHasher, cfg hasherTestConfig) *merkleHasher {
t.Helper()
root, nodes, _, err := h.Commit()
if err != nil {
t.Fatal(err)
}
if nodes != nil {
if err := h.db.Update(root, h.root, 0, nodes, nil); err != nil {
t.Fatal(err)
}
if err := h.db.Commit(root, false); err != nil {
t.Fatal(err)
}
}
h2, err := newMerkleHasher(root, h.db, cfg.prefetch, cfg.prefetchRead)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { h2.Close() })
return h2
}
// makeBaseState creates a non-empty state as the starting point for tests.
// The base contains:
// - addr1: nonce=1, balance=100, storage={slot1: val1, slot2: val2}
// - addr2: nonce=2, balance=200, no storage
//
// The state is committed and flushed so the hasher returned opens from disk,
// exercising rootReader and existing-trie code paths.
func makeBaseState(t *testing.T, cfg hasherTestConfig) *merkleHasher {
t.Helper()
noPrefetch := hasherTestConfig{"base", false, false}
db := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil)
h := newTestHasher(t, db, types.EmptyRootHash, noPrefetch)
if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, []common.Hash{hasherVal1, hasherVal2}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount(
[]common.Address{hasherAddr1, hasherAddr2},
[]AccountMut{hasherAccount(1, 100), hasherAccount(2, 200)},
); err != nil {
t.Fatal(err)
}
return commitAndReopen(t, h, cfg)
}
// TestMerkleHasherBasic verifies that mutating storage and accounts on top of
// a non-empty base state produces a deterministic, non-empty root and that the
// root survives a commit+reopen cycle.
func TestMerkleHasherBasic(t *testing.T) {
for _, cfg := range hasherTestConfigs {
t.Run(cfg.name, func(t *testing.T) {
h := makeBaseState(t, cfg)
if cfg.prefetch {
h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot3}, false)
h.PrefetchAccount([]common.Address{hasherAddr1, hasherAddr3}, false)
}
// Add slot3 to addr1 and create addr3.
if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount(
[]common.Address{hasherAddr1, hasherAddr3},
[]AccountMut{hasherAccount(1, 100), hasherAccount(3, 300)},
); err != nil {
t.Fatal(err)
}
root := h.Hash()
if root == types.EmptyRootHash {
t.Fatal("expected non-empty root after mutations")
}
h2 := commitAndReopen(t, h, cfg)
if h2.Hash() != root {
t.Fatalf("root mismatch after reopen: got %x, want %x", h2.Hash(), root)
}
})
}
}
// TestMerkleHasherPrefetchReadOnly verifies that read-only prefetching (for
// accounts and storage that are never subsequently mutated) does not corrupt
// state and does not leak goroutines. Both prefetchRead=true (requests are
// processed) and prefetchRead=false (requests are dropped) are tested.
func TestMerkleHasherPrefetchReadOnly(t *testing.T) {
for _, prefetchRead := range []bool{false, true} {
name := "readDropped"
if prefetchRead {
name = "readProcessed"
}
t.Run(name, func(t *testing.T) {
cfg := hasherTestConfig{name, true, prefetchRead}
h := makeBaseState(t, cfg)
rootBefore := h.Hash()
// Prefetch addr1's account and storage (read-only). Whether
// these are actually processed depends on prefetchRead.
h.PrefetchAccount([]common.Address{hasherAddr1, hasherAddr2}, true)
h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, true)
// Only mutate addr2 (no storage) — addr1's prefetched tries
// are never accessed through a shadow method.
if err := h.UpdateAccount(
[]common.Address{hasherAddr2},
[]AccountMut{hasherAccount(2, 300)},
); err != nil {
t.Fatal(err)
}
root := h.Hash()
if root == rootBefore {
t.Fatal("expected root to change after balance update")
}
h2 := commitAndReopen(t, h, hasherTestConfig{"verify", false, false})
if h2.Hash() != root {
t.Fatalf("root mismatch: got %x, want %x", h2.Hash(), root)
}
})
}
}
// TestMerkleHasherDeleteAccount verifies that deleting an account with storage
// produces an empty storage root in the commit result, with Prev reflecting
// the original non-empty root.
func TestMerkleHasherDeleteAccount(t *testing.T) {
for _, cfg := range hasherTestConfigs {
t.Run(cfg.name, func(t *testing.T) {
h := makeBaseState(t, cfg)
if cfg.prefetch {
h.PrefetchAccount([]common.Address{hasherAddr1}, false)
h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, false)
}
// Delete addr1 (which has storage slots 1,2).
if err := h.UpdateAccount(
[]common.Address{hasherAddr1},
[]AccountMut{hasherDeleteAccount()},
); err != nil {
t.Fatal(err)
}
_, _, storageRoots, err := h.Commit()
if err != nil {
t.Fatal(err)
}
sr, ok := storageRoots[hasherAddr1]
if !ok {
t.Fatal("deleted account missing from storageRoots")
}
if sr.Hash != types.EmptyRootHash {
t.Fatalf("deleted account storage root: got %x, want EmptyRootHash", sr.Hash)
}
if sr.Prev == types.EmptyRootHash {
t.Fatal("deleted account Prev should be non-empty (had storage)")
}
})
}
}
// TestMerkleHasherDeleteRecreate verifies that deleting an account and
// recreating it with different storage in the same block produces a correct
// root that survives a commit+reopen cycle. The storageRoots report must show
// the original Prev and a new Hash.
func TestMerkleHasherDeleteRecreate(t *testing.T) {
for _, cfg := range hasherTestConfigs {
t.Run(cfg.name, func(t *testing.T) {
h := makeBaseState(t, cfg)
if cfg.prefetch {
h.PrefetchAccount([]common.Address{hasherAddr1}, false)
h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot1, hasherSlot2}, false)
}
// Delete addr1.
if err := h.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherDeleteAccount()}); err != nil {
t.Fatal(err)
}
// Recreate with slot3 only.
if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherAccount(10, 500)}); err != nil {
t.Fatal(err)
}
root := h.Hash()
if root == types.EmptyRootHash {
t.Fatal("expected non-empty root after recreate")
}
h2 := commitAndReopen(t, h, hasherTestConfig{"verify", false, false})
sr := h.storageRoots[hasherAddr1]
if sr.Hash == types.EmptyRootHash {
t.Fatal("recreated account should have non-empty storage root")
}
if sr.Prev == types.EmptyRootHash {
t.Fatal("Prev should reflect the pre-deletion storage root")
}
if sr.Hash == sr.Prev {
t.Fatal("Hash and Prev should differ after delete+recreate with different slots")
}
if h2.Hash() != root {
t.Fatalf("root mismatch after reopen: got %x, want %x", h2.Hash(), root)
}
})
}
}
// TestMerkleHasherPrefetchDeterminism verifies that the resulting root is
// identical across all prefetch configurations for the same set of mutations.
func TestMerkleHasherPrefetchDeterminism(t *testing.T) {
var roots []common.Hash
for _, cfg := range hasherTestConfigs {
h := makeBaseState(t, cfg)
if cfg.prefetch {
h.PrefetchAccount([]common.Address{hasherAddr1, hasherAddr3}, false)
h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot3}, false)
h.PrefetchStorage(hasherAddr3, []common.Hash{hasherSlot1}, false)
}
// Add slot3 to addr1, create addr3 with slot1.
if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil {
t.Fatal(err)
}
if err := h.UpdateStorage(hasherAddr3, []common.Hash{hasherSlot1}, []common.Hash{hasherVal1}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount(
[]common.Address{hasherAddr1, hasherAddr3},
[]AccountMut{hasherAccount(1, 100), hasherAccount(3, 300)},
); err != nil {
t.Fatal(err)
}
roots = append(roots, h.Hash())
}
for i := 1; i < len(roots); i++ {
if roots[i] != roots[0] {
t.Fatalf("root diverged: config[0]=%x config[%d]=%x", roots[0], i, roots[i])
}
}
}
// TestMerkleHasherCommitStorageRoots exhaustively checks the Prev/Hash pairs
// returned by Commit for every interesting mutation pattern:
//
// (1) delete account with non-empty storage
// (2) delete account with empty storage
// (3) delete + recreate with new non-empty storage
// (4) delete + recreate without storage (empty→empty after recreate)
// (5) delete + recreate: originally empty storage, recreated with storage
// (6) mutate account only, no storage (empty storage throughout)
// (7) mutate account only, non-empty storage unchanged
// (8) mutate account with modified storage
func TestMerkleHasherCommitStorageRoots(t *testing.T) {
var (
// Addresses for each case — distinct so they don't interfere.
addrDeleteNonEmpty = common.HexToAddress("0xaa01") // (1)
addrDeleteEmpty = common.HexToAddress("0xaa02") // (2)
addrRecreateStorage = common.HexToAddress("0xaa03") // (3)
addrRecreateNoStore = common.HexToAddress("0xaa04") // (4)
addrRecreateFromNone = common.HexToAddress("0xaa05") // (5)
addrMutateNoStorage = common.HexToAddress("0xaa06") // (6)
addrMutateKeepStore = common.HexToAddress("0xaa07") // (7)
addrMutateModStore = common.HexToAddress("0xaa08") // (8)
)
for _, cfg := range hasherTestConfigs {
t.Run(cfg.name, func(t *testing.T) {
// ---------- base state (committed to disk) ----------
noPrefetch := hasherTestConfig{"base", false, false}
db := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil)
base := newTestHasher(t, db, types.EmptyRootHash, noPrefetch)
// Accounts with storage.
for _, addr := range []common.Address{addrDeleteNonEmpty, addrRecreateStorage, addrRecreateNoStore, addrMutateKeepStore, addrMutateModStore} {
if err := base.UpdateStorage(addr, []common.Hash{hasherSlot1}, []common.Hash{hasherVal1}); err != nil {
t.Fatal(err)
}
}
// All accounts (some with storage above, some without).
allAddrs := []common.Address{
addrDeleteNonEmpty, addrDeleteEmpty,
addrRecreateStorage, addrRecreateNoStore, addrRecreateFromNone,
addrMutateNoStorage, addrMutateKeepStore, addrMutateModStore,
}
allAccounts := make([]AccountMut, len(allAddrs))
for i := range allAccounts {
allAccounts[i] = hasherAccount(1, 100)
}
if err := base.UpdateAccount(allAddrs, allAccounts); err != nil {
t.Fatal(err)
}
h := commitAndReopen(t, base, cfg)
// ---------- block mutations ----------
// (1) Delete account with non-empty storage.
// (2) Delete account with empty storage.
if err := h.UpdateAccount(
[]common.Address{addrDeleteNonEmpty, addrDeleteEmpty},
[]AccountMut{hasherDeleteAccount(), hasherDeleteAccount()},
); err != nil {
t.Fatal(err)
}
// (3) Delete + recreate with new storage.
if err := h.UpdateAccount([]common.Address{addrRecreateStorage}, []AccountMut{hasherDeleteAccount()}); err != nil {
t.Fatal(err)
}
if err := h.UpdateStorage(addrRecreateStorage, []common.Hash{hasherSlot2}, []common.Hash{hasherVal2}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount([]common.Address{addrRecreateStorage}, []AccountMut{hasherAccount(2, 200)}); err != nil {
t.Fatal(err)
}
// (4) Delete + recreate without storage (had storage before).
if err := h.UpdateAccount([]common.Address{addrRecreateNoStore}, []AccountMut{hasherDeleteAccount()}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount([]common.Address{addrRecreateNoStore}, []AccountMut{hasherAccount(2, 200)}); err != nil {
t.Fatal(err)
}
// (5) Delete + recreate: originally no storage, recreated with storage.
if err := h.UpdateAccount([]common.Address{addrRecreateFromNone}, []AccountMut{hasherDeleteAccount()}); err != nil {
t.Fatal(err)
}
if err := h.UpdateStorage(addrRecreateFromNone, []common.Hash{hasherSlot1}, []common.Hash{hasherVal3}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount([]common.Address{addrRecreateFromNone}, []AccountMut{hasherAccount(2, 200)}); err != nil {
t.Fatal(err)
}
// (6) Mutate account only, no storage.
if err := h.UpdateAccount([]common.Address{addrMutateNoStorage}, []AccountMut{hasherAccount(2, 999)}); err != nil {
t.Fatal(err)
}
// (7) Mutate account, non-empty storage unchanged.
if err := h.UpdateAccount([]common.Address{addrMutateKeepStore}, []AccountMut{hasherAccount(2, 888)}); err != nil {
t.Fatal(err)
}
// (8) Mutate account with modified storage.
if err := h.UpdateStorage(addrMutateModStore, []common.Hash{hasherSlot1}, []common.Hash{hasherVal2}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount([]common.Address{addrMutateModStore}, []AccountMut{hasherAccount(2, 777)}); err != nil {
t.Fatal(err)
}
_, _, roots, err := h.Commit()
if err != nil {
t.Fatal(err)
}
empty := types.EmptyRootHash
// (1) Deleted, had storage: Prev=non-empty, Hash=empty.
sr := roots[addrDeleteNonEmpty]
if sr.Prev == empty {
t.Fatal("(1) Prev should be non-empty for deleted account that had storage")
}
if sr.Hash != empty {
t.Fatal("(1) Hash should be EmptyRootHash after deletion")
}
// (2) Deleted, had no storage: Prev=empty, Hash=empty.
sr = roots[addrDeleteEmpty]
if sr.Prev != empty || sr.Hash != empty {
t.Fatalf("(2) expected both EmptyRootHash, got Prev=%x Hash=%x", sr.Prev, sr.Hash)
}
// (3) Delete+recreate with new storage: Prev=non-empty(original), Hash=non-empty(new), differ.
sr = roots[addrRecreateStorage]
if sr.Prev == empty {
t.Fatal("(3) Prev should be non-empty (had storage before deletion)")
}
if sr.Hash == empty {
t.Fatal("(3) Hash should be non-empty (recreated with storage)")
}
if sr.Hash == sr.Prev {
t.Fatal("(3) Hash and Prev should differ (different storage contents)")
}
// (4) Delete+recreate without storage (originally had storage): Prev=non-empty, Hash=empty.
sr = roots[addrRecreateNoStore]
if sr.Prev == empty {
t.Fatal("(4) Prev should be non-empty (had storage before deletion)")
}
if sr.Hash != empty {
t.Fatal("(4) Hash should be EmptyRootHash (recreated without storage)")
}
// (5) Delete+recreate: originally no storage, recreated with storage: Prev=empty, Hash=non-empty.
sr = roots[addrRecreateFromNone]
if sr.Prev != empty {
t.Fatal("(5) Prev should be EmptyRootHash (no storage before deletion)")
}
if sr.Hash == empty {
t.Fatal("(5) Hash should be non-empty (recreated with storage)")
}
// (6) Mutate account only, no storage: Prev=empty, Hash=empty.
sr = roots[addrMutateNoStorage]
if sr.Prev != empty || sr.Hash != empty {
t.Fatalf("(6) expected both EmptyRootHash, got Prev=%x Hash=%x", sr.Prev, sr.Hash)
}
// (7) Mutate account, storage unchanged: Prev=non-empty, Hash=non-empty, Prev==Hash.
sr = roots[addrMutateKeepStore]
if sr.Prev == empty {
t.Fatal("(7) Prev should be non-empty (has storage)")
}
if sr.Hash == empty {
t.Fatal("(7) Hash should be non-empty (storage unchanged)")
}
if sr.Prev != sr.Hash {
t.Fatal("(7) Prev and Hash should be equal (storage was not modified)")
}
// (8) Mutate account with modified storage: Prev=non-empty, Hash=non-empty, differ.
sr = roots[addrMutateModStore]
if sr.Prev == empty {
t.Fatal("(8) Prev should be non-empty (had storage)")
}
if sr.Hash == empty {
t.Fatal("(8) Hash should be non-empty (storage modified, not cleared)")
}
if sr.Prev == sr.Hash {
t.Fatal("(8) Prev and Hash should differ (storage was modified)")
}
})
}
}
// TestMerkleHasherCopy verifies that Copy produces an independent snapshot:
// mutations on the copy must not affect the original's hash.
func TestMerkleHasherCopy(t *testing.T) {
cfg := hasherTestConfig{"prefetchAll", true, true}
h := makeBaseState(t, cfg)
h.PrefetchAccount([]common.Address{hasherAddr1}, false)
h.PrefetchStorage(hasherAddr1, []common.Hash{hasherSlot3}, false)
if err := h.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3}, []common.Hash{hasherVal3}); err != nil {
t.Fatal(err)
}
if err := h.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherAccount(1, 100)}); err != nil {
t.Fatal(err)
}
origRoot := h.Hash()
cpy := h.Copy()
defer cpy.(*merkleHasher).Close()
// Mutate the copy: delete slot3, add slot2 with new value.
if err := cpy.UpdateStorage(hasherAddr1, []common.Hash{hasherSlot3, hasherSlot2}, []common.Hash{{}, hasherVal3}); err != nil {
t.Fatal(err)
}
if err := cpy.UpdateAccount([]common.Address{hasherAddr1}, []AccountMut{hasherAccount(1, 100)}); err != nil {
t.Fatal(err)
}
if cpy.Hash() == origRoot {
t.Fatal("copy should diverge after mutation")
}
if h.Hash() != origRoot {
t.Fatal("original root changed after mutating copy")
}
}

View file

@ -22,471 +22,225 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/trie"
)
//lint:file-ignore U1000 this file intentionally keeps unused helpers for future use
var errTerminated = errors.New("fetcher is already terminated")
var (
// triePrefetchMetricsPrefix is the prefix under which to publish the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"
type slotKey struct {
addr common.Address
slot common.Hash
}
// errTerminated is returned if a fetcher is attempted to be operated after it
// has already terminated.
errTerminated = errors.New("fetcher is already terminated")
type taskKind uint8
const (
kindAccount taskKind = iota
kindStorage
)
type trieOpener func(id trie.ID, addr common.Address) (Trie, error) // Define the handler to open the trie for pulling
type prefetchTask struct {
read bool
kind taskKind
// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
//
// Note, the prefetcher's API is not thread safe.
type triePrefetcher struct {
root common.Hash // Root hash of the account trie for metrics
fetchers map[trie.ID]*subfetcher // Subfetchers for each trie
term chan struct{} // Channel to signal interruption
noreads bool // Whether to ignore state-read-only prefetch requests
opener trieOpener // Handler to open the trie for pulling
deliveryMissMeter *metrics.Meter
accountLoadReadMeter *metrics.Meter
accountLoadWriteMeter *metrics.Meter
accountDupReadMeter *metrics.Meter
accountDupWriteMeter *metrics.Meter
accountDupCrossMeter *metrics.Meter
accountWasteMeter *metrics.Meter
storageLoadReadMeter *metrics.Meter
storageLoadWriteMeter *metrics.Meter
storageDupReadMeter *metrics.Meter
storageDupWriteMeter *metrics.Meter
storageDupCrossMeter *metrics.Meter
storageWasteMeter *metrics.Meter
accounts []common.Address // kindAccount: addresses to prefetch
account common.Address // kindStorage: owner address
slots []common.Hash // kindStorage: slot keys to prefetch
}
func newTriePrefetcher(opener trieOpener, root common.Hash, namespace string, noreads bool) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
return &triePrefetcher{
root: root,
fetchers: make(map[trie.ID]*subfetcher), // Active prefetchers use the fetchers map
term: make(chan struct{}),
noreads: noreads,
opener: opener,
deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/read", nil),
accountLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/load/write", nil),
accountDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/read", nil),
accountDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/write", nil),
accountDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/account/dup/cross", nil),
accountWasteMeter: metrics.GetOrRegisterMeter(prefix+"/account/waste", nil),
storageLoadReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/read", nil),
storageLoadWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/load/write", nil),
storageDupReadMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/read", nil),
storageDupWriteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/write", nil),
storageDupCrossMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup/cross", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
}
// terminate iterates over all the subfetchers and issues a termination request
// to all of them. Depending on the async parameter, the method will either block
// until all subfetchers spin down, or return immediately.
func (p *triePrefetcher) terminate(async bool) {
// Short circuit if the fetcher is already closed
select {
case <-p.term:
return
default:
}
// Terminate all sub-fetchers, sync or async, depending on the request
for _, fetcher := range p.fetchers {
fetcher.terminate(async)
}
close(p.term)
}
// report aggregates the pre-fetching and usage metrics and reports them.
// nolint:unused
func (p *triePrefetcher) report() {
if !metrics.Enabled() {
return
}
for _, fetcher := range p.fetchers {
fetcher.wait() // ensure the fetcher's idle before poking in its internals
if fetcher.id.Owner == (common.Hash{}) {
p.accountLoadReadMeter.Mark(int64(len(fetcher.seenReadAddr)))
p.accountLoadWriteMeter.Mark(int64(len(fetcher.seenWriteAddr)))
p.accountDupReadMeter.Mark(int64(fetcher.dupsRead))
p.accountDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.accountDupCrossMeter.Mark(int64(fetcher.dupsCross))
for _, key := range fetcher.usedAddr {
delete(fetcher.seenReadAddr, key)
delete(fetcher.seenWriteAddr, key)
}
p.accountWasteMeter.Mark(int64(len(fetcher.seenReadAddr) + len(fetcher.seenWriteAddr)))
} else {
p.storageLoadReadMeter.Mark(int64(len(fetcher.seenReadSlot)))
p.storageLoadWriteMeter.Mark(int64(len(fetcher.seenWriteSlot)))
p.storageDupReadMeter.Mark(int64(fetcher.dupsRead))
p.storageDupWriteMeter.Mark(int64(fetcher.dupsWrite))
p.storageDupCrossMeter.Mark(int64(fetcher.dupsCross))
for _, key := range fetcher.usedSlot {
delete(fetcher.seenReadSlot, key)
delete(fetcher.seenWriteSlot, key)
}
p.storageWasteMeter.Mark(int64(len(fetcher.seenReadSlot) + len(fetcher.seenWriteSlot)))
}
}
}
// prefetchAccounts schedules a batch of accounts to prefetch.
func (p *triePrefetcher) prefetchAccounts(id trie.ID, addrs []common.Address, read bool) error {
// If the state item is only being read, but reads are disabled, return
if read && p.noreads {
return nil
}
// Ensure the subfetcher is still alive
select {
case <-p.term:
return errTerminated
default:
}
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.opener, id, common.Address{})
p.fetchers[id] = fetcher
}
return fetcher.scheduleAccounts(addrs, read)
}
// prefetchStorage schedules a batch of storage slots to prefetch.
func (p *triePrefetcher) prefetchStorage(id trie.ID, addr common.Address, slots []common.Hash, read bool) error {
// If the state item is only being read, but reads are disabled, return
if read && p.noreads {
return nil
}
// Ensure the subfetcher is still alive
select {
case <-p.term:
return errTerminated
default:
}
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.opener, id, addr)
p.fetchers[id] = fetcher
}
return fetcher.scheduleSlots(addr, slots, read)
}
// trie returns the trie matching the root hash, blocking until the fetcher of
// the given trie terminates. If no fetcher exists for the request, nil will be
// returned.
func (p *triePrefetcher) trie(id trie.ID) Trie {
// Bail if no trie was prefetched for this root
fetcher := p.fetchers[id]
if fetcher == nil {
log.Error("Prefetcher missed to load trie", "owner", id.Owner, "root", id.Root)
p.deliveryMissMeter.Mark(1)
return nil
}
// Subfetcher exists, retrieve its trie
return fetcher.peek()
}
// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the fetcher is.
// nolint:unused
func (p *triePrefetcher) used(id trie.ID, usedAddr []common.Address, usedSlot []common.Hash) {
if fetcher := p.fetchers[id]; fetcher != nil {
fetcher.wait() // ensure the fetcher's idle before poking in its internals
fetcher.usedAddr = append(fetcher.usedAddr, usedAddr...)
fetcher.usedSlot = append(fetcher.usedSlot, usedSlot...)
}
}
// subfetcher is a trie fetcher goroutine responsible for pulling entries for a
// single trie. It is spawned when a new root is encountered and lives until the
// main prefetcher is paused and either all requested items are processed or if
// the trie being worked on is retrieved from the prefetcher.
type subfetcher struct {
id trie.ID // The identifier of the trie being populated
addr common.Address // Address of the account that the trie belongs to
trie Trie // Trie being populated with nodes
opener trieOpener // Handler to open the trie for pulling
tasks []*subfetcherTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
// prefetcher is a background goroutine that preloads trie nodes for a single
// trie. It deduplicates requests and stops when explicitly terminated.
type prefetcher struct {
prefetchRead bool // Whether the state read will trigger preloading
trie Trie // Trie being populated with nodes
tasks []*prefetchTask // Items queued up for retrieval
lock sync.Mutex // Lock protecting the task queue
wake chan struct{} // Wake channel if a new task is scheduled
stop chan struct{} // Channel to interrupt processing
term chan struct{} // Channel to signal interruption
seenReadAddr map[common.Address]struct{} // Tracks the accounts already loaded via read operations
seenWriteAddr map[common.Address]struct{} // Tracks the accounts already loaded via write operations
seenReadSlot map[common.Hash]struct{} // Tracks the storage already loaded via read operations
seenWriteSlot map[common.Hash]struct{} // Tracks the storage already loaded via write operations
dupsRead int // Number of duplicate preload tasks via reads only
dupsWrite int // Number of duplicate preload tasks via writes only
dupsCross int // Number of duplicate preload tasks via read-write-crosses
usedAddr []common.Address // Tracks the accounts used in the end
usedSlot []common.Hash // Tracks the storage used in the end
seenReadAddr map[common.Address]struct{} // Dedup: accounts loaded via reads
seenWriteAddr map[common.Address]struct{} // Dedup: accounts loaded via writes
seenReadSlot map[slotKey]struct{} // Dedup: slots loaded via reads
seenWriteSlot map[slotKey]struct{} // Dedup: slots loaded via writes
}
type subfetcherTaskKind uint8
const (
kindAccount subfetcherTaskKind = iota
kindStorage
)
type subfetcherTask struct {
read bool
kind subfetcherTaskKind
// The list of accounts being pulling in kindAccount type
accounts []common.Address
// The list of storage keys being pulling in kindStorage type
account common.Address
slots []common.Hash
}
// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(opener trieOpener, id trie.ID, addr common.Address) *subfetcher {
sf := &subfetcher{
id: id,
addr: addr,
opener: opener,
// newPrefetcher creates a background goroutine to prefetch state items from the
// given trie.
func newPrefetcher(tr Trie, prefetchRead bool) *prefetcher {
p := &prefetcher{
prefetchRead: prefetchRead,
trie: tr,
wake: make(chan struct{}, 1),
stop: make(chan struct{}),
term: make(chan struct{}),
seenReadAddr: make(map[common.Address]struct{}),
seenWriteAddr: make(map[common.Address]struct{}),
seenReadSlot: make(map[common.Hash]struct{}),
seenWriteSlot: make(map[common.Hash]struct{}),
seenReadSlot: make(map[slotKey]struct{}),
seenWriteSlot: make(map[slotKey]struct{}),
}
go sf.loop()
return sf
go p.loop()
return p
}
// scheduleAccounts adds a batch of accounts to the queue to prefetch.
func (sf *subfetcher) scheduleAccounts(addrs []common.Address, read bool) error {
// Ensure the subfetcher is still alive
// scheduleAccounts adds a batch of accounts to the prefetch queue.
func (p *prefetcher) scheduleAccounts(addrs []common.Address, read bool) error {
select {
case <-sf.term:
case <-p.term:
return errTerminated
default:
}
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, &subfetcherTask{
if !p.prefetchRead && read {
return nil
}
p.lock.Lock()
p.tasks = append(p.tasks, &prefetchTask{
read: read,
kind: kindAccount,
accounts: addrs,
})
sf.lock.Unlock()
p.lock.Unlock()
// Notify the background thread to execute scheduled tasks
select {
case sf.wake <- struct{}{}:
// Wake signal sent
case p.wake <- struct{}{}:
default:
// Wake signal not sent as a previous one is already queued
}
return nil
}
// scheduleSlots adds a batch of storage slots to the queue to prefetch.
func (sf *subfetcher) scheduleSlots(addr common.Address, slots []common.Hash, read bool) error {
// Ensure the subfetcher is still alive
// scheduleSlots adds a batch of storage slots to the prefetch queue.
func (p *prefetcher) scheduleSlots(addr common.Address, slots []common.Hash, read bool) error {
select {
case <-sf.term:
case <-p.term:
return errTerminated
default:
}
// Append the tasks to the current queue
sf.lock.Lock()
sf.tasks = append(sf.tasks, &subfetcherTask{
if !p.prefetchRead && read {
return nil
}
p.lock.Lock()
p.tasks = append(p.tasks, &prefetchTask{
read: read,
kind: kindStorage,
account: addr,
slots: slots,
})
sf.lock.Unlock()
p.lock.Unlock()
// Notify the background thread to execute scheduled tasks
select {
case sf.wake <- struct{}{}:
// Wake signal sent
case p.wake <- struct{}{}:
default:
// Wake signal not sent as a previous one is already queued
}
return nil
}
// wait blocks until the subfetcher terminates. This method is used to block on
// an async termination before accessing internal fields from the fetcher.
func (sf *subfetcher) wait() {
<-sf.term
}
// peek retrieves the fetcher's trie, populated with any pre-fetched data. The
// returned trie will be a shallow copy, so modifying it will break subsequent
// peeks for the original data. The method will block until all the scheduled
// data has been loaded and the fethcer terminated.
func (sf *subfetcher) peek() Trie {
// Block until the fetcher terminates, then retrieve the trie
sf.wait()
return sf.trie
}
// terminate requests the subfetcher to stop accepting new tasks and spin down
// as soon as everything is loaded. Depending on the async parameter, the method
// will either block until all disk loads finish or return immediately.
func (sf *subfetcher) terminate(async bool) {
// terminate requests the prefetcher to stop and optionally waits for it.
func (p *prefetcher) terminate() {
select {
case <-sf.stop:
case <-p.stop:
default:
close(sf.stop)
close(p.stop)
}
if async {
return
}
<-sf.term
<-p.term
}
// openTrie resolves the target trie from database for prefetching.
func (sf *subfetcher) openTrie() error {
tr, err := sf.opener(sf.id, sf.addr)
if err != nil {
log.Warn("Trie prefetcher failed opening trie", "id", sf.id, "err", err)
return err
}
sf.trie = tr
return nil
}
// loop processes prefetch tasks until terminated.
func (p *prefetcher) loop() {
defer close(p.term)
// loop loads newly-scheduled trie tasks as they are received and loads them, stopping
// when requested.
func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer close(sf.term)
if err := sf.openTrie(); err != nil {
return
}
for {
select {
case <-sf.wake:
// Execute all remaining tasks in a single run
sf.lock.Lock()
tasks := sf.tasks
sf.tasks = nil
sf.lock.Unlock()
case <-p.wake:
p.lock.Lock()
tasks := p.tasks
p.tasks = nil
p.lock.Unlock()
var (
// Account tasks
addresses []common.Address
// Slot tasks
addrs []common.Address
slots = make(map[common.Address][][]byte)
)
for _, task := range tasks {
if task.kind == kindAccount {
for _, addr := range task.accounts {
if task.read {
if _, ok := sf.seenReadAddr[addr]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteAddr[addr]; ok {
sf.dupsCross++
continue
}
sf.seenReadAddr[addr] = struct{}{}
} else {
if _, ok := sf.seenReadAddr[addr]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteAddr[addr]; ok {
sf.dupsWrite++
continue
}
sf.seenWriteAddr[addr] = struct{}{}
if p.dedupAddr(addr, task.read) {
continue
}
addresses = append(addresses, addr)
addrs = append(addrs, addr)
}
} else {
var keys [][]byte
for _, slot := range task.slots {
if task.read {
if _, ok := sf.seenReadSlot[slot]; ok {
sf.dupsRead++
continue
}
if _, ok := sf.seenWriteSlot[slot]; ok {
sf.dupsCross++
continue
}
sf.seenReadSlot[slot] = struct{}{}
} else {
if _, ok := sf.seenReadSlot[slot]; ok {
sf.dupsCross++
continue
}
if _, ok := sf.seenWriteSlot[slot]; ok {
sf.dupsWrite++
continue
}
sf.seenWriteSlot[slot] = struct{}{}
if p.dedupSlot(task.account, slot, task.read) {
continue
}
keys = append(keys, slot.Bytes())
slots[task.account] = append(slots[task.account], slot.Bytes())
}
slots[task.account] = append(slots[task.account], keys...)
}
}
if len(addresses) != 0 {
if err := sf.trie.PrefetchAccount(addresses); err != nil {
if len(addrs) > 0 {
if err := p.trie.PrefetchAccount(addrs); err != nil {
log.Error("Failed to prefetch accounts", "err", err)
}
}
for addr, keys := range slots {
if err := sf.trie.PrefetchStorage(addr, keys); err != nil {
if err := p.trie.PrefetchStorage(addr, keys); err != nil {
log.Error("Failed to prefetch storage", "err", err)
}
}
case <-sf.stop:
// Termination is requested, abort if no more tasks are pending. If
// there are some, exhaust them first.
sf.lock.Lock()
done := sf.tasks == nil
sf.lock.Unlock()
case <-p.stop:
p.lock.Lock()
done := p.tasks == nil
p.lock.Unlock()
if done {
return
}
// Some tasks are pending, loop and pick them up (that wake branch
// will be selected eventually, whilst stop remains closed to this
// branch will also run afterwards).
}
}
}
// dedupAddr returns true if addr was already seen for this read/write category.
func (p *prefetcher) dedupAddr(addr common.Address, read bool) bool {
if read {
if _, ok := p.seenReadAddr[addr]; ok {
return true
}
if _, ok := p.seenWriteAddr[addr]; ok {
return true
}
p.seenReadAddr[addr] = struct{}{}
} else {
if _, ok := p.seenReadAddr[addr]; ok {
return true
}
if _, ok := p.seenWriteAddr[addr]; ok {
return true
}
p.seenWriteAddr[addr] = struct{}{}
}
return false
}
// dedupSlot returns true if slot was already seen for this read/write category.
func (p *prefetcher) dedupSlot(addr common.Address, slot common.Hash, read bool) bool {
key := slotKey{addr: addr, slot: slot}
if read {
if _, ok := p.seenReadSlot[key]; ok {
return true
}
if _, ok := p.seenWriteSlot[key]; ok {
return true
}
p.seenReadSlot[key] = struct{}{}
} else {
if _, ok := p.seenReadSlot[key]; ok {
return true
}
if _, ok := p.seenWriteSlot[key]; ok {
return true
}
p.seenWriteSlot[key] = struct{}{}
}
return false
}

View file

@ -21,102 +21,71 @@ import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/tracing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/testrand"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/triedb"
"github.com/holiman/uint256"
)
func filledStateDB() *StateDB {
state, _ := New(types.EmptyRootHash, NewDatabaseForTesting())
// Create an account and check if the retrieved balance is correct
addr := common.HexToAddress("0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe")
skey := common.HexToHash("aaa")
sval := common.HexToHash("bbb")
state.SetBalance(addr, uint256.NewInt(42), tracing.BalanceChangeUnspecified) // Change the account trie
state.SetCode(addr, []byte("hello"), tracing.CodeChangeUnspecified) // Change an external metadata
state.SetState(addr, skey, sval) // Change the storage trie
state.SetBalance(addr, uint256.NewInt(42), tracing.BalanceChangeUnspecified)
state.SetCode(addr, []byte("hello"), tracing.CodeChangeUnspecified)
state.SetState(addr, skey, sval)
for i := 0; i < 100; i++ {
sk := common.BigToHash(big.NewInt(int64(i)))
state.SetState(addr, sk, sk) // Change the storage trie
state.SetState(addr, sk, sk)
}
return state
}
func TestUseAfterTerminate(t *testing.T) {
func TestSubfetcherUseAfterTerminate(t *testing.T) {
db := filledStateDB()
opener := func(id trie.ID, addr common.Address) (Trie, error) {
if db.db.TrieDB().IsVerkle() {
return db.db.OpenTrie(id.StateRoot)
}
if id.Owner != (common.Hash{}) {
return db.db.OpenStorageTrie(id.StateRoot, addr, id.Root, nil)
}
return db.db.OpenTrie(id.StateRoot)
// Open a trie and create a subfetcher for it.
id := trie.StateTrieID(db.originalRoot)
tr, err := trie.NewStateTrie(id, db.db.TrieDB())
if err != nil {
t.Fatalf("Failed to open trie: %v", err)
}
prefetcher := newTriePrefetcher(opener, db.originalRoot, "", true)
sf := newPrefetcher(tr, false)
addr := common.HexToAddress("0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe")
// Scheduling before termination should succeed.
if err := sf.scheduleAccounts([]common.Address{addr}, false); err != nil {
t.Fatalf("Schedule failed before terminate: %v", err)
}
// Terminate synchronously — waits for pending tasks.
sf.terminate()
// Scheduling after termination should fail.
if err := sf.scheduleAccounts([]common.Address{addr}, false); err == nil {
t.Fatal("Schedule succeeded after terminate")
}
}
func TestWrapTriePrefetch(t *testing.T) {
db := filledStateDB()
// Create a wrapTrie with prefetching enabled.
id := trie.StateTrieID(db.originalRoot)
if err := prefetcher.prefetchAccounts(*id, []common.Address{addr}, false); err != nil {
t.Errorf("Prefetch failed before terminate: %v", err)
}
prefetcher.terminate(false)
if err := prefetcher.prefetchAccounts(*id, []common.Address{addr}, false); err == nil {
t.Errorf("Prefetch succeeded after terminate: %v", err)
}
if tr := prefetcher.trie(*id); tr == nil {
t.Errorf("Prefetcher returned nil trie after terminate")
}
}
func TestVerklePrefetcher(t *testing.T) {
disk := rawdb.NewMemoryDatabase()
db := triedb.NewDatabase(disk, triedb.VerkleDefaults)
sdb := NewDatabase(db, nil)
state, err := New(types.EmptyRootHash, sdb)
tr, err := newWrapTrie(id, db.db.TrieDB(), true, true)
if err != nil {
t.Fatalf("failed to initialize state: %v", err)
t.Fatalf("Failed to create wrapTrie: %v", err)
}
// Create an account and check if the retrieved balance is correct
addr := testrand.Address()
skey := testrand.Hash()
sval := testrand.Hash()
addr := common.HexToAddress("0xaffeaffeaffeaffeaffeaffeaffeaffeaffeaffe")
state.SetBalance(addr, uint256.NewInt(42), tracing.BalanceChangeUnspecified) // Change the account trie
state.SetCode(addr, []byte("hello"), tracing.CodeChangeUnspecified) // Change an external metadata
state.SetState(addr, skey, sval) // Change the storage trie
root, _ := state.Commit(0, true, false)
// Schedule some prefetch work.
tr.prefetchAccounts([]common.Address{addr}, false)
state, _ = New(root, sdb)
opener := func(id trie.ID, addr common.Address) (Trie, error) {
return sdb.OpenTrie(id.StateRoot)
}
fetcher := newTriePrefetcher(opener, root, "", false)
// Read account
id := trie.StateTrieID(root)
fetcher.prefetchAccounts(*id, []common.Address{addr}, false)
// Read storage slot
fetcher.prefetchStorage(*id, addr, []common.Hash{skey}, false)
fetcher.terminate(false)
accountTrie := fetcher.trie(*id)
storageTrie := fetcher.trie(*id)
rootA := accountTrie.Hash()
rootB := storageTrie.Hash()
if rootA != rootB {
t.Fatal("Two different tries are retrieved")
// Terminate and verify the trie is usable.
tr.term()
if tr.Hash() == (common.Hash{}) {
t.Fatal("wrapTrie hash is zero after prefetch")
}
}