diff --git a/core/state/database.go b/core/state/database.go index b46e5d500d..55fb3a0d97 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -81,11 +81,19 @@ type Trie interface { // be returned. GetAccount(address common.Address) (*types.StateAccount, error) + // PrefetchAccount attempts to resolve specific accounts from the database + // to accelerate subsequent trie operations. + PrefetchAccount([]common.Address) error + // GetStorage returns the value for key stored in the trie. The value bytes // must not be modified by the caller. If a node was not found in the database, // a trie.MissingNodeError is returned. GetStorage(addr common.Address, key []byte) ([]byte, error) + // PrefetchStorage attempts to resolve specific storage slots from the database + // to accelerate subsequent trie operations. + PrefetchStorage(addr common.Address, keys [][]byte) error + // UpdateAccount abstracts an account write to the trie. It encodes the // provided account object with associated algorithm and then updates it // in the trie with provided address. diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index 6f492cf9f2..a9faddcdff 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -388,6 +388,10 @@ func (sf *subfetcher) loop() { sf.tasks = nil sf.lock.Unlock() + var ( + addresses []common.Address + slots [][]byte + ) for _, task := range tasks { if task.addr != nil { key := *task.addr @@ -400,6 +404,7 @@ func (sf *subfetcher) loop() { sf.dupsCross++ continue } + sf.seenReadAddr[key] = struct{}{} } else { if _, ok := sf.seenReadAddr[key]; ok { sf.dupsCross++ @@ -409,7 +414,9 @@ func (sf *subfetcher) loop() { sf.dupsWrite++ continue } + sf.seenWriteAddr[key] = struct{}{} } + addresses = append(addresses, *task.addr) } else { key := *task.slot if task.read { @@ -421,6 +428,7 @@ func (sf *subfetcher) loop() { sf.dupsCross++ continue } + sf.seenReadSlot[key] = struct{}{} } else { if _, ok := sf.seenReadSlot[key]; ok { sf.dupsCross++ @@ -430,25 +438,19 @@ func (sf *subfetcher) loop() { sf.dupsWrite++ continue } + sf.seenWriteSlot[key] = struct{}{} } + slots = append(slots, key.Bytes()) } - if task.addr != nil { - sf.trie.GetAccount(*task.addr) - } else { - sf.trie.GetStorage(sf.addr, (*task.slot)[:]) + } + if len(addresses) != 0 { + if err := sf.trie.PrefetchAccount(addresses); err != nil { + log.Error("Failed to prefetch accounts", "err", err) } - if task.read { - if task.addr != nil { - sf.seenReadAddr[*task.addr] = struct{}{} - } else { - sf.seenReadSlot[*task.slot] = struct{}{} - } - } else { - if task.addr != nil { - sf.seenWriteAddr[*task.addr] = struct{}{} - } else { - sf.seenWriteSlot[*task.slot] = struct{}{} - } + } + if len(slots) != 0 { + if err := sf.trie.PrefetchStorage(sf.addr, slots); err != nil { + log.Error("Failed to prefetch storage", "err", err) } } diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index c0ce705c77..1c738c1e38 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -111,12 +111,6 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c fails.Add(1) return nil // Ugh, something went horribly wrong, bail out } - // Pre-load trie nodes for the intermediate root. - // - // This operation incurs significant memory allocations due to - // trie hashing and node decoding. TODO(rjl493456442): investigate - // ways to mitigate this overhead. - stateCpy.IntermediateRoot(true) return nil }) } diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 0424ecb6e5..408fe64051 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -105,19 +105,6 @@ func (t *StateTrie) MustGet(key []byte) []byte { return t.trie.MustGet(crypto.Keccak256(key)) } -// GetStorage attempts to retrieve a storage slot with provided account address -// and slot key. The value bytes must not be modified by the caller. -// If the specified storage slot is not in the trie, nil will be returned. -// If a trie node is not found in the database, a MissingNodeError is returned. -func (t *StateTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) { - enc, err := t.trie.Get(crypto.Keccak256(key)) - if err != nil || len(enc) == 0 { - return nil, err - } - _, content, _, err := rlp.Split(enc) - return content, err -} - // GetAccount attempts to retrieve an account with provided account address. // If the specified account is not in the trie, nil will be returned. // If a trie node is not found in the database, a MissingNodeError is returned. @@ -144,6 +131,39 @@ func (t *StateTrie) GetAccountByHash(addrHash common.Hash) (*types.StateAccount, return ret, err } +// PrefetchAccount attempts to resolve specific accounts from the database +// to accelerate subsequent trie operations. +func (t *StateTrie) PrefetchAccount(addresses []common.Address) error { + var keys [][]byte + for _, addr := range addresses { + keys = append(keys, crypto.Keccak256(addr.Bytes())) + } + return t.trie.Prefetch(keys) +} + +// GetStorage attempts to retrieve a storage slot with provided account address +// and slot key. The value bytes must not be modified by the caller. +// If the specified storage slot is not in the trie, nil will be returned. +// If a trie node is not found in the database, a MissingNodeError is returned. +func (t *StateTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) { + enc, err := t.trie.Get(crypto.Keccak256(key)) + if err != nil || len(enc) == 0 { + return nil, err + } + _, content, _, err := rlp.Split(enc) + return content, err +} + +// PrefetchStorage attempts to resolve specific storage slots from the database +// to accelerate subsequent trie operations. +func (t *StateTrie) PrefetchStorage(_ common.Address, keys [][]byte) error { + var keylist [][]byte + for _, key := range keys { + keylist = append(keylist, crypto.Keccak256(key)) + } + return t.trie.Prefetch(keylist) +} + // GetNode attempts to retrieve a trie node by compact-encoded path. It is not // possible to use keybyte-encoding as the path might contain odd nibbles. // If the specified trie node is not in the trie, nil will be returned. diff --git a/trie/tracer.go b/trie/tracer.go index 206e8aa20d..2e2d0928b5 100644 --- a/trie/tracer.go +++ b/trie/tracer.go @@ -19,6 +19,7 @@ package trie import ( "maps" "slices" + "sync" ) // opTracer tracks the changes of trie nodes. During the trie operations, @@ -102,6 +103,7 @@ func (t *opTracer) deletedList() [][]byte { // handling the concurrency issues by themselves. type prevalueTracer struct { data map[string][]byte + lock sync.RWMutex } // newPrevalueTracer initializes the tracer for capturing resolved trie nodes. @@ -115,18 +117,27 @@ func newPrevalueTracer() *prevalueTracer { // blob internally. Do not modify the value outside this function, // as it is not deep-copied. func (t *prevalueTracer) put(path []byte, val []byte) { + t.lock.Lock() + defer t.lock.Unlock() + t.data[string(path)] = val } // get returns the cached trie node value. If the node is not found, nil will // be returned. func (t *prevalueTracer) get(path []byte) []byte { + t.lock.RLock() + defer t.lock.RUnlock() + return t.data[string(path)] } // hasList returns a list of flags indicating whether the corresponding trie nodes // specified by the path exist in the trie. func (t *prevalueTracer) hasList(list [][]byte) []bool { + t.lock.RLock() + defer t.lock.RUnlock() + exists := make([]bool, 0, len(list)) for _, path := range list { _, ok := t.data[string(path)] @@ -137,16 +148,25 @@ func (t *prevalueTracer) hasList(list [][]byte) []bool { // values returns a list of values of the cached trie nodes. func (t *prevalueTracer) values() [][]byte { + t.lock.RLock() + defer t.lock.RUnlock() + return slices.Collect(maps.Values(t.data)) } // reset resets the cached content in the prevalueTracer. func (t *prevalueTracer) reset() { + t.lock.Lock() + defer t.lock.Unlock() + clear(t.data) } // copy returns a copied prevalueTracer instance. func (t *prevalueTracer) copy() *prevalueTracer { + t.lock.RLock() + defer t.lock.RUnlock() + // Shadow clone is used, as the cached trie node values are immutable return &prevalueTracer{ data: maps.Clone(t.data), diff --git a/trie/tracer_test.go b/trie/tracer_test.go index f2a4287461..695570fd0d 100644 --- a/trie/tracer_test.go +++ b/trie/tracer_test.go @@ -70,7 +70,6 @@ func testTrieOpTracer(t *testing.T, vals []struct{ k, v string }) { } insertSet := copySet(trie.opTracer.inserts) // copy before commit deleteSet := copySet(trie.opTracer.deletes) // copy before commit - root, nodes := trie.Commit(false) db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes)) diff --git a/trie/transition.go b/trie/transition.go index ad3f782b75..1670b8e793 100644 --- a/trie/transition.go +++ b/trie/transition.go @@ -78,6 +78,17 @@ func (t *TransitionTrie) GetStorage(addr common.Address, key []byte) ([]byte, er return t.base.GetStorage(addr, key) } +// PrefetchStorage attempts to resolve specific storage slots from the database +// to accelerate subsequent trie operations. +func (t *TransitionTrie) PrefetchStorage(addr common.Address, keys [][]byte) error { + for _, key := range keys { + if _, err := t.GetStorage(addr, key); err != nil { + return err + } + } + return nil +} + // GetAccount abstract an account read from the trie. func (t *TransitionTrie) GetAccount(address common.Address) (*types.StateAccount, error) { data, err := t.overlay.GetAccount(address) @@ -94,6 +105,17 @@ func (t *TransitionTrie) GetAccount(address common.Address) (*types.StateAccount return t.base.GetAccount(address) } +// PrefetchAccount attempts to resolve specific accounts from the database +// to accelerate subsequent trie operations. +func (t *TransitionTrie) PrefetchAccount(addresses []common.Address) error { + for _, addr := range addresses { + if _, err := t.GetAccount(addr); err != nil { + return err + } + } + return nil +} + // UpdateStorage associates key with value in the trie. If value has length zero, any // existing value is deleted from the trie. The value bytes must not be modified // by the caller while they are stored in the trie. @@ -173,7 +195,7 @@ func (t *TransitionTrie) IsVerkle() bool { return true } -// UpdateStems updates a group of values, given the stem they are using. If +// UpdateStem updates a group of values, given the stem they are using. If // a value already exists, it is overwritten. func (t *TransitionTrie) UpdateStem(key []byte, values [][]byte) error { trie := t.overlay diff --git a/trie/trie.go b/trie/trie.go index 307036faa9..6c998b3159 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie/trienode" "github.com/ethereum/go-ethereum/triedb/database" + "golang.org/x/sync/errgroup" ) // Trie represents a Merkle Patricia Trie. Use New to create a trie that operates @@ -194,6 +195,51 @@ func (t *Trie) get(origNode node, key []byte, pos int) (value []byte, newnode no } } +// Prefetch attempts to resolve the leaves and intermediate trie nodes +// specified by the key list in parallel. The results are silently +// discarded to simplify the function. +func (t *Trie) Prefetch(keylist [][]byte) error { + // Short circuit if the trie is already committed and not usable. + if t.committed { + return ErrCommitted + } + // Resolve the trie nodes sequentially if there are not too many + // trie nodes in the trie. + fn, ok := t.root.(*fullNode) + if !ok || len(keylist) < 16 { + for _, key := range keylist { + _, err := t.Get(key) + if err != nil { + return err + } + } + return nil + } + var ( + keys = make(map[byte][][]byte) + eg errgroup.Group + ) + for _, key := range keylist { + hkey := keybytesToHex(key) + keys[hkey[0]] = append(keys[hkey[0]], hkey) + } + for pos, ks := range keys { + eg.Go(func() error { + for _, k := range ks { + _, newnode, didResolve, err := t.get(fn.Children[pos], k, 1) + if err == nil && didResolve { + fn.Children[pos] = newnode + } + if err != nil { + return err + } + } + return nil + }) + } + return eg.Wait() +} + // MustGetNode is a wrapper of GetNode and will omit any encountered error but // just print out an error message. func (t *Trie) MustGetNode(path []byte) ([]byte, int) { diff --git a/trie/trie_test.go b/trie/trie_test.go index 68759c37c0..22c3494f47 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -1499,3 +1499,83 @@ func testTrieCopyNewTrie(t *testing.T, entries []kv) { t.Errorf("Hash mismatch: old %v, new %v", hash, tr.Hash()) } } + +// goos: darwin +// goarch: arm64 +// pkg: github.com/ethereum/go-ethereum/trie +// cpu: Apple M1 Pro +// BenchmarkTriePrefetch +// BenchmarkTriePrefetch-8 9961 100706 ns/op +func BenchmarkTriePrefetch(b *testing.B) { + db := newTestDatabase(rawdb.NewMemoryDatabase(), rawdb.HashScheme) + tr := NewEmpty(db) + vals := make(map[string]*kv) + for i := 0; i < 3000; i++ { + value := &kv{ + k: randBytes(32), + v: randBytes(20), + t: false, + } + tr.MustUpdate(value.k, value.v) + vals[string(value.k)] = value + } + root, nodes := tr.Commit(false) + db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tr, err := New(TrieID(root), db) + if err != nil { + b.Fatalf("Failed to open the trie") + } + var keys [][]byte + for k := range vals { + keys = append(keys, []byte(k)) + if len(keys) > 64 { + break + } + } + tr.Prefetch(keys) + } +} + +// goos: darwin +// goarch: arm64 +// pkg: github.com/ethereum/go-ethereum/trie +// cpu: Apple M1 Pro +// BenchmarkTrieSeqPrefetch +// BenchmarkTrieSeqPrefetch-8 12879 96710 ns/op +func BenchmarkTrieSeqPrefetch(b *testing.B) { + db := newTestDatabase(rawdb.NewMemoryDatabase(), rawdb.HashScheme) + tr := NewEmpty(db) + vals := make(map[string]*kv) + for i := 0; i < 3000; i++ { + value := &kv{ + k: randBytes(32), + v: randBytes(20), + t: false, + } + tr.MustUpdate(value.k, value.v) + vals[string(value.k)] = value + } + root, nodes := tr.Commit(false) + db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + tr, err := New(TrieID(root), db) + if err != nil { + b.Fatalf("Failed to open the trie") + } + var keys [][]byte + for k := range vals { + keys = append(keys, []byte(k)) + if len(keys) > 64 { + break + } + } + for _, k := range keys { + tr.Get(k) + } + } +} diff --git a/trie/verkle.go b/trie/verkle.go index c89a8f1d36..c8b9a6dd46 100644 --- a/trie/verkle.go +++ b/trie/verkle.go @@ -108,6 +108,17 @@ func (t *VerkleTrie) GetAccount(addr common.Address) (*types.StateAccount, error return acc, nil } +// PrefetchAccount attempts to resolve specific accounts from the database +// to accelerate subsequent trie operations. +func (t *VerkleTrie) PrefetchAccount(addresses []common.Address) error { + for _, addr := range addresses { + if _, err := t.GetAccount(addr); err != nil { + return err + } + } + return nil +} + // GetStorage implements state.Trie, retrieving the storage slot with the specified // account address and storage key. If the specified slot is not in the verkle tree, // nil will be returned. If the tree is corrupted, an error will be returned. @@ -120,6 +131,17 @@ func (t *VerkleTrie) GetStorage(addr common.Address, key []byte) ([]byte, error) return common.TrimLeftZeroes(val), nil } +// PrefetchStorage attempts to resolve specific storage slots from the database +// to accelerate subsequent trie operations. +func (t *VerkleTrie) PrefetchStorage(addr common.Address, keys [][]byte) error { + for _, key := range keys { + if _, err := t.GetStorage(addr, key); err != nil { + return err + } + } + return nil +} + // UpdateAccount implements state.Trie, writing the provided account into the tree. // If the tree is corrupted, an error will be returned. func (t *VerkleTrie) UpdateAccount(addr common.Address, acc *types.StateAccount, codeLen int) error {