forked from forks/go-ethereum
core: implement in-block prefetcher (#31557)
This pull request enhances the block prefetcher by executing transactions in parallel to warm the cache alongside the main block processor. Unlike the original prefetcher, which only executes the next block and is limited to chain syncing, the new implementation can be applied to any block. This makes it useful not only during chain sync but also for regular block insertion after the initial sync. --------- Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de>
This commit is contained in:
parent
0f48cbf017
commit
485ff4bbff
8 changed files with 352 additions and 127 deletions
|
|
@ -92,8 +92,10 @@ var (
|
|||
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
|
||||
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
|
||||
|
||||
blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil)
|
||||
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
|
||||
blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil)
|
||||
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
|
||||
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
|
||||
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
|
||||
|
||||
errInsertionInterrupted = errors.New("insertion is interrupted")
|
||||
errChainStopped = errors.New("blockchain is stopped")
|
||||
|
|
@ -1758,18 +1760,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
|
|||
bc.reportBlock(block, nil, err)
|
||||
return nil, it.index, err
|
||||
}
|
||||
// No validation errors for the first block (or chain prefix skipped)
|
||||
var activeState *state.StateDB
|
||||
defer func() {
|
||||
// The chain importer is starting and stopping trie prefetchers. If a bad
|
||||
// block or other error is hit however, an early return may not properly
|
||||
// terminate the background threads. This defer ensures that we clean up
|
||||
// and dangling prefetcher, without deferring each and holding on live refs.
|
||||
if activeState != nil {
|
||||
activeState.StopPrefetcher()
|
||||
}
|
||||
}()
|
||||
|
||||
// Track the singleton witness from this chain insertion (if any)
|
||||
var witness *stateless.Witness
|
||||
|
||||
|
|
@ -1825,63 +1815,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness
|
|||
continue
|
||||
}
|
||||
// Retrieve the parent block and it's state to execute on top
|
||||
start := time.Now()
|
||||
parent := it.previous()
|
||||
if parent == nil {
|
||||
parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1)
|
||||
}
|
||||
statedb, err := state.New(parent.Root, bc.statedb)
|
||||
if err != nil {
|
||||
return nil, it.index, err
|
||||
}
|
||||
|
||||
// If we are past Byzantium, enable prefetching to pull in trie node paths
|
||||
// while processing transactions. Before Byzantium the prefetcher is mostly
|
||||
// useless due to the intermediate root hashing after each transaction.
|
||||
if bc.chainConfig.IsByzantium(block.Number()) {
|
||||
// Generate witnesses either if we're self-testing, or if it's the
|
||||
// only block being inserted. A bit crude, but witnesses are huge,
|
||||
// so we refuse to make an entire chain of them.
|
||||
if bc.vmConfig.StatelessSelfValidation || (makeWitness && len(chain) == 1) {
|
||||
witness, err = stateless.NewWitness(block.Header(), bc)
|
||||
if err != nil {
|
||||
return nil, it.index, err
|
||||
}
|
||||
}
|
||||
statedb.StartPrefetcher("chain", witness)
|
||||
}
|
||||
activeState = statedb
|
||||
|
||||
// If we have a followup block, run that against the current state to pre-cache
|
||||
// transactions and probabilistically some of the account/storage trie nodes.
|
||||
var followupInterrupt atomic.Bool
|
||||
if !bc.cacheConfig.TrieCleanNoPrefetch {
|
||||
if followup, err := it.peek(); followup != nil && err == nil {
|
||||
throwaway, _ := state.New(parent.Root, bc.statedb)
|
||||
|
||||
go func(start time.Time, followup *types.Block, throwaway *state.StateDB) {
|
||||
// Disable tracing for prefetcher executions.
|
||||
vmCfg := bc.vmConfig
|
||||
vmCfg.Tracer = nil
|
||||
bc.prefetcher.Prefetch(followup, throwaway, vmCfg, &followupInterrupt)
|
||||
|
||||
blockPrefetchExecuteTimer.Update(time.Since(start))
|
||||
if followupInterrupt.Load() {
|
||||
blockPrefetchInterruptMeter.Mark(1)
|
||||
}
|
||||
}(time.Now(), followup, throwaway)
|
||||
}
|
||||
}
|
||||
|
||||
// The traced section of block import.
|
||||
res, err := bc.processBlock(block, statedb, start, setHead)
|
||||
followupInterrupt.Store(true)
|
||||
start := time.Now()
|
||||
res, err := bc.processBlock(parent.Root, block, setHead, makeWitness && len(chain) == 1)
|
||||
if err != nil {
|
||||
return nil, it.index, err
|
||||
}
|
||||
// Report the import stats before returning the various results
|
||||
stats.processed++
|
||||
stats.usedGas += res.usedGas
|
||||
witness = res.witness
|
||||
|
||||
var snapDiffItems, snapBufItems common.StorageSize
|
||||
if bc.snaps != nil {
|
||||
|
|
@ -1937,11 +1884,74 @@ type blockProcessingResult struct {
|
|||
usedGas uint64
|
||||
procTime time.Duration
|
||||
status WriteStatus
|
||||
witness *stateless.Witness
|
||||
}
|
||||
|
||||
// processBlock executes and validates the given block. If there was no error
|
||||
// it writes the block and associated state to database.
|
||||
func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, start time.Time, setHead bool) (_ *blockProcessingResult, blockEndErr error) {
|
||||
func (bc *BlockChain) processBlock(parentRoot common.Hash, block *types.Block, setHead bool, makeWitness bool) (_ *blockProcessingResult, blockEndErr error) {
|
||||
var (
|
||||
err error
|
||||
startTime = time.Now()
|
||||
statedb *state.StateDB
|
||||
interrupt atomic.Bool
|
||||
)
|
||||
defer interrupt.Store(true) // terminate the prefetch at the end
|
||||
|
||||
if bc.cacheConfig.TrieCleanNoPrefetch {
|
||||
statedb, err = state.New(parentRoot, bc.statedb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
// If prefetching is enabled, run that against the current state to pre-cache
|
||||
// transactions and probabilistically some of the account/storage trie nodes.
|
||||
//
|
||||
// Note: the main processor and prefetcher share the same reader with a local
|
||||
// cache for mitigating the overhead of state access.
|
||||
reader, err := bc.statedb.ReaderWithCache(parentRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
throwaway, err := state.NewWithReader(parentRoot, bc.statedb, reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statedb, err = state.NewWithReader(parentRoot, bc.statedb, reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func(start time.Time, throwaway *state.StateDB, block *types.Block) {
|
||||
// Disable tracing for prefetcher executions.
|
||||
vmCfg := bc.vmConfig
|
||||
vmCfg.Tracer = nil
|
||||
bc.prefetcher.Prefetch(block, throwaway, vmCfg, &interrupt)
|
||||
|
||||
blockPrefetchExecuteTimer.Update(time.Since(start))
|
||||
if interrupt.Load() {
|
||||
blockPrefetchInterruptMeter.Mark(1)
|
||||
}
|
||||
}(time.Now(), throwaway, block)
|
||||
}
|
||||
|
||||
// If we are past Byzantium, enable prefetching to pull in trie node paths
|
||||
// while processing transactions. Before Byzantium the prefetcher is mostly
|
||||
// useless due to the intermediate root hashing after each transaction.
|
||||
var witness *stateless.Witness
|
||||
if bc.chainConfig.IsByzantium(block.Number()) {
|
||||
// Generate witnesses either if we're self-testing, or if it's the
|
||||
// only block being inserted. A bit crude, but witnesses are huge,
|
||||
// so we refuse to make an entire chain of them.
|
||||
if bc.vmConfig.StatelessSelfValidation || makeWitness {
|
||||
witness, err = stateless.NewWitness(block.Header(), bc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
statedb.StartPrefetcher("chain", witness)
|
||||
defer statedb.StopPrefetcher()
|
||||
}
|
||||
|
||||
if bc.logger != nil && bc.logger.OnBlockStart != nil {
|
||||
bc.logger.OnBlockStart(tracing.BlockEvent{
|
||||
Block: block,
|
||||
|
|
@ -2000,7 +2010,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
|
|||
}
|
||||
}
|
||||
xvtime := time.Since(xvstart)
|
||||
proctime := time.Since(start) // processing + validation + cross validation
|
||||
proctime := time.Since(startTime) // processing + validation + cross validation
|
||||
|
||||
// Update the metrics touched during block processing and validation
|
||||
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
|
||||
|
|
@ -2041,9 +2051,14 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
|
|||
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them
|
||||
|
||||
blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
|
||||
blockInsertTimer.UpdateSince(start)
|
||||
blockInsertTimer.UpdateSince(startTime)
|
||||
|
||||
return &blockProcessingResult{usedGas: res.GasUsed, procTime: proctime, status: status}, nil
|
||||
return &blockProcessingResult{
|
||||
usedGas: res.GasUsed,
|
||||
procTime: proctime,
|
||||
status: status,
|
||||
witness: witness,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// insertSideChain is called when an import batch hits upon a pruned ancestor
|
||||
|
|
|
|||
|
|
@ -138,6 +138,7 @@ func (it *insertIterator) next() (*types.Block, error) {
|
|||
//
|
||||
// Both header and body validation errors (nil too) is cached into the iterator
|
||||
// to avoid duplicating work on the following next() call.
|
||||
// nolint:unused
|
||||
func (it *insertIterator) peek() (*types.Block, error) {
|
||||
// If we reached the end of the chain, abort
|
||||
if it.index+1 >= len(it.chain) {
|
||||
|
|
|
|||
|
|
@ -34,10 +34,10 @@ import (
|
|||
|
||||
const (
|
||||
// Number of codehash->size associations to keep.
|
||||
codeSizeCacheSize = 100000
|
||||
codeSizeCacheSize = 1_000_000 // 4 megabytes in total
|
||||
|
||||
// Cache size granted for caching clean code.
|
||||
codeCacheSize = 64 * 1024 * 1024
|
||||
codeCacheSize = 256 * 1024 * 1024
|
||||
|
||||
// Number of address->curve point associations to keep.
|
||||
pointCacheSize = 4096
|
||||
|
|
@ -208,6 +208,15 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
|
|||
return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), combined), nil
|
||||
}
|
||||
|
||||
// ReaderWithCache creates a state reader with internal local cache.
|
||||
func (db *CachingDB) ReaderWithCache(stateRoot common.Hash) (Reader, error) {
|
||||
reader, err := db.Reader(stateRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newReaderWithCache(reader), nil
|
||||
}
|
||||
|
||||
// OpenTrie opens the main account trie at a specific root hash.
|
||||
func (db *CachingDB) OpenTrie(root common.Hash) (Trie, error) {
|
||||
if db.triedb.IsVerkle() {
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package state
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/lru"
|
||||
|
|
@ -32,6 +33,24 @@ import (
|
|||
"github.com/ethereum/go-ethereum/triedb/database"
|
||||
)
|
||||
|
||||
// bufferPool holds the buffers for keccak calculation.
|
||||
var bufferPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return crypto.NewKeccakState()
|
||||
},
|
||||
}
|
||||
|
||||
// allocBuff allocates the keccak buffer from the pool
|
||||
func allocBuff() crypto.KeccakState {
|
||||
return bufferPool.Get().(crypto.KeccakState)
|
||||
}
|
||||
|
||||
// releaseBuff returns the provided keccak buffer to the pool. It's unnecessary
|
||||
// to clear the buffer, as it will be cleared before the calculation.
|
||||
func releaseBuff(buff crypto.KeccakState) {
|
||||
bufferPool.Put(buff)
|
||||
}
|
||||
|
||||
// ContractCodeReader defines the interface for accessing contract code.
|
||||
type ContractCodeReader interface {
|
||||
// Code retrieves a particular contract's code.
|
||||
|
|
@ -51,6 +70,9 @@ type ContractCodeReader interface {
|
|||
|
||||
// StateReader defines the interface for accessing accounts and storage slots
|
||||
// associated with a specific state.
|
||||
//
|
||||
// StateReader is assumed to be thread-safe and implementation must take care
|
||||
// of the concurrency issue by themselves.
|
||||
type StateReader interface {
|
||||
// Account retrieves the account associated with a particular address.
|
||||
//
|
||||
|
|
@ -70,6 +92,9 @@ type StateReader interface {
|
|||
|
||||
// Reader defines the interface for accessing accounts, storage slots and contract
|
||||
// code associated with a specific state.
|
||||
//
|
||||
// Reader is assumed to be thread-safe and implementation must take care of the
|
||||
// concurrency issue by themselves.
|
||||
type Reader interface {
|
||||
ContractCodeReader
|
||||
StateReader
|
||||
|
|
@ -77,6 +102,8 @@ type Reader interface {
|
|||
|
||||
// cachingCodeReader implements ContractCodeReader, accessing contract code either in
|
||||
// local key-value store or the shared code cache.
|
||||
//
|
||||
// cachingCodeReader is safe for concurrent access.
|
||||
type cachingCodeReader struct {
|
||||
db ethdb.KeyValueReader
|
||||
|
||||
|
|
@ -123,18 +150,14 @@ func (r *cachingCodeReader) CodeSize(addr common.Address, codeHash common.Hash)
|
|||
return len(code), nil
|
||||
}
|
||||
|
||||
// flatReader wraps a database state reader.
|
||||
// flatReader wraps a database state reader and is safe for concurrent access.
|
||||
type flatReader struct {
|
||||
reader database.StateReader
|
||||
buff crypto.KeccakState
|
||||
}
|
||||
|
||||
// newFlatReader constructs a state reader with on the given state root.
|
||||
func newFlatReader(reader database.StateReader) *flatReader {
|
||||
return &flatReader{
|
||||
reader: reader,
|
||||
buff: crypto.NewKeccakState(),
|
||||
}
|
||||
return &flatReader{reader: reader}
|
||||
}
|
||||
|
||||
// Account implements StateReader, retrieving the account specified by the address.
|
||||
|
|
@ -144,7 +167,10 @@ func newFlatReader(reader database.StateReader) *flatReader {
|
|||
//
|
||||
// The returned account might be nil if it's not existent.
|
||||
func (r *flatReader) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
account, err := r.reader.Account(crypto.HashData(r.buff, addr.Bytes()))
|
||||
buff := allocBuff()
|
||||
defer releaseBuff(buff)
|
||||
|
||||
account, err := r.reader.Account(crypto.HashData(buff, addr.Bytes()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -174,8 +200,11 @@ func (r *flatReader) Account(addr common.Address) (*types.StateAccount, error) {
|
|||
//
|
||||
// The returned storage slot might be empty if it's not existent.
|
||||
func (r *flatReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) {
|
||||
addrHash := crypto.HashData(r.buff, addr.Bytes())
|
||||
slotHash := crypto.HashData(r.buff, key.Bytes())
|
||||
buff := allocBuff()
|
||||
defer releaseBuff(buff)
|
||||
|
||||
addrHash := crypto.HashData(buff, addr.Bytes())
|
||||
slotHash := crypto.HashData(buff, key.Bytes())
|
||||
ret, err := r.reader.Storage(addrHash, slotHash)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
|
|
@ -196,13 +225,20 @@ func (r *flatReader) Storage(addr common.Address, key common.Hash) (common.Hash,
|
|||
|
||||
// trieReader implements the StateReader interface, providing functions to access
|
||||
// state from the referenced trie.
|
||||
//
|
||||
// trieReader is safe for concurrent read.
|
||||
type trieReader struct {
|
||||
root common.Hash // State root which uniquely represent a state
|
||||
db *triedb.Database // Database for loading trie
|
||||
buff crypto.KeccakState // Buffer for keccak256 hashing
|
||||
mainTrie Trie // Main trie, resolved in constructor
|
||||
root common.Hash // State root which uniquely represent a state
|
||||
db *triedb.Database // Database for loading trie
|
||||
buff crypto.KeccakState // Buffer for keccak256 hashing
|
||||
|
||||
// Main trie, resolved in constructor. Note either the Merkle-Patricia-tree
|
||||
// or Verkle-tree is not safe for concurrent read.
|
||||
mainTrie Trie
|
||||
|
||||
subRoots map[common.Address]common.Hash // Set of storage roots, cached when the account is resolved
|
||||
subTries map[common.Address]Trie // Group of storage tries, cached when it's resolved
|
||||
lock sync.Mutex // Lock for protecting concurrent read
|
||||
}
|
||||
|
||||
// trieReader constructs a trie reader of the specific state. An error will be
|
||||
|
|
@ -230,11 +266,8 @@ func newTrieReader(root common.Hash, db *triedb.Database, cache *utils.PointCach
|
|||
}, nil
|
||||
}
|
||||
|
||||
// Account implements StateReader, retrieving the account specified by the address.
|
||||
//
|
||||
// An error will be returned if the trie state is corrupted. An nil account
|
||||
// will be returned if it's not existent in the trie.
|
||||
func (r *trieReader) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
// account is the inner version of Account and assumes the r.lock is already held.
|
||||
func (r *trieReader) account(addr common.Address) (*types.StateAccount, error) {
|
||||
account, err := r.mainTrie.GetAccount(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -247,12 +280,26 @@ func (r *trieReader) Account(addr common.Address) (*types.StateAccount, error) {
|
|||
return account, nil
|
||||
}
|
||||
|
||||
// Account implements StateReader, retrieving the account specified by the address.
|
||||
//
|
||||
// An error will be returned if the trie state is corrupted. An nil account
|
||||
// will be returned if it's not existent in the trie.
|
||||
func (r *trieReader) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
return r.account(addr)
|
||||
}
|
||||
|
||||
// Storage implements StateReader, retrieving the storage slot specified by the
|
||||
// address and slot key.
|
||||
//
|
||||
// An error will be returned if the trie state is corrupted. An empty storage
|
||||
// slot will be returned if it's not existent in the trie.
|
||||
func (r *trieReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
var (
|
||||
tr Trie
|
||||
found bool
|
||||
|
|
@ -268,7 +315,7 @@ func (r *trieReader) Storage(addr common.Address, key common.Hash) (common.Hash,
|
|||
// The storage slot is accessed without account caching. It's unexpected
|
||||
// behavior but try to resolve the account first anyway.
|
||||
if !ok {
|
||||
_, err := r.Account(addr)
|
||||
_, err := r.account(addr)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
|
|
@ -293,6 +340,9 @@ func (r *trieReader) Storage(addr common.Address, key common.Hash) (common.Hash,
|
|||
// multiStateReader is the aggregation of a list of StateReader interface,
|
||||
// providing state access by leveraging all readers. The checking priority
|
||||
// is determined by the position in the reader list.
|
||||
//
|
||||
// multiStateReader is safe for concurrent read and assumes all underlying
|
||||
// readers are thread-safe as well.
|
||||
type multiStateReader struct {
|
||||
readers []StateReader // List of state readers, sorted by checking priority
|
||||
}
|
||||
|
|
@ -358,3 +408,95 @@ func newReader(codeReader ContractCodeReader, stateReader StateReader) *reader {
|
|||
StateReader: stateReader,
|
||||
}
|
||||
}
|
||||
|
||||
// readerWithCache is a wrapper around Reader that maintains additional state caches
|
||||
// to support concurrent state access.
|
||||
type readerWithCache struct {
|
||||
Reader // safe for concurrent read
|
||||
|
||||
// Previously resolved state entries.
|
||||
accounts map[common.Address]*types.StateAccount
|
||||
accountLock sync.RWMutex
|
||||
|
||||
// List of storage buckets, each of which is thread-safe.
|
||||
// This reader is typically used in scenarios requiring concurrent
|
||||
// access to storage. Using multiple buckets helps mitigate
|
||||
// the overhead caused by locking.
|
||||
storageBuckets [16]struct {
|
||||
lock sync.RWMutex
|
||||
storages map[common.Address]map[common.Hash]common.Hash
|
||||
}
|
||||
}
|
||||
|
||||
// newReaderWithCache constructs the reader with local cache.
|
||||
func newReaderWithCache(reader Reader) *readerWithCache {
|
||||
r := &readerWithCache{
|
||||
Reader: reader,
|
||||
accounts: make(map[common.Address]*types.StateAccount),
|
||||
}
|
||||
for i := range r.storageBuckets {
|
||||
r.storageBuckets[i].storages = make(map[common.Address]map[common.Hash]common.Hash)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// Account implements StateReader, retrieving the account specified by the address.
|
||||
// The returned account might be nil if it's not existent.
|
||||
//
|
||||
// An error will be returned if the state is corrupted in the underlying reader.
|
||||
func (r *readerWithCache) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
// Try to resolve the requested account in the local cache
|
||||
r.accountLock.RLock()
|
||||
acct, ok := r.accounts[addr]
|
||||
r.accountLock.RUnlock()
|
||||
if ok {
|
||||
return acct, nil
|
||||
}
|
||||
// Try to resolve the requested account from the underlying reader
|
||||
acct, err := r.Reader.Account(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.accountLock.Lock()
|
||||
r.accounts[addr] = acct
|
||||
r.accountLock.Unlock()
|
||||
return acct, nil
|
||||
}
|
||||
|
||||
// Storage implements StateReader, retrieving the storage slot specified by the
|
||||
// address and slot key. The returned storage slot might be empty if it's not
|
||||
// existent.
|
||||
//
|
||||
// An error will be returned if the state is corrupted in the underlying reader.
|
||||
func (r *readerWithCache) Storage(addr common.Address, slot common.Hash) (common.Hash, error) {
|
||||
var (
|
||||
value common.Hash
|
||||
ok bool
|
||||
bucket = &r.storageBuckets[addr[0]&0x0f]
|
||||
)
|
||||
// Try to resolve the requested storage slot in the local cache
|
||||
bucket.lock.RLock()
|
||||
slots, ok := bucket.storages[addr]
|
||||
if ok {
|
||||
value, ok = slots[slot]
|
||||
}
|
||||
bucket.lock.RUnlock()
|
||||
if ok {
|
||||
return value, nil
|
||||
}
|
||||
// Try to resolve the requested storage slot from the underlying reader
|
||||
value, err := r.Reader.Storage(addr, slot)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
bucket.lock.Lock()
|
||||
slots, ok = bucket.storages[addr]
|
||||
if !ok {
|
||||
slots = make(map[common.Hash]common.Hash)
|
||||
bucket.storages[addr] = slots
|
||||
}
|
||||
slots[slot] = value
|
||||
bucket.lock.Unlock()
|
||||
|
||||
return value, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,11 +159,17 @@ type StateDB struct {
|
|||
|
||||
// New creates a new state from a given trie.
|
||||
func New(root common.Hash, db Database) (*StateDB, error) {
|
||||
tr, err := db.OpenTrie(root)
|
||||
reader, err := db.Reader(root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader, err := db.Reader(root)
|
||||
return NewWithReader(root, db, reader)
|
||||
}
|
||||
|
||||
// NewWithReader creates a new state for the specified state root. Unlike New,
|
||||
// this function accepts an additional Reader which is bound to the given root.
|
||||
func NewWithReader(root common.Hash, db Database, reader Reader) (*StateDB, error) {
|
||||
tr, err := db.OpenTrie(root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -392,6 +398,12 @@ func (s *StateDB) Database() Database {
|
|||
return s.db
|
||||
}
|
||||
|
||||
// Reader retrieves the low level database reader supporting the
|
||||
// lower level operations.
|
||||
func (s *StateDB) Reader() Reader {
|
||||
return s.reader
|
||||
}
|
||||
|
||||
func (s *StateDB) HasSelfDestructed(addr common.Address) bool {
|
||||
stateObject := s.getStateObject(addr)
|
||||
if stateObject != nil {
|
||||
|
|
@ -650,11 +662,10 @@ func (s *StateDB) CreateContract(addr common.Address) {
|
|||
// Snapshots of the copied state cannot be applied to the copy.
|
||||
func (s *StateDB) Copy() *StateDB {
|
||||
// Copy all the basic fields, initialize the memory ones
|
||||
reader, _ := s.db.Reader(s.originalRoot) // impossible to fail
|
||||
state := &StateDB{
|
||||
db: s.db,
|
||||
trie: mustCopyTrie(s.trie),
|
||||
reader: reader,
|
||||
reader: s.reader,
|
||||
originalRoot: s.originalRoot,
|
||||
stateObjects: make(map[common.Address]*stateObject, len(s.stateObjects)),
|
||||
stateObjectsDestruct: make(map[common.Address]*stateObject, len(s.stateObjectsDestruct)),
|
||||
|
|
|
|||
|
|
@ -17,17 +17,22 @@
|
|||
package core
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/core/vm"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// statePrefetcher is a basic Prefetcher, which blindly executes a block on top
|
||||
// of an arbitrary state with the goal of prefetching potentially useful state
|
||||
// data from disk before the main block processor start executing.
|
||||
// statePrefetcher is a basic Prefetcher that executes transactions from a block
|
||||
// on top of the parent state, aiming to prefetch potentially useful state data
|
||||
// from disk. Transactions are executed in parallel to fully leverage the
|
||||
// SSD's read performance.
|
||||
type statePrefetcher struct {
|
||||
config *params.ChainConfig // Chain configuration options
|
||||
chain *HeaderChain // Canonical block chain
|
||||
|
|
@ -43,41 +48,81 @@ func newStatePrefetcher(config *params.ChainConfig, chain *HeaderChain) *statePr
|
|||
|
||||
// Prefetch processes the state changes according to the Ethereum rules by running
|
||||
// the transaction messages using the statedb, but any changes are discarded. The
|
||||
// only goal is to pre-cache transaction signatures and state trie nodes.
|
||||
// only goal is to warm the state caches.
|
||||
func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) {
|
||||
var (
|
||||
header = block.Header()
|
||||
gaspool = new(GasPool).AddGas(block.GasLimit())
|
||||
blockContext = NewEVMBlockContext(header, p.chain, nil)
|
||||
evm = vm.NewEVM(blockContext, statedb, p.config, cfg)
|
||||
signer = types.MakeSigner(p.config, header.Number, header.Time)
|
||||
fails atomic.Int64
|
||||
header = block.Header()
|
||||
signer = types.MakeSigner(p.config, header.Number, header.Time)
|
||||
workers errgroup.Group
|
||||
reader = statedb.Reader()
|
||||
)
|
||||
// Iterate over and process the individual transactions
|
||||
byzantium := p.config.IsByzantium(block.Number())
|
||||
for i, tx := range block.Transactions() {
|
||||
// If block precaching was interrupted, abort
|
||||
if interrupt != nil && interrupt.Load() {
|
||||
return
|
||||
}
|
||||
// Convert the transaction into an executable message and pre-cache its sender
|
||||
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
|
||||
if err != nil {
|
||||
return // Also invalid block, bail out
|
||||
}
|
||||
statedb.SetTxContext(tx.Hash(), i)
|
||||
workers.SetLimit(runtime.NumCPU() / 2)
|
||||
|
||||
// We attempt to apply a transaction. The goal is not to execute
|
||||
// the transaction successfully, rather to warm up touched data slots.
|
||||
if _, err := ApplyMessage(evm, msg, gaspool); err != nil {
|
||||
return // Ugh, something went horribly wrong, bail out
|
||||
}
|
||||
// If we're pre-byzantium, pre-load trie nodes for the intermediate root
|
||||
if !byzantium {
|
||||
statedb.IntermediateRoot(true)
|
||||
}
|
||||
}
|
||||
// If were post-byzantium, pre-load trie nodes for the final root hash
|
||||
if byzantium {
|
||||
statedb.IntermediateRoot(true)
|
||||
// Iterate over and process the individual transactions
|
||||
for i, tx := range block.Transactions() {
|
||||
stateCpy := statedb.Copy() // closure
|
||||
workers.Go(func() error {
|
||||
// If block precaching was interrupted, abort
|
||||
if interrupt != nil && interrupt.Load() {
|
||||
return nil
|
||||
}
|
||||
// Preload the touched accounts and storage slots in advance
|
||||
sender, err := types.Sender(signer, tx)
|
||||
if err != nil {
|
||||
fails.Add(1)
|
||||
return nil
|
||||
}
|
||||
reader.Account(sender)
|
||||
|
||||
if tx.To() != nil {
|
||||
account, _ := reader.Account(*tx.To())
|
||||
|
||||
// Preload the contract code if the destination has non-empty code
|
||||
if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
|
||||
reader.Code(*tx.To(), common.BytesToHash(account.CodeHash))
|
||||
}
|
||||
}
|
||||
for _, list := range tx.AccessList() {
|
||||
reader.Account(list.Address)
|
||||
if len(list.StorageKeys) > 0 {
|
||||
for _, slot := range list.StorageKeys {
|
||||
reader.Storage(list.Address, slot)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Execute the message to preload the implicit touched states
|
||||
evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg)
|
||||
|
||||
// Convert the transaction into an executable message and pre-cache its sender
|
||||
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
|
||||
if err != nil {
|
||||
fails.Add(1)
|
||||
return nil // Also invalid block, bail out
|
||||
}
|
||||
// Disable the nonce check
|
||||
msg.SkipNonceChecks = true
|
||||
|
||||
stateCpy.SetTxContext(tx.Hash(), i)
|
||||
|
||||
// We attempt to apply a transaction. The goal is not to execute
|
||||
// the transaction successfully, rather to warm up touched data slots.
|
||||
if _, err := ApplyMessage(evm, msg, new(GasPool).AddGas(block.GasLimit())); err != nil {
|
||||
fails.Add(1)
|
||||
return nil // Ugh, something went horribly wrong, bail out
|
||||
}
|
||||
// Pre-load trie nodes for the intermediate root.
|
||||
//
|
||||
// This operation incurs significant memory allocations due to
|
||||
// trie hashing and node decoding. TODO(rjl493456442): investigate
|
||||
// ways to mitigate this overhead.
|
||||
stateCpy.IntermediateRoot(true)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
workers.Wait()
|
||||
|
||||
blockPrefetchTxsValidMeter.Mark(int64(len(block.Transactions())) - fails.Load())
|
||||
blockPrefetchTxsInvalidMeter.Mark(fails.Load())
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,7 +159,9 @@ type Message struct {
|
|||
|
||||
// When SkipNonceChecks is true, the message nonce is not checked against the
|
||||
// account nonce in state.
|
||||
// This field will be set to true for operations like RPC eth_call.
|
||||
//
|
||||
// This field will be set to true for operations like RPC eth_call
|
||||
// or the state prefetching.
|
||||
SkipNonceChecks bool
|
||||
|
||||
// When SkipFromEOACheck is true, the message sender is not checked to be an EOA.
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ var (
|
|||
nodeDiskFalseMeter = metrics.NewRegisteredMeter("pathdb/disk/false", nil)
|
||||
nodeDiffFalseMeter = metrics.NewRegisteredMeter("pathdb/diff/false", nil)
|
||||
|
||||
commitTimeTimer = metrics.NewRegisteredTimer("pathdb/commit/time", nil)
|
||||
commitTimeTimer = metrics.NewRegisteredResettingTimer("pathdb/commit/time", nil)
|
||||
commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil)
|
||||
commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil)
|
||||
|
||||
|
|
@ -57,7 +57,7 @@ var (
|
|||
gcStorageMeter = metrics.NewRegisteredMeter("pathdb/gc/storage/count", nil)
|
||||
gcStorageBytesMeter = metrics.NewRegisteredMeter("pathdb/gc/storage/bytes", nil)
|
||||
|
||||
historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil)
|
||||
historyBuildTimeMeter = metrics.NewRegisteredResettingTimer("pathdb/history/time", nil)
|
||||
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
|
||||
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in a new issue