mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-13 11:36:37 +00:00
cmd,core,tests: introduce new BAL execution flags, log BAL slow blocks, surface more metrics including prefetcher time (#34892)
Adapts some of the changes from https://github.com/ethereum/go-ethereum/pull/34861 . Some other metrics which are recorded manually during execution in that PR, but can be deduced from the BAL are TBD. I've added two bal feature flags: * `--bal.prefetchworkers <uint>`: this tunes the number of concurrent go-routines that will be used to perform state fetching tasks by the BAL prefetcher. Default is `runtime.NumCPUs`, the current behavior in `bal-devnet-3`. * `--bal.blockingprefetch`: If set, state prefetching will block the execution of transactions and state root update. --------- Co-authored-by: CPerezz <cperezz19@pm.me>
This commit is contained in:
parent
2bb95a19a4
commit
697fe91750
11 changed files with 166 additions and 107 deletions
|
|
@ -159,6 +159,8 @@ var (
|
|||
utils.BeaconCheckpointFlag,
|
||||
utils.BeaconCheckpointFileFlag,
|
||||
utils.LogSlowBlockFlag,
|
||||
utils.PrefetchWorkersFlag,
|
||||
utils.BlockingPrefetch,
|
||||
}, utils.NetworkFlags, utils.DatabaseFlags)
|
||||
|
||||
rpcFlags = []cli.Flag{
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
godebug "runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
|
@ -713,6 +714,19 @@ var (
|
|||
Category: flags.MiscCategory,
|
||||
}
|
||||
|
||||
PrefetchWorkersFlag = &cli.UintFlag{
|
||||
Name: "bal.prefetchworkers",
|
||||
Usage: "The number of concurrent state loading tasks to perform when prefetching BAL state. Default to the number of cpus",
|
||||
Value: uint(runtime.NumCPU()),
|
||||
Category: flags.MiscCategory,
|
||||
}
|
||||
|
||||
BlockingPrefetch = &cli.BoolFlag{
|
||||
Name: "bal.blockingprefetch",
|
||||
Usage: "only relevant when executing in parallel with a BAL: if true, the prefetcher will block tx/state-root calculation until all scheduled fetching tasks have completed.",
|
||||
Category: flags.MiscCategory,
|
||||
}
|
||||
|
||||
// RPC settings
|
||||
IPCDisabledFlag = &cli.BoolFlag{
|
||||
Name: "ipcdisable",
|
||||
|
|
@ -2459,6 +2473,8 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
|
|||
TrienodeHistory: ctx.Int64(TrienodeHistoryFlag.Name),
|
||||
NodeFullValueCheckpoint: uint32(ctx.Uint(TrienodeHistoryFullValueCheckpointFlag.Name)),
|
||||
|
||||
PrefetchWorkers: int(ctx.Uint(PrefetchWorkersFlag.Name)),
|
||||
BlockingPrefetch: ctx.Bool(BlockingPrefetch.Name),
|
||||
// Disable transaction indexing/unindexing.
|
||||
TxLookupLimit: -1,
|
||||
|
||||
|
|
|
|||
|
|
@ -211,6 +211,10 @@ type BlockChainConfig struct {
|
|||
Overrides *ChainOverrides // Optional chain config overrides
|
||||
VmConfig vm.Config // Config options for the EVM Interpreter
|
||||
|
||||
// BAL-related
|
||||
PrefetchWorkers int // number of concurrent go-routines for BAL state prefetching
|
||||
BlockingPrefetch bool // whether the prefetch should block further execution until it finishes
|
||||
|
||||
// TxLookupLimit specifies the maximum number of blocks from head for which
|
||||
// transaction hashes will be indexed.
|
||||
//
|
||||
|
|
@ -597,7 +601,7 @@ func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *
|
|||
useAsyncReads := bc.cfg.BALExecutionMode != bal.BALExecutionNoBatchIO
|
||||
al := block.AccessList() // TODO: make the return of this method not be a pointer
|
||||
accessListReader := bal.NewAccessListReader(*al)
|
||||
prefetchReader, err := sdb.ReaderEIP7928(parentRoot, accessListReader.StorageKeys(useAsyncReads), runtime.NumCPU())
|
||||
prefetchReader, err := sdb.ReaderEIP7928(parentRoot, accessListReader.StorageKeys(useAsyncReads), bc.cfg.PrefetchWorkers, bc.cfg.BlockingPrefetch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -652,34 +656,22 @@ func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *
|
|||
writeTime := time.Since(writeStart)
|
||||
var stats ExecuteStats
|
||||
|
||||
/*
|
||||
// TODO: implement the gathering of this data
|
||||
stats.AccountReads = statedb.AccountReads // Account reads are complete(in processing)
|
||||
stats.StorageReads = statedb.StorageReads // Storage reads are complete(in processing)
|
||||
stats.AccountUpdates = statedb.AccountUpdates // Account updates are complete(in validation)
|
||||
stats.StorageUpdates = statedb.StorageUpdates // Storage updates are complete(in validation)
|
||||
stats.AccountHashes = statedb.AccountHashes // Account hashes are complete(in validation)
|
||||
stats.CodeReads = statedb.CodeReads
|
||||
stats.ExecWall = res.ExecTime
|
||||
stats.PostProcess = res.PostProcessTime
|
||||
|
||||
stats.AccountLoaded = statedb.AccountLoaded
|
||||
stats.AccountUpdated = statedb.AccountUpdated
|
||||
stats.AccountDeleted = statedb.AccountDeleted
|
||||
stats.StorageLoaded = statedb.StorageLoaded
|
||||
stats.StorageUpdated = int(statedb.StorageUpdated.Load())
|
||||
stats.StorageDeleted = int(statedb.StorageDeleted.Load())
|
||||
stats.CodeLoaded = statedb.CodeLoaded
|
||||
stats.CodeLoadBytes = statedb.CodeLoadBytes
|
||||
if m := res.StateTransitionMetrics; m != nil {
|
||||
stats.AccountHashes = m.AccountUpdate + m.StateUpdate + m.StateHash
|
||||
stats.AccountCommits = m.AccountCommits
|
||||
stats.StorageCommits = m.StorageCommits
|
||||
stats.DatabaseCommit = m.TrieDBCommits
|
||||
stats.Prefetch = m.StatePrefetch
|
||||
}
|
||||
|
||||
stats.Execution = ptime - (statedb.AccountReads + statedb.StorageReads + statedb.CodeReads) // The time spent on EVM processing
|
||||
stats.Validation = vtime - (statedb.AccountHashes + statedb.AccountUpdates + statedb.StorageUpdates) // The time spent on block validation
|
||||
*/
|
||||
stats.Prefetch = prefetchReader.(state.PrefetcherMetricer).Metrics().Elapsed
|
||||
|
||||
// Update the metrics touched during block commit
|
||||
stats.AccountCommits = stateTransition.Metrics().AccountCommits
|
||||
stats.StorageCommits = stateTransition.Metrics().StorageCommits
|
||||
|
||||
// stats.StateReadCacheStats = whichReader.GetStats()
|
||||
// ^ TODO fix this
|
||||
if r, ok := prefetchReader.(state.ReaderStater); ok {
|
||||
stats.StateReadCacheStats = r.GetStats()
|
||||
}
|
||||
|
||||
elapsed := time.Since(startTime) + 1 // prevent zero division
|
||||
stats.TotalTime = elapsed
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ type ExecuteStats struct {
|
|||
StorageCommits time.Duration // Time spent on the storage trie commit
|
||||
CodeReads time.Duration // Time spent on the contract code read
|
||||
|
||||
// TODO: code bytes loaded
|
||||
AccountLoaded int // Number of accounts loaded
|
||||
AccountUpdated int // Number of accounts updated
|
||||
AccountDeleted int // Number of accounts deleted
|
||||
|
|
@ -59,6 +60,11 @@ type ExecuteStats struct {
|
|||
TotalTime time.Duration // The total time spent on block execution
|
||||
MgasPerSecond float64 // The million gas processed per second
|
||||
|
||||
// BAL parallel-path durations, surfaced under slowBlockLog.BAL.
|
||||
ExecWall time.Duration // Wall-clock parallel transaction execution
|
||||
PostProcess time.Duration // Post-tx finalization (system contracts, requests)
|
||||
Prefetch time.Duration // BAL state prefetching
|
||||
|
||||
// Cache hit rates
|
||||
StateReadCacheStats state.ReaderStats
|
||||
StatePrefetchCacheStats state.ReaderStats
|
||||
|
|
@ -120,6 +126,8 @@ type slowBlockLog struct {
|
|||
StateReads slowBlockReads `json:"state_reads"`
|
||||
StateWrites slowBlockWrites `json:"state_writes"`
|
||||
Cache slowBlockCache `json:"cache"`
|
||||
// BAL is set only for blocks processed via the parallel BAL path.
|
||||
BAL *slowBlockBAL `json:"bal,omitempty"`
|
||||
}
|
||||
|
||||
type slowBlockInfo struct {
|
||||
|
|
@ -180,24 +188,30 @@ type slowBlockCodeCacheEntry struct {
|
|||
MissBytes int64 `json:"miss_bytes"`
|
||||
}
|
||||
|
||||
// slowBlockBAL holds parallel-execution timings that don't fit the sequential schema.
|
||||
type slowBlockBAL struct {
|
||||
ExecWallMs float64 `json:"exec_wall_ms"`
|
||||
PostProcessMs float64 `json:"post_process_ms"`
|
||||
PrefetchMs float64 `json:"prefetch_ms"`
|
||||
StatePrefetchMs float64 `json:"state_prefetch_ms"`
|
||||
AccountUpdateMs float64 `json:"account_update_ms"`
|
||||
StateUpdateMs float64 `json:"state_update_ms"`
|
||||
StateHashMs float64 `json:"state_hash_ms"`
|
||||
AccountCommitMs float64 `json:"account_commit_ms"`
|
||||
StorageCommitMs float64 `json:"storage_commit_ms"`
|
||||
TrieDBCommitMs float64 `json:"triedb_commit_ms"`
|
||||
SnapshotCommitMs float64 `json:"snapshot_commit_ms"`
|
||||
}
|
||||
|
||||
// durationToMs converts a time.Duration to milliseconds as a float64
|
||||
// with sub-millisecond precision for accurate cross-client metrics.
|
||||
func durationToMs(d time.Duration) float64 {
|
||||
return float64(d.Nanoseconds()) / 1e6
|
||||
}
|
||||
|
||||
// logSlow prints the detailed execution statistics in JSON format if the block
|
||||
// is regarded as slow. The JSON format is designed for cross-client compatibility
|
||||
// with other Ethereum execution clients.
|
||||
func (s *ExecuteStats) logSlow(block *types.Block, slowBlockThreshold time.Duration) {
|
||||
// Negative threshold means disabled (default when flag not set)
|
||||
if slowBlockThreshold < 0 {
|
||||
return
|
||||
}
|
||||
// Threshold of 0 logs all blocks; positive threshold filters
|
||||
if slowBlockThreshold > 0 && s.TotalTime < slowBlockThreshold {
|
||||
return
|
||||
}
|
||||
// buildSlowBlockLog builds the slow-block JSON payload. Split out from logSlow
|
||||
// so the JSON shape is directly testable.
|
||||
func buildSlowBlockLog(s *ExecuteStats, block *types.Block) slowBlockLog {
|
||||
logEntry := slowBlockLog{
|
||||
Level: "warn",
|
||||
Msg: "Slow block",
|
||||
|
|
@ -226,8 +240,8 @@ func (s *ExecuteStats) logSlow(block *types.Block, slowBlockThreshold time.Durat
|
|||
StateWrites: slowBlockWrites{
|
||||
Accounts: s.AccountUpdated,
|
||||
AccountsDeleted: s.AccountDeleted,
|
||||
StorageSlots: s.StorageUpdated,
|
||||
StorageSlotsDeleted: s.StorageDeleted,
|
||||
StorageSlots: int(s.StorageUpdated),
|
||||
StorageSlotsDeleted: int(s.StorageDeleted),
|
||||
Code: s.CodeUpdated,
|
||||
CodeBytes: s.CodeUpdateBytes,
|
||||
},
|
||||
|
|
@ -251,7 +265,37 @@ func (s *ExecuteStats) logSlow(block *types.Block, slowBlockThreshold time.Durat
|
|||
},
|
||||
},
|
||||
}
|
||||
jsonBytes, err := json.Marshal(logEntry)
|
||||
if m := s.balTransitionStats; m != nil {
|
||||
logEntry.BAL = &slowBlockBAL{
|
||||
ExecWallMs: durationToMs(s.ExecWall),
|
||||
PostProcessMs: durationToMs(s.PostProcess),
|
||||
PrefetchMs: durationToMs(s.Prefetch),
|
||||
StatePrefetchMs: durationToMs(m.StatePrefetch),
|
||||
AccountUpdateMs: durationToMs(m.AccountUpdate),
|
||||
StateUpdateMs: durationToMs(m.StateUpdate),
|
||||
StateHashMs: durationToMs(m.StateHash),
|
||||
AccountCommitMs: durationToMs(m.AccountCommits),
|
||||
StorageCommitMs: durationToMs(m.StorageCommits),
|
||||
TrieDBCommitMs: durationToMs(m.TrieDBCommits),
|
||||
SnapshotCommitMs: durationToMs(m.SnapshotCommits),
|
||||
}
|
||||
}
|
||||
return logEntry
|
||||
}
|
||||
|
||||
// logSlow prints the detailed execution statistics in JSON format if the block
|
||||
// is regarded as slow. The JSON format is designed for cross-client compatibility
|
||||
// with other Ethereum execution clients.
|
||||
func (s *ExecuteStats) logSlow(block *types.Block, slowBlockThreshold time.Duration) {
|
||||
// Negative threshold means disabled (default when flag not set)
|
||||
if slowBlockThreshold < 0 {
|
||||
return
|
||||
}
|
||||
// Threshold of 0 logs all blocks; positive threshold filters
|
||||
if slowBlockThreshold > 0 && s.TotalTime < slowBlockThreshold {
|
||||
return
|
||||
}
|
||||
jsonBytes, err := json.Marshal(buildSlowBlockLog(s, block))
|
||||
if err != nil {
|
||||
log.Error("Failed to marshal slow block log", "error", err)
|
||||
return
|
||||
|
|
@ -260,40 +304,16 @@ func (s *ExecuteStats) logSlow(block *types.Block, slowBlockThreshold time.Durat
|
|||
}
|
||||
|
||||
func (s *ExecuteStats) reportBALMetrics() {
|
||||
/*
|
||||
if s.AccountLoaded != 0 {
|
||||
accountReadTimer.Update(s.AccountReads)
|
||||
accountReadSingleTimer.Update(s.AccountReads / time.Duration(s.AccountLoaded))
|
||||
}
|
||||
if s.StorageLoaded != 0 {
|
||||
storageReadTimer.Update(s.StorageReads)
|
||||
storageReadSingleTimer.Update(s.StorageReads / time.Duration(s.StorageLoaded))
|
||||
}
|
||||
if s.CodeLoaded != 0 {
|
||||
codeReadTimer.Update(s.CodeReads)
|
||||
codeReadSingleTimer.Update(s.CodeReads / time.Duration(s.CodeLoaded))
|
||||
codeReadBytesTimer.Update(time.Duration(s.CodeLoadBytes))
|
||||
}
|
||||
// TODO: implement these ^
|
||||
*/
|
||||
//accountUpdateTimer.Update(s.AccountUpdates) // Account updates are complete(in validation)
|
||||
//storageUpdateTimer.Update(s.StorageUpdates) // Storage updates are complete(in validation)
|
||||
//accountHashTimer.Update(s.AccountHashes) // Account hashes are complete(in validation)
|
||||
|
||||
accountCommitTimer.Update(s.AccountCommits) // Account commits are complete, we can mark them
|
||||
storageCommitTimer.Update(s.StorageCommits) // Storage commits are complete, we can mark them
|
||||
|
||||
stateTriePrefetchTimer.Update(s.balTransitionStats.StatePrefetch)
|
||||
accountTriesUpdateTimer.Update(s.balTransitionStats.AccountUpdate)
|
||||
stateTrieUpdateTimer.Update(s.balTransitionStats.StateUpdate)
|
||||
stateTrieHashTimer.Update(s.balTransitionStats.StateHash)
|
||||
stateRootComputeTimer.Update(s.balTransitionStats.AccountUpdate + s.balTransitionStats.StateUpdate + s.balTransitionStats.StateHash)
|
||||
|
||||
//blockExecutionTimer.Update(s.Execution) // The time spent on EVM processing
|
||||
// ^basically impossible to get this metric with parallel execution
|
||||
|
||||
//blockValidationTimer.Update(s.Validation) // The time spent on block validation
|
||||
//blockCrossValidationTimer.Update(s.CrossValidation) // The time spent on stateless cross validation
|
||||
if m := s.balTransitionStats; m != nil {
|
||||
stateTriePrefetchTimer.Update(m.StatePrefetch)
|
||||
accountTriesUpdateTimer.Update(m.AccountUpdate)
|
||||
stateTrieUpdateTimer.Update(m.StateUpdate)
|
||||
stateTrieHashTimer.Update(m.StateHash)
|
||||
stateRootComputeTimer.Update(m.AccountUpdate + m.StateUpdate + m.StateHash)
|
||||
}
|
||||
|
||||
blockWriteTimer.Update(s.BlockWrite) // The time spent on block write
|
||||
blockInsertTimer.Update(s.TotalTime) // The total time spent on block execution
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ type ProcessResultWithMetrics struct {
|
|||
// the time it took to execute all txs in the block
|
||||
ExecTime time.Duration
|
||||
PostProcessTime time.Duration
|
||||
// TODO: have the prefetch metric in here as well?
|
||||
}
|
||||
|
||||
// ParallelStateProcessor is used to execute and verify blocks containing
|
||||
|
|
@ -198,15 +199,14 @@ type txExecResult struct {
|
|||
|
||||
// resultHandler polls until all transactions have finished executing and the
|
||||
// state root calculation is complete. The result is emitted on resCh.
|
||||
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
|
||||
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
|
||||
// 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd
|
||||
// 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL)
|
||||
var results []txExecResult
|
||||
var cumulativeStateGas, cumulativeRegularGas uint64
|
||||
var execErr error
|
||||
var numTxComplete int
|
||||
|
||||
accesses := preTxReads
|
||||
accesses := preTxAccesses
|
||||
|
||||
if len(block.Transactions()) > 0 {
|
||||
loop:
|
||||
|
|
@ -361,7 +361,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
|
|||
)
|
||||
|
||||
startingState := statedb.Copy()
|
||||
preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg)
|
||||
preTxReads, err := p.processBlockPreTx(block, statedb, balReader, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -371,7 +371,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
|
|||
|
||||
// execute transactions and state root calculation in parallel
|
||||
tExecStart = time.Now()
|
||||
go p.resultHandler(block, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh)
|
||||
go p.resultHandler(block, preTxReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh)
|
||||
var workers errgroup.Group
|
||||
workers.SetLimit(runtime.NumCPU())
|
||||
for i, t := range block.Transactions() {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ package state
|
|||
import (
|
||||
"maps"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
|
@ -41,11 +40,6 @@ type BALStateTransition struct {
|
|||
tries sync.Map //map[common.Address]Trie
|
||||
deletions map[common.Address]struct{}
|
||||
|
||||
accountDeleted int64
|
||||
accountUpdated int64
|
||||
storageDeleted atomic.Int64
|
||||
storageUpdated atomic.Int64
|
||||
|
||||
stateUpdate *stateUpdate
|
||||
|
||||
metrics BALStateTransitionMetrics
|
||||
|
|
@ -60,11 +54,10 @@ func (s *BALStateTransition) Metrics() *BALStateTransitionMetrics {
|
|||
|
||||
type BALStateTransitionMetrics struct {
|
||||
// trie hashing metrics
|
||||
AccountUpdate time.Duration
|
||||
StatePrefetch time.Duration
|
||||
StateUpdate time.Duration
|
||||
StateHash time.Duration
|
||||
OriginStorageLoadTime time.Duration
|
||||
AccountUpdate time.Duration
|
||||
StatePrefetch time.Duration
|
||||
StateUpdate time.Duration
|
||||
StateHash time.Duration
|
||||
|
||||
// commit metrics
|
||||
AccountCommits time.Duration
|
||||
|
|
@ -341,10 +334,15 @@ func (s *BALStateTransition) CommitWithUpdate(block uint64, deleteEmptyObjects b
|
|||
return common.Hash{}, nil, err
|
||||
}
|
||||
|
||||
accountUpdatedMeter.Mark(s.accountUpdated)
|
||||
storageUpdatedMeter.Mark(s.storageUpdated.Load())
|
||||
accountDeletedMeter.Mark(s.accountDeleted)
|
||||
storageDeletedMeter.Mark(s.storageDeleted.Load())
|
||||
/*
|
||||
TODO: derive these from the BAL
|
||||
^ I think even then, there is a semantic difference with how these metrics were calculated previously
|
||||
I don't know if it makes sense to recompute those, or just derive new ones from the BAL
|
||||
accountUpdatedMeter.Mark(int64(s.accountUpdated))
|
||||
storageUpdatedMeter.Mark(s.storageUpdated.Load())
|
||||
accountDeletedMeter.Mark(int64(s.accountDeleted))
|
||||
storageDeletedMeter.Mark(s.storageDeleted.Load())
|
||||
*/
|
||||
accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
|
||||
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
|
||||
storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
|
||||
|
|
@ -424,12 +422,8 @@ func (s *BALStateTransition) IntermediateRoot(_ bool) common.Hash {
|
|||
if val != (common.Hash{}) {
|
||||
updateKeys = append(updateKeys, key[:])
|
||||
updateValues = append(updateValues, common.TrimLeftZeroes(val[:]))
|
||||
|
||||
s.storageUpdated.Add(1)
|
||||
} else {
|
||||
deleteKeys = append(deleteKeys, key[:])
|
||||
|
||||
s.storageDeleted.Add(1)
|
||||
}
|
||||
}
|
||||
if err := tr.UpdateStorageBatch(address, updateKeys, updateValues); err != nil {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package state
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/overlay"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
|
|
@ -241,7 +240,7 @@ func (db *CachingDB) ReadersWithCacheStats(stateRoot common.Hash) (Reader, Reade
|
|||
}
|
||||
|
||||
// ReaderEIP7928 creates a state reader with the manner of Block-level accessList.
|
||||
func (db *CachingDB) ReaderEIP7928(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int) (Reader, error) {
|
||||
func (db *CachingDB) ReaderEIP7928(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int, block bool) (Reader, error) {
|
||||
base, err := db.StateReader(stateRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -251,8 +250,13 @@ func (db *CachingDB) ReaderEIP7928(stateRoot common.Hash, accessList map[common.
|
|||
|
||||
// Construct the state reader with background prefetching
|
||||
pr := newPrefetchStateReader(r, accessList, threads)
|
||||
if block {
|
||||
if err := pr.Wait(); err != nil {
|
||||
panic("unreachable")
|
||||
}
|
||||
}
|
||||
|
||||
return newReader(db.codedb.Reader(), pr), nil
|
||||
return newReaderWithPrefetch(db.codedb.Reader(), pr, pr), nil
|
||||
}
|
||||
|
||||
// OpenTrie opens the main account trie at a specific root hash.
|
||||
|
|
|
|||
|
|
@ -18,9 +18,6 @@ package state
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/overlay"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
|
|
@ -31,6 +28,8 @@ import (
|
|||
"github.com/ethereum/go-ethereum/trie/transitiontrie"
|
||||
"github.com/ethereum/go-ethereum/triedb"
|
||||
"github.com/ethereum/go-ethereum/triedb/database"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// ContractCodeReader defines the interface for accessing contract code.
|
||||
|
|
@ -530,6 +529,7 @@ func (r *stateReaderWithStats) GetStateStats() StateReaderStats {
|
|||
type reader struct {
|
||||
ContractCodeReader
|
||||
StateReader
|
||||
PrefetcherMetricer
|
||||
}
|
||||
|
||||
// newReader constructs a reader with the supplied code reader and state reader.
|
||||
|
|
@ -540,6 +540,14 @@ func newReader(codeReader ContractCodeReader, stateReader StateReader) *reader {
|
|||
}
|
||||
}
|
||||
|
||||
func newReaderWithPrefetch(codeReader ContractCodeReader, stateReader StateReader, metricer PrefetcherMetricer) *reader {
|
||||
return &reader{
|
||||
ContractCodeReader: codeReader,
|
||||
StateReader: stateReader,
|
||||
PrefetcherMetricer: metricer,
|
||||
}
|
||||
}
|
||||
|
||||
// GetCodeStats returns the statistics of code access.
|
||||
func (r *reader) GetCodeStats() ContractCodeReaderStats {
|
||||
if stater, ok := r.ContractCodeReader.(ContractCodeReaderStater); ok {
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ package state
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
|
||||
|
|
@ -86,6 +87,17 @@ type prefetchStateReader struct {
|
|||
done chan struct{}
|
||||
term chan struct{}
|
||||
closeOnce sync.Once
|
||||
start time.Time
|
||||
metrics PrefetchMetrics
|
||||
}
|
||||
|
||||
type PrefetchMetrics struct {
|
||||
// the total amount of time it took to complete the scheduled workload
|
||||
Elapsed time.Duration
|
||||
}
|
||||
|
||||
type PrefetcherMetricer interface {
|
||||
Metrics() PrefetchMetrics
|
||||
}
|
||||
|
||||
func newPrefetchStateReader(reader StateReader, accessList bal.StorageKeys, nThreads int) *prefetchStateReader {
|
||||
|
|
@ -106,11 +118,17 @@ func newPrefetchStateReaderInternal(reader StateReader, tasks []*fetchTask, nThr
|
|||
nThreads: nThreads,
|
||||
done: make(chan struct{}),
|
||||
term: make(chan struct{}),
|
||||
start: time.Now(),
|
||||
}
|
||||
go r.prefetch()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *prefetchStateReader) Metrics() PrefetchMetrics {
|
||||
// TODO (jwasinger) actually implement this
|
||||
return PrefetchMetrics{}
|
||||
}
|
||||
|
||||
func (r *prefetchStateReader) Close() {
|
||||
r.closeOnce.Do(func() {
|
||||
close(r.term)
|
||||
|
|
@ -128,7 +146,10 @@ func (r *prefetchStateReader) Wait() error {
|
|||
}
|
||||
|
||||
func (r *prefetchStateReader) prefetch() {
|
||||
defer close(r.done)
|
||||
defer func() {
|
||||
r.metrics = PrefetchMetrics{time.Since(r.start)}
|
||||
close(r.done)
|
||||
}()
|
||||
|
||||
if len(r.tasks) == 0 {
|
||||
return
|
||||
|
|
|
|||
|
|
@ -209,10 +209,10 @@ func TestReaderWithTracker(t *testing.T) {
|
|||
// transactions read without hitting the reader, causing the BAL to be incomplete.
|
||||
func TestTrackerSurvivesStateDBCache(t *testing.T) {
|
||||
var (
|
||||
sdb = NewDatabaseForTesting()
|
||||
statedb, _ = New(types.EmptyRootHash, sdb)
|
||||
addr = common.HexToAddress("0xaaaa")
|
||||
slot = common.HexToHash("0x01")
|
||||
sdb = NewDatabaseForTesting()
|
||||
statedb, _ = New(types.EmptyRootHash, sdb)
|
||||
addr = common.HexToAddress("0xaaaa")
|
||||
slot = common.HexToHash("0x01")
|
||||
)
|
||||
// Set up committed state with one account that has a storage slot.
|
||||
statedb.SetBalance(addr, uint256.NewInt(1e18), tracing.BalanceChangeUnspecified)
|
||||
|
|
|
|||
|
|
@ -162,6 +162,8 @@ func (t *BlockTest) createTestBlockChain(config *params.ChainConfig, snapshotter
|
|||
},
|
||||
StatelessSelfValidation: witness,
|
||||
NoPrefetch: true,
|
||||
BlockingPrefetch: true,
|
||||
PrefetchWorkers: 100, // note: this is totally unrelated to NoPrefetch, just for BAL execution
|
||||
}
|
||||
if snapshotter {
|
||||
options.SnapshotLimit = 1
|
||||
|
|
|
|||
Loading…
Reference in a new issue