diff --git a/core/blockchain.go b/core/blockchain.go index b0c1b119fc..3c691600eb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -92,8 +92,10 @@ var ( blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) - blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) - blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil) + blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) + blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") @@ -1758,18 +1760,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness bc.reportBlock(block, nil, err) return nil, it.index, err } - // No validation errors for the first block (or chain prefix skipped) - var activeState *state.StateDB - defer func() { - // The chain importer is starting and stopping trie prefetchers. If a bad - // block or other error is hit however, an early return may not properly - // terminate the background threads. This defer ensures that we clean up - // and dangling prefetcher, without deferring each and holding on live refs. - if activeState != nil { - activeState.StopPrefetcher() - } - }() - // Track the singleton witness from this chain insertion (if any) var witness *stateless.Witness @@ -1825,63 +1815,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness continue } // Retrieve the parent block and it's state to execute on top - start := time.Now() parent := it.previous() if parent == nil { parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) } - statedb, err := state.New(parent.Root, bc.statedb) - if err != nil { - return nil, it.index, err - } - - // If we are past Byzantium, enable prefetching to pull in trie node paths - // while processing transactions. Before Byzantium the prefetcher is mostly - // useless due to the intermediate root hashing after each transaction. - if bc.chainConfig.IsByzantium(block.Number()) { - // Generate witnesses either if we're self-testing, or if it's the - // only block being inserted. A bit crude, but witnesses are huge, - // so we refuse to make an entire chain of them. - if bc.vmConfig.StatelessSelfValidation || (makeWitness && len(chain) == 1) { - witness, err = stateless.NewWitness(block.Header(), bc) - if err != nil { - return nil, it.index, err - } - } - statedb.StartPrefetcher("chain", witness) - } - activeState = statedb - - // If we have a followup block, run that against the current state to pre-cache - // transactions and probabilistically some of the account/storage trie nodes. - var followupInterrupt atomic.Bool - if !bc.cacheConfig.TrieCleanNoPrefetch { - if followup, err := it.peek(); followup != nil && err == nil { - throwaway, _ := state.New(parent.Root, bc.statedb) - - go func(start time.Time, followup *types.Block, throwaway *state.StateDB) { - // Disable tracing for prefetcher executions. - vmCfg := bc.vmConfig - vmCfg.Tracer = nil - bc.prefetcher.Prefetch(followup, throwaway, vmCfg, &followupInterrupt) - - blockPrefetchExecuteTimer.Update(time.Since(start)) - if followupInterrupt.Load() { - blockPrefetchInterruptMeter.Mark(1) - } - }(time.Now(), followup, throwaway) - } - } - // The traced section of block import. - res, err := bc.processBlock(block, statedb, start, setHead) - followupInterrupt.Store(true) + start := time.Now() + res, err := bc.processBlock(parent.Root, block, setHead, makeWitness && len(chain) == 1) if err != nil { return nil, it.index, err } // Report the import stats before returning the various results stats.processed++ stats.usedGas += res.usedGas + witness = res.witness var snapDiffItems, snapBufItems common.StorageSize if bc.snaps != nil { @@ -1937,11 +1884,74 @@ type blockProcessingResult struct { usedGas uint64 procTime time.Duration status WriteStatus + witness *stateless.Witness } // processBlock executes and validates the given block. If there was no error // it writes the block and associated state to database. -func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, start time.Time, setHead bool) (_ *blockProcessingResult, blockEndErr error) { +func (bc *BlockChain) processBlock(parentRoot common.Hash, block *types.Block, setHead bool, makeWitness bool) (_ *blockProcessingResult, blockEndErr error) { + var ( + err error + startTime = time.Now() + statedb *state.StateDB + interrupt atomic.Bool + ) + defer interrupt.Store(true) // terminate the prefetch at the end + + if bc.cacheConfig.TrieCleanNoPrefetch { + statedb, err = state.New(parentRoot, bc.statedb) + if err != nil { + return nil, err + } + } else { + // If prefetching is enabled, run that against the current state to pre-cache + // transactions and probabilistically some of the account/storage trie nodes. + // + // Note: the main processor and prefetcher share the same reader with a local + // cache for mitigating the overhead of state access. + reader, err := bc.statedb.ReaderWithCache(parentRoot) + if err != nil { + return nil, err + } + throwaway, err := state.NewWithReader(parentRoot, bc.statedb, reader) + if err != nil { + return nil, err + } + statedb, err = state.NewWithReader(parentRoot, bc.statedb, reader) + if err != nil { + return nil, err + } + go func(start time.Time, throwaway *state.StateDB, block *types.Block) { + // Disable tracing for prefetcher executions. + vmCfg := bc.vmConfig + vmCfg.Tracer = nil + bc.prefetcher.Prefetch(block, throwaway, vmCfg, &interrupt) + + blockPrefetchExecuteTimer.Update(time.Since(start)) + if interrupt.Load() { + blockPrefetchInterruptMeter.Mark(1) + } + }(time.Now(), throwaway, block) + } + + // If we are past Byzantium, enable prefetching to pull in trie node paths + // while processing transactions. Before Byzantium the prefetcher is mostly + // useless due to the intermediate root hashing after each transaction. + var witness *stateless.Witness + if bc.chainConfig.IsByzantium(block.Number()) { + // Generate witnesses either if we're self-testing, or if it's the + // only block being inserted. A bit crude, but witnesses are huge, + // so we refuse to make an entire chain of them. + if bc.vmConfig.StatelessSelfValidation || makeWitness { + witness, err = stateless.NewWitness(block.Header(), bc) + if err != nil { + return nil, err + } + } + statedb.StartPrefetcher("chain", witness) + defer statedb.StopPrefetcher() + } + if bc.logger != nil && bc.logger.OnBlockStart != nil { bc.logger.OnBlockStart(tracing.BlockEvent{ Block: block, @@ -2000,7 +2010,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s } } xvtime := time.Since(xvstart) - proctime := time.Since(start) // processing + validation + cross validation + proctime := time.Since(startTime) // processing + validation + cross validation // Update the metrics touched during block processing and validation accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) @@ -2041,9 +2051,14 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits) - blockInsertTimer.UpdateSince(start) + blockInsertTimer.UpdateSince(startTime) - return &blockProcessingResult{usedGas: res.GasUsed, procTime: proctime, status: status}, nil + return &blockProcessingResult{ + usedGas: res.GasUsed, + procTime: proctime, + status: status, + witness: witness, + }, nil } // insertSideChain is called when an import batch hits upon a pruned ancestor diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index ec3f771818..b4bd444606 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -138,6 +138,7 @@ func (it *insertIterator) next() (*types.Block, error) { // // Both header and body validation errors (nil too) is cached into the iterator // to avoid duplicating work on the following next() call. +// nolint:unused func (it *insertIterator) peek() (*types.Block, error) { // If we reached the end of the chain, abort if it.index+1 >= len(it.chain) { diff --git a/core/state/database.go b/core/state/database.go index faf4954650..cef59cccfb 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -34,10 +34,10 @@ import ( const ( // Number of codehash->size associations to keep. - codeSizeCacheSize = 100000 + codeSizeCacheSize = 1_000_000 // 4 megabytes in total // Cache size granted for caching clean code. - codeCacheSize = 64 * 1024 * 1024 + codeCacheSize = 256 * 1024 * 1024 // Number of address->curve point associations to keep. pointCacheSize = 4096 @@ -208,6 +208,15 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) { return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), combined), nil } +// ReaderWithCache creates a state reader with internal local cache. +func (db *CachingDB) ReaderWithCache(stateRoot common.Hash) (Reader, error) { + reader, err := db.Reader(stateRoot) + if err != nil { + return nil, err + } + return newReaderWithCache(reader), nil +} + // OpenTrie opens the main account trie at a specific root hash. func (db *CachingDB) OpenTrie(root common.Hash) (Trie, error) { if db.triedb.IsVerkle() { diff --git a/core/state/reader.go b/core/state/reader.go index a0f15dfcc8..5ad0385e9e 100644 --- a/core/state/reader.go +++ b/core/state/reader.go @@ -18,6 +18,7 @@ package state import ( "errors" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" @@ -32,6 +33,24 @@ import ( "github.com/ethereum/go-ethereum/triedb/database" ) +// bufferPool holds the buffers for keccak calculation. +var bufferPool = sync.Pool{ + New: func() interface{} { + return crypto.NewKeccakState() + }, +} + +// allocBuff allocates the keccak buffer from the pool +func allocBuff() crypto.KeccakState { + return bufferPool.Get().(crypto.KeccakState) +} + +// releaseBuff returns the provided keccak buffer to the pool. It's unnecessary +// to clear the buffer, as it will be cleared before the calculation. +func releaseBuff(buff crypto.KeccakState) { + bufferPool.Put(buff) +} + // ContractCodeReader defines the interface for accessing contract code. type ContractCodeReader interface { // Code retrieves a particular contract's code. @@ -51,6 +70,9 @@ type ContractCodeReader interface { // StateReader defines the interface for accessing accounts and storage slots // associated with a specific state. +// +// StateReader is assumed to be thread-safe and implementation must take care +// of the concurrency issue by themselves. type StateReader interface { // Account retrieves the account associated with a particular address. // @@ -70,6 +92,9 @@ type StateReader interface { // Reader defines the interface for accessing accounts, storage slots and contract // code associated with a specific state. +// +// Reader is assumed to be thread-safe and implementation must take care of the +// concurrency issue by themselves. type Reader interface { ContractCodeReader StateReader @@ -77,6 +102,8 @@ type Reader interface { // cachingCodeReader implements ContractCodeReader, accessing contract code either in // local key-value store or the shared code cache. +// +// cachingCodeReader is safe for concurrent access. type cachingCodeReader struct { db ethdb.KeyValueReader @@ -123,18 +150,14 @@ func (r *cachingCodeReader) CodeSize(addr common.Address, codeHash common.Hash) return len(code), nil } -// flatReader wraps a database state reader. +// flatReader wraps a database state reader and is safe for concurrent access. type flatReader struct { reader database.StateReader - buff crypto.KeccakState } // newFlatReader constructs a state reader with on the given state root. func newFlatReader(reader database.StateReader) *flatReader { - return &flatReader{ - reader: reader, - buff: crypto.NewKeccakState(), - } + return &flatReader{reader: reader} } // Account implements StateReader, retrieving the account specified by the address. @@ -144,7 +167,10 @@ func newFlatReader(reader database.StateReader) *flatReader { // // The returned account might be nil if it's not existent. func (r *flatReader) Account(addr common.Address) (*types.StateAccount, error) { - account, err := r.reader.Account(crypto.HashData(r.buff, addr.Bytes())) + buff := allocBuff() + defer releaseBuff(buff) + + account, err := r.reader.Account(crypto.HashData(buff, addr.Bytes())) if err != nil { return nil, err } @@ -174,8 +200,11 @@ func (r *flatReader) Account(addr common.Address) (*types.StateAccount, error) { // // The returned storage slot might be empty if it's not existent. func (r *flatReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) { - addrHash := crypto.HashData(r.buff, addr.Bytes()) - slotHash := crypto.HashData(r.buff, key.Bytes()) + buff := allocBuff() + defer releaseBuff(buff) + + addrHash := crypto.HashData(buff, addr.Bytes()) + slotHash := crypto.HashData(buff, key.Bytes()) ret, err := r.reader.Storage(addrHash, slotHash) if err != nil { return common.Hash{}, err @@ -196,13 +225,20 @@ func (r *flatReader) Storage(addr common.Address, key common.Hash) (common.Hash, // trieReader implements the StateReader interface, providing functions to access // state from the referenced trie. +// +// trieReader is safe for concurrent read. type trieReader struct { - root common.Hash // State root which uniquely represent a state - db *triedb.Database // Database for loading trie - buff crypto.KeccakState // Buffer for keccak256 hashing - mainTrie Trie // Main trie, resolved in constructor + root common.Hash // State root which uniquely represent a state + db *triedb.Database // Database for loading trie + buff crypto.KeccakState // Buffer for keccak256 hashing + + // Main trie, resolved in constructor. Note either the Merkle-Patricia-tree + // or Verkle-tree is not safe for concurrent read. + mainTrie Trie + subRoots map[common.Address]common.Hash // Set of storage roots, cached when the account is resolved subTries map[common.Address]Trie // Group of storage tries, cached when it's resolved + lock sync.Mutex // Lock for protecting concurrent read } // trieReader constructs a trie reader of the specific state. An error will be @@ -230,11 +266,8 @@ func newTrieReader(root common.Hash, db *triedb.Database, cache *utils.PointCach }, nil } -// Account implements StateReader, retrieving the account specified by the address. -// -// An error will be returned if the trie state is corrupted. An nil account -// will be returned if it's not existent in the trie. -func (r *trieReader) Account(addr common.Address) (*types.StateAccount, error) { +// account is the inner version of Account and assumes the r.lock is already held. +func (r *trieReader) account(addr common.Address) (*types.StateAccount, error) { account, err := r.mainTrie.GetAccount(addr) if err != nil { return nil, err @@ -247,12 +280,26 @@ func (r *trieReader) Account(addr common.Address) (*types.StateAccount, error) { return account, nil } +// Account implements StateReader, retrieving the account specified by the address. +// +// An error will be returned if the trie state is corrupted. An nil account +// will be returned if it's not existent in the trie. +func (r *trieReader) Account(addr common.Address) (*types.StateAccount, error) { + r.lock.Lock() + defer r.lock.Unlock() + + return r.account(addr) +} + // Storage implements StateReader, retrieving the storage slot specified by the // address and slot key. // // An error will be returned if the trie state is corrupted. An empty storage // slot will be returned if it's not existent in the trie. func (r *trieReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) { + r.lock.Lock() + defer r.lock.Unlock() + var ( tr Trie found bool @@ -268,7 +315,7 @@ func (r *trieReader) Storage(addr common.Address, key common.Hash) (common.Hash, // The storage slot is accessed without account caching. It's unexpected // behavior but try to resolve the account first anyway. if !ok { - _, err := r.Account(addr) + _, err := r.account(addr) if err != nil { return common.Hash{}, err } @@ -293,6 +340,9 @@ func (r *trieReader) Storage(addr common.Address, key common.Hash) (common.Hash, // multiStateReader is the aggregation of a list of StateReader interface, // providing state access by leveraging all readers. The checking priority // is determined by the position in the reader list. +// +// multiStateReader is safe for concurrent read and assumes all underlying +// readers are thread-safe as well. type multiStateReader struct { readers []StateReader // List of state readers, sorted by checking priority } @@ -358,3 +408,95 @@ func newReader(codeReader ContractCodeReader, stateReader StateReader) *reader { StateReader: stateReader, } } + +// readerWithCache is a wrapper around Reader that maintains additional state caches +// to support concurrent state access. +type readerWithCache struct { + Reader // safe for concurrent read + + // Previously resolved state entries. + accounts map[common.Address]*types.StateAccount + accountLock sync.RWMutex + + // List of storage buckets, each of which is thread-safe. + // This reader is typically used in scenarios requiring concurrent + // access to storage. Using multiple buckets helps mitigate + // the overhead caused by locking. + storageBuckets [16]struct { + lock sync.RWMutex + storages map[common.Address]map[common.Hash]common.Hash + } +} + +// newReaderWithCache constructs the reader with local cache. +func newReaderWithCache(reader Reader) *readerWithCache { + r := &readerWithCache{ + Reader: reader, + accounts: make(map[common.Address]*types.StateAccount), + } + for i := range r.storageBuckets { + r.storageBuckets[i].storages = make(map[common.Address]map[common.Hash]common.Hash) + } + return r +} + +// Account implements StateReader, retrieving the account specified by the address. +// The returned account might be nil if it's not existent. +// +// An error will be returned if the state is corrupted in the underlying reader. +func (r *readerWithCache) Account(addr common.Address) (*types.StateAccount, error) { + // Try to resolve the requested account in the local cache + r.accountLock.RLock() + acct, ok := r.accounts[addr] + r.accountLock.RUnlock() + if ok { + return acct, nil + } + // Try to resolve the requested account from the underlying reader + acct, err := r.Reader.Account(addr) + if err != nil { + return nil, err + } + r.accountLock.Lock() + r.accounts[addr] = acct + r.accountLock.Unlock() + return acct, nil +} + +// Storage implements StateReader, retrieving the storage slot specified by the +// address and slot key. The returned storage slot might be empty if it's not +// existent. +// +// An error will be returned if the state is corrupted in the underlying reader. +func (r *readerWithCache) Storage(addr common.Address, slot common.Hash) (common.Hash, error) { + var ( + value common.Hash + ok bool + bucket = &r.storageBuckets[addr[0]&0x0f] + ) + // Try to resolve the requested storage slot in the local cache + bucket.lock.RLock() + slots, ok := bucket.storages[addr] + if ok { + value, ok = slots[slot] + } + bucket.lock.RUnlock() + if ok { + return value, nil + } + // Try to resolve the requested storage slot from the underlying reader + value, err := r.Reader.Storage(addr, slot) + if err != nil { + return common.Hash{}, err + } + bucket.lock.Lock() + slots, ok = bucket.storages[addr] + if !ok { + slots = make(map[common.Hash]common.Hash) + bucket.storages[addr] = slots + } + slots[slot] = value + bucket.lock.Unlock() + + return value, nil +} diff --git a/core/state/statedb.go b/core/state/statedb.go index e3f5b9e1a0..9378cae7de 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -159,11 +159,17 @@ type StateDB struct { // New creates a new state from a given trie. func New(root common.Hash, db Database) (*StateDB, error) { - tr, err := db.OpenTrie(root) + reader, err := db.Reader(root) if err != nil { return nil, err } - reader, err := db.Reader(root) + return NewWithReader(root, db, reader) +} + +// NewWithReader creates a new state for the specified state root. Unlike New, +// this function accepts an additional Reader which is bound to the given root. +func NewWithReader(root common.Hash, db Database, reader Reader) (*StateDB, error) { + tr, err := db.OpenTrie(root) if err != nil { return nil, err } @@ -392,6 +398,12 @@ func (s *StateDB) Database() Database { return s.db } +// Reader retrieves the low level database reader supporting the +// lower level operations. +func (s *StateDB) Reader() Reader { + return s.reader +} + func (s *StateDB) HasSelfDestructed(addr common.Address) bool { stateObject := s.getStateObject(addr) if stateObject != nil { @@ -650,11 +662,10 @@ func (s *StateDB) CreateContract(addr common.Address) { // Snapshots of the copied state cannot be applied to the copy. func (s *StateDB) Copy() *StateDB { // Copy all the basic fields, initialize the memory ones - reader, _ := s.db.Reader(s.originalRoot) // impossible to fail state := &StateDB{ db: s.db, trie: mustCopyTrie(s.trie), - reader: reader, + reader: s.reader, originalRoot: s.originalRoot, stateObjects: make(map[common.Address]*stateObject, len(s.stateObjects)), stateObjectsDestruct: make(map[common.Address]*stateObject, len(s.stateObjectsDestruct)), diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 805df5ef62..f3129f57cd 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -17,17 +17,22 @@ package core import ( + "bytes" + "runtime" "sync/atomic" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/params" + "golang.org/x/sync/errgroup" ) -// statePrefetcher is a basic Prefetcher, which blindly executes a block on top -// of an arbitrary state with the goal of prefetching potentially useful state -// data from disk before the main block processor start executing. +// statePrefetcher is a basic Prefetcher that executes transactions from a block +// on top of the parent state, aiming to prefetch potentially useful state data +// from disk. Transactions are executed in parallel to fully leverage the +// SSD's read performance. type statePrefetcher struct { config *params.ChainConfig // Chain configuration options chain *HeaderChain // Canonical block chain @@ -43,41 +48,81 @@ func newStatePrefetcher(config *params.ChainConfig, chain *HeaderChain) *statePr // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The -// only goal is to pre-cache transaction signatures and state trie nodes. +// only goal is to warm the state caches. func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, interrupt *atomic.Bool) { var ( - header = block.Header() - gaspool = new(GasPool).AddGas(block.GasLimit()) - blockContext = NewEVMBlockContext(header, p.chain, nil) - evm = vm.NewEVM(blockContext, statedb, p.config, cfg) - signer = types.MakeSigner(p.config, header.Number, header.Time) + fails atomic.Int64 + header = block.Header() + signer = types.MakeSigner(p.config, header.Number, header.Time) + workers errgroup.Group + reader = statedb.Reader() ) - // Iterate over and process the individual transactions - byzantium := p.config.IsByzantium(block.Number()) - for i, tx := range block.Transactions() { - // If block precaching was interrupted, abort - if interrupt != nil && interrupt.Load() { - return - } - // Convert the transaction into an executable message and pre-cache its sender - msg, err := TransactionToMessage(tx, signer, header.BaseFee) - if err != nil { - return // Also invalid block, bail out - } - statedb.SetTxContext(tx.Hash(), i) + workers.SetLimit(runtime.NumCPU() / 2) - // We attempt to apply a transaction. The goal is not to execute - // the transaction successfully, rather to warm up touched data slots. - if _, err := ApplyMessage(evm, msg, gaspool); err != nil { - return // Ugh, something went horribly wrong, bail out - } - // If we're pre-byzantium, pre-load trie nodes for the intermediate root - if !byzantium { - statedb.IntermediateRoot(true) - } - } - // If were post-byzantium, pre-load trie nodes for the final root hash - if byzantium { - statedb.IntermediateRoot(true) + // Iterate over and process the individual transactions + for i, tx := range block.Transactions() { + stateCpy := statedb.Copy() // closure + workers.Go(func() error { + // If block precaching was interrupted, abort + if interrupt != nil && interrupt.Load() { + return nil + } + // Preload the touched accounts and storage slots in advance + sender, err := types.Sender(signer, tx) + if err != nil { + fails.Add(1) + return nil + } + reader.Account(sender) + + if tx.To() != nil { + account, _ := reader.Account(*tx.To()) + + // Preload the contract code if the destination has non-empty code + if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) { + reader.Code(*tx.To(), common.BytesToHash(account.CodeHash)) + } + } + for _, list := range tx.AccessList() { + reader.Account(list.Address) + if len(list.StorageKeys) > 0 { + for _, slot := range list.StorageKeys { + reader.Storage(list.Address, slot) + } + } + } + // Execute the message to preload the implicit touched states + evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg) + + // Convert the transaction into an executable message and pre-cache its sender + msg, err := TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + fails.Add(1) + return nil // Also invalid block, bail out + } + // Disable the nonce check + msg.SkipNonceChecks = true + + stateCpy.SetTxContext(tx.Hash(), i) + + // We attempt to apply a transaction. The goal is not to execute + // the transaction successfully, rather to warm up touched data slots. + if _, err := ApplyMessage(evm, msg, new(GasPool).AddGas(block.GasLimit())); err != nil { + fails.Add(1) + return nil // Ugh, something went horribly wrong, bail out + } + // Pre-load trie nodes for the intermediate root. + // + // This operation incurs significant memory allocations due to + // trie hashing and node decoding. TODO(rjl493456442): investigate + // ways to mitigate this overhead. + stateCpy.IntermediateRoot(true) + return nil + }) } + workers.Wait() + + blockPrefetchTxsValidMeter.Mark(int64(len(block.Transactions())) - fails.Load()) + blockPrefetchTxsInvalidMeter.Mark(fails.Load()) + return } diff --git a/core/state_transition.go b/core/state_transition.go index f9c9a2ab5a..ff2051ddd2 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -159,7 +159,9 @@ type Message struct { // When SkipNonceChecks is true, the message nonce is not checked against the // account nonce in state. - // This field will be set to true for operations like RPC eth_call. + // + // This field will be set to true for operations like RPC eth_call + // or the state prefetching. SkipNonceChecks bool // When SkipFromEOACheck is true, the message sender is not checked to be an EOA. diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index 45dad6f1ae..abe2dfe1f6 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -46,7 +46,7 @@ var ( nodeDiskFalseMeter = metrics.NewRegisteredMeter("pathdb/disk/false", nil) nodeDiffFalseMeter = metrics.NewRegisteredMeter("pathdb/diff/false", nil) - commitTimeTimer = metrics.NewRegisteredTimer("pathdb/commit/time", nil) + commitTimeTimer = metrics.NewRegisteredResettingTimer("pathdb/commit/time", nil) commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil) commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil) @@ -57,7 +57,7 @@ var ( gcStorageMeter = metrics.NewRegisteredMeter("pathdb/gc/storage/count", nil) gcStorageBytesMeter = metrics.NewRegisteredMeter("pathdb/gc/storage/bytes", nil) - historyBuildTimeMeter = metrics.NewRegisteredTimer("pathdb/history/time", nil) + historyBuildTimeMeter = metrics.NewRegisteredResettingTimer("pathdb/history/time", nil) historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil) historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil) )