mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 07:37:20 +00:00
trie, core/state: introduce trie Prefetch for optimizing preload (#32134)
This pull introduces a `Prefetch` operation in the trie to prefetch trie nodes in parallel. It is used by the `triePrefetcher` to accelerate state loading and improve overall chain processing performance.
This commit is contained in:
parent
9ce40d19a8
commit
bf8f63dcd2
10 changed files with 250 additions and 37 deletions
|
|
@ -81,11 +81,19 @@ type Trie interface {
|
|||
// be returned.
|
||||
GetAccount(address common.Address) (*types.StateAccount, error)
|
||||
|
||||
// PrefetchAccount attempts to resolve specific accounts from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
PrefetchAccount([]common.Address) error
|
||||
|
||||
// GetStorage returns the value for key stored in the trie. The value bytes
|
||||
// must not be modified by the caller. If a node was not found in the database,
|
||||
// a trie.MissingNodeError is returned.
|
||||
GetStorage(addr common.Address, key []byte) ([]byte, error)
|
||||
|
||||
// PrefetchStorage attempts to resolve specific storage slots from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
PrefetchStorage(addr common.Address, keys [][]byte) error
|
||||
|
||||
// UpdateAccount abstracts an account write to the trie. It encodes the
|
||||
// provided account object with associated algorithm and then updates it
|
||||
// in the trie with provided address.
|
||||
|
|
|
|||
|
|
@ -388,6 +388,10 @@ func (sf *subfetcher) loop() {
|
|||
sf.tasks = nil
|
||||
sf.lock.Unlock()
|
||||
|
||||
var (
|
||||
addresses []common.Address
|
||||
slots [][]byte
|
||||
)
|
||||
for _, task := range tasks {
|
||||
if task.addr != nil {
|
||||
key := *task.addr
|
||||
|
|
@ -400,6 +404,7 @@ func (sf *subfetcher) loop() {
|
|||
sf.dupsCross++
|
||||
continue
|
||||
}
|
||||
sf.seenReadAddr[key] = struct{}{}
|
||||
} else {
|
||||
if _, ok := sf.seenReadAddr[key]; ok {
|
||||
sf.dupsCross++
|
||||
|
|
@ -409,7 +414,9 @@ func (sf *subfetcher) loop() {
|
|||
sf.dupsWrite++
|
||||
continue
|
||||
}
|
||||
sf.seenWriteAddr[key] = struct{}{}
|
||||
}
|
||||
addresses = append(addresses, *task.addr)
|
||||
} else {
|
||||
key := *task.slot
|
||||
if task.read {
|
||||
|
|
@ -421,6 +428,7 @@ func (sf *subfetcher) loop() {
|
|||
sf.dupsCross++
|
||||
continue
|
||||
}
|
||||
sf.seenReadSlot[key] = struct{}{}
|
||||
} else {
|
||||
if _, ok := sf.seenReadSlot[key]; ok {
|
||||
sf.dupsCross++
|
||||
|
|
@ -430,25 +438,19 @@ func (sf *subfetcher) loop() {
|
|||
sf.dupsWrite++
|
||||
continue
|
||||
}
|
||||
sf.seenWriteSlot[key] = struct{}{}
|
||||
}
|
||||
slots = append(slots, key.Bytes())
|
||||
}
|
||||
}
|
||||
if task.addr != nil {
|
||||
sf.trie.GetAccount(*task.addr)
|
||||
} else {
|
||||
sf.trie.GetStorage(sf.addr, (*task.slot)[:])
|
||||
if len(addresses) != 0 {
|
||||
if err := sf.trie.PrefetchAccount(addresses); err != nil {
|
||||
log.Error("Failed to prefetch accounts", "err", err)
|
||||
}
|
||||
if task.read {
|
||||
if task.addr != nil {
|
||||
sf.seenReadAddr[*task.addr] = struct{}{}
|
||||
} else {
|
||||
sf.seenReadSlot[*task.slot] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
if task.addr != nil {
|
||||
sf.seenWriteAddr[*task.addr] = struct{}{}
|
||||
} else {
|
||||
sf.seenWriteSlot[*task.slot] = struct{}{}
|
||||
}
|
||||
if len(slots) != 0 {
|
||||
if err := sf.trie.PrefetchStorage(sf.addr, slots); err != nil {
|
||||
log.Error("Failed to prefetch storage", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -111,12 +111,6 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
|
|||
fails.Add(1)
|
||||
return nil // Ugh, something went horribly wrong, bail out
|
||||
}
|
||||
// Pre-load trie nodes for the intermediate root.
|
||||
//
|
||||
// This operation incurs significant memory allocations due to
|
||||
// trie hashing and node decoding. TODO(rjl493456442): investigate
|
||||
// ways to mitigate this overhead.
|
||||
stateCpy.IntermediateRoot(true)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -105,19 +105,6 @@ func (t *StateTrie) MustGet(key []byte) []byte {
|
|||
return t.trie.MustGet(crypto.Keccak256(key))
|
||||
}
|
||||
|
||||
// GetStorage attempts to retrieve a storage slot with provided account address
|
||||
// and slot key. The value bytes must not be modified by the caller.
|
||||
// If the specified storage slot is not in the trie, nil will be returned.
|
||||
// If a trie node is not found in the database, a MissingNodeError is returned.
|
||||
func (t *StateTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) {
|
||||
enc, err := t.trie.Get(crypto.Keccak256(key))
|
||||
if err != nil || len(enc) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
_, content, _, err := rlp.Split(enc)
|
||||
return content, err
|
||||
}
|
||||
|
||||
// GetAccount attempts to retrieve an account with provided account address.
|
||||
// If the specified account is not in the trie, nil will be returned.
|
||||
// If a trie node is not found in the database, a MissingNodeError is returned.
|
||||
|
|
@ -144,6 +131,39 @@ func (t *StateTrie) GetAccountByHash(addrHash common.Hash) (*types.StateAccount,
|
|||
return ret, err
|
||||
}
|
||||
|
||||
// PrefetchAccount attempts to resolve specific accounts from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
func (t *StateTrie) PrefetchAccount(addresses []common.Address) error {
|
||||
var keys [][]byte
|
||||
for _, addr := range addresses {
|
||||
keys = append(keys, crypto.Keccak256(addr.Bytes()))
|
||||
}
|
||||
return t.trie.Prefetch(keys)
|
||||
}
|
||||
|
||||
// GetStorage attempts to retrieve a storage slot with provided account address
|
||||
// and slot key. The value bytes must not be modified by the caller.
|
||||
// If the specified storage slot is not in the trie, nil will be returned.
|
||||
// If a trie node is not found in the database, a MissingNodeError is returned.
|
||||
func (t *StateTrie) GetStorage(_ common.Address, key []byte) ([]byte, error) {
|
||||
enc, err := t.trie.Get(crypto.Keccak256(key))
|
||||
if err != nil || len(enc) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
_, content, _, err := rlp.Split(enc)
|
||||
return content, err
|
||||
}
|
||||
|
||||
// PrefetchStorage attempts to resolve specific storage slots from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
func (t *StateTrie) PrefetchStorage(_ common.Address, keys [][]byte) error {
|
||||
var keylist [][]byte
|
||||
for _, key := range keys {
|
||||
keylist = append(keylist, crypto.Keccak256(key))
|
||||
}
|
||||
return t.trie.Prefetch(keylist)
|
||||
}
|
||||
|
||||
// GetNode attempts to retrieve a trie node by compact-encoded path. It is not
|
||||
// possible to use keybyte-encoding as the path might contain odd nibbles.
|
||||
// If the specified trie node is not in the trie, nil will be returned.
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package trie
|
|||
import (
|
||||
"maps"
|
||||
"slices"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// opTracer tracks the changes of trie nodes. During the trie operations,
|
||||
|
|
@ -102,6 +103,7 @@ func (t *opTracer) deletedList() [][]byte {
|
|||
// handling the concurrency issues by themselves.
|
||||
type prevalueTracer struct {
|
||||
data map[string][]byte
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// newPrevalueTracer initializes the tracer for capturing resolved trie nodes.
|
||||
|
|
@ -115,18 +117,27 @@ func newPrevalueTracer() *prevalueTracer {
|
|||
// blob internally. Do not modify the value outside this function,
|
||||
// as it is not deep-copied.
|
||||
func (t *prevalueTracer) put(path []byte, val []byte) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
t.data[string(path)] = val
|
||||
}
|
||||
|
||||
// get returns the cached trie node value. If the node is not found, nil will
|
||||
// be returned.
|
||||
func (t *prevalueTracer) get(path []byte) []byte {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return t.data[string(path)]
|
||||
}
|
||||
|
||||
// hasList returns a list of flags indicating whether the corresponding trie nodes
|
||||
// specified by the path exist in the trie.
|
||||
func (t *prevalueTracer) hasList(list [][]byte) []bool {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
exists := make([]bool, 0, len(list))
|
||||
for _, path := range list {
|
||||
_, ok := t.data[string(path)]
|
||||
|
|
@ -137,16 +148,25 @@ func (t *prevalueTracer) hasList(list [][]byte) []bool {
|
|||
|
||||
// values returns a list of values of the cached trie nodes.
|
||||
func (t *prevalueTracer) values() [][]byte {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return slices.Collect(maps.Values(t.data))
|
||||
}
|
||||
|
||||
// reset resets the cached content in the prevalueTracer.
|
||||
func (t *prevalueTracer) reset() {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
clear(t.data)
|
||||
}
|
||||
|
||||
// copy returns a copied prevalueTracer instance.
|
||||
func (t *prevalueTracer) copy() *prevalueTracer {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
// Shadow clone is used, as the cached trie node values are immutable
|
||||
return &prevalueTracer{
|
||||
data: maps.Clone(t.data),
|
||||
|
|
|
|||
|
|
@ -70,7 +70,6 @@ func testTrieOpTracer(t *testing.T, vals []struct{ k, v string }) {
|
|||
}
|
||||
insertSet := copySet(trie.opTracer.inserts) // copy before commit
|
||||
deleteSet := copySet(trie.opTracer.deletes) // copy before commit
|
||||
|
||||
root, nodes := trie.Commit(false)
|
||||
db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes))
|
||||
|
||||
|
|
|
|||
|
|
@ -78,6 +78,17 @@ func (t *TransitionTrie) GetStorage(addr common.Address, key []byte) ([]byte, er
|
|||
return t.base.GetStorage(addr, key)
|
||||
}
|
||||
|
||||
// PrefetchStorage attempts to resolve specific storage slots from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
func (t *TransitionTrie) PrefetchStorage(addr common.Address, keys [][]byte) error {
|
||||
for _, key := range keys {
|
||||
if _, err := t.GetStorage(addr, key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetAccount abstract an account read from the trie.
|
||||
func (t *TransitionTrie) GetAccount(address common.Address) (*types.StateAccount, error) {
|
||||
data, err := t.overlay.GetAccount(address)
|
||||
|
|
@ -94,6 +105,17 @@ func (t *TransitionTrie) GetAccount(address common.Address) (*types.StateAccount
|
|||
return t.base.GetAccount(address)
|
||||
}
|
||||
|
||||
// PrefetchAccount attempts to resolve specific accounts from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
func (t *TransitionTrie) PrefetchAccount(addresses []common.Address) error {
|
||||
for _, addr := range addresses {
|
||||
if _, err := t.GetAccount(addr); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateStorage associates key with value in the trie. If value has length zero, any
|
||||
// existing value is deleted from the trie. The value bytes must not be modified
|
||||
// by the caller while they are stored in the trie.
|
||||
|
|
@ -173,7 +195,7 @@ func (t *TransitionTrie) IsVerkle() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// UpdateStems updates a group of values, given the stem they are using. If
|
||||
// UpdateStem updates a group of values, given the stem they are using. If
|
||||
// a value already exists, it is overwritten.
|
||||
func (t *TransitionTrie) UpdateStem(key []byte, values [][]byte) error {
|
||||
trie := t.overlay
|
||||
|
|
|
|||
46
trie/trie.go
46
trie/trie.go
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
"github.com/ethereum/go-ethereum/triedb/database"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// Trie represents a Merkle Patricia Trie. Use New to create a trie that operates
|
||||
|
|
@ -194,6 +195,51 @@ func (t *Trie) get(origNode node, key []byte, pos int) (value []byte, newnode no
|
|||
}
|
||||
}
|
||||
|
||||
// Prefetch attempts to resolve the leaves and intermediate trie nodes
|
||||
// specified by the key list in parallel. The results are silently
|
||||
// discarded to simplify the function.
|
||||
func (t *Trie) Prefetch(keylist [][]byte) error {
|
||||
// Short circuit if the trie is already committed and not usable.
|
||||
if t.committed {
|
||||
return ErrCommitted
|
||||
}
|
||||
// Resolve the trie nodes sequentially if there are not too many
|
||||
// trie nodes in the trie.
|
||||
fn, ok := t.root.(*fullNode)
|
||||
if !ok || len(keylist) < 16 {
|
||||
for _, key := range keylist {
|
||||
_, err := t.Get(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
keys = make(map[byte][][]byte)
|
||||
eg errgroup.Group
|
||||
)
|
||||
for _, key := range keylist {
|
||||
hkey := keybytesToHex(key)
|
||||
keys[hkey[0]] = append(keys[hkey[0]], hkey)
|
||||
}
|
||||
for pos, ks := range keys {
|
||||
eg.Go(func() error {
|
||||
for _, k := range ks {
|
||||
_, newnode, didResolve, err := t.get(fn.Children[pos], k, 1)
|
||||
if err == nil && didResolve {
|
||||
fn.Children[pos] = newnode
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
// MustGetNode is a wrapper of GetNode and will omit any encountered error but
|
||||
// just print out an error message.
|
||||
func (t *Trie) MustGetNode(path []byte) ([]byte, int) {
|
||||
|
|
|
|||
|
|
@ -1499,3 +1499,83 @@ func testTrieCopyNewTrie(t *testing.T, entries []kv) {
|
|||
t.Errorf("Hash mismatch: old %v, new %v", hash, tr.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
// goos: darwin
|
||||
// goarch: arm64
|
||||
// pkg: github.com/ethereum/go-ethereum/trie
|
||||
// cpu: Apple M1 Pro
|
||||
// BenchmarkTriePrefetch
|
||||
// BenchmarkTriePrefetch-8 9961 100706 ns/op
|
||||
func BenchmarkTriePrefetch(b *testing.B) {
|
||||
db := newTestDatabase(rawdb.NewMemoryDatabase(), rawdb.HashScheme)
|
||||
tr := NewEmpty(db)
|
||||
vals := make(map[string]*kv)
|
||||
for i := 0; i < 3000; i++ {
|
||||
value := &kv{
|
||||
k: randBytes(32),
|
||||
v: randBytes(20),
|
||||
t: false,
|
||||
}
|
||||
tr.MustUpdate(value.k, value.v)
|
||||
vals[string(value.k)] = value
|
||||
}
|
||||
root, nodes := tr.Commit(false)
|
||||
db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes))
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
tr, err := New(TrieID(root), db)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to open the trie")
|
||||
}
|
||||
var keys [][]byte
|
||||
for k := range vals {
|
||||
keys = append(keys, []byte(k))
|
||||
if len(keys) > 64 {
|
||||
break
|
||||
}
|
||||
}
|
||||
tr.Prefetch(keys)
|
||||
}
|
||||
}
|
||||
|
||||
// goos: darwin
|
||||
// goarch: arm64
|
||||
// pkg: github.com/ethereum/go-ethereum/trie
|
||||
// cpu: Apple M1 Pro
|
||||
// BenchmarkTrieSeqPrefetch
|
||||
// BenchmarkTrieSeqPrefetch-8 12879 96710 ns/op
|
||||
func BenchmarkTrieSeqPrefetch(b *testing.B) {
|
||||
db := newTestDatabase(rawdb.NewMemoryDatabase(), rawdb.HashScheme)
|
||||
tr := NewEmpty(db)
|
||||
vals := make(map[string]*kv)
|
||||
for i := 0; i < 3000; i++ {
|
||||
value := &kv{
|
||||
k: randBytes(32),
|
||||
v: randBytes(20),
|
||||
t: false,
|
||||
}
|
||||
tr.MustUpdate(value.k, value.v)
|
||||
vals[string(value.k)] = value
|
||||
}
|
||||
root, nodes := tr.Commit(false)
|
||||
db.Update(root, types.EmptyRootHash, trienode.NewWithNodeSet(nodes))
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
tr, err := New(TrieID(root), db)
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to open the trie")
|
||||
}
|
||||
var keys [][]byte
|
||||
for k := range vals {
|
||||
keys = append(keys, []byte(k))
|
||||
if len(keys) > 64 {
|
||||
break
|
||||
}
|
||||
}
|
||||
for _, k := range keys {
|
||||
tr.Get(k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,6 +108,17 @@ func (t *VerkleTrie) GetAccount(addr common.Address) (*types.StateAccount, error
|
|||
return acc, nil
|
||||
}
|
||||
|
||||
// PrefetchAccount attempts to resolve specific accounts from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
func (t *VerkleTrie) PrefetchAccount(addresses []common.Address) error {
|
||||
for _, addr := range addresses {
|
||||
if _, err := t.GetAccount(addr); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetStorage implements state.Trie, retrieving the storage slot with the specified
|
||||
// account address and storage key. If the specified slot is not in the verkle tree,
|
||||
// nil will be returned. If the tree is corrupted, an error will be returned.
|
||||
|
|
@ -120,6 +131,17 @@ func (t *VerkleTrie) GetStorage(addr common.Address, key []byte) ([]byte, error)
|
|||
return common.TrimLeftZeroes(val), nil
|
||||
}
|
||||
|
||||
// PrefetchStorage attempts to resolve specific storage slots from the database
|
||||
// to accelerate subsequent trie operations.
|
||||
func (t *VerkleTrie) PrefetchStorage(addr common.Address, keys [][]byte) error {
|
||||
for _, key := range keys {
|
||||
if _, err := t.GetStorage(addr, key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateAccount implements state.Trie, writing the provided account into the tree.
|
||||
// If the tree is corrupted, an error will be returned.
|
||||
func (t *VerkleTrie) UpdateAccount(addr common.Address, acc *types.StateAccount, codeLen int) error {
|
||||
|
|
|
|||
Loading…
Reference in a new issue