core: extract state.ReadDurations triple

Replace the {Account, Storage, Code} time.Duration scalars threaded through
ProcessResultWithMetrics, txExecResult, processBlockPreTx and resultHandler
with a single ReadDurations struct + Add merge primitive. Same shape as
StateCounts. Adds (*StateDB).SnapshotReads() helper at the boundary.
This commit is contained in:
CPerezz 2026-05-01 00:16:55 +02:00
parent 546d2b457e
commit 13733390da
No known key found for this signature in database
GPG key ID: 62045F34B97177DD
4 changed files with 73 additions and 77 deletions

View file

@ -689,9 +689,9 @@ func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *
prefetchAccountReads, prefetchStorageReads = pr.PrefetchReadTimes() prefetchAccountReads, prefetchStorageReads = pr.PrefetchReadTimes()
} }
balAccountReads, balStorageReads := stateTransition.ReadTimes() balAccountReads, balStorageReads := stateTransition.ReadTimes()
stats.AccountReads = res.PerTxAccountReads + prefetchAccountReads + balAccountReads stats.AccountReads = res.Reads.Account + prefetchAccountReads + balAccountReads
stats.StorageReads = res.PerTxStorageReads + prefetchStorageReads + balStorageReads stats.StorageReads = res.Reads.Storage + prefetchStorageReads + balStorageReads
stats.CodeReads = res.PerTxCodeReads stats.CodeReads = res.Reads.Code
// Cache stats from the shared prefetch reader (accumulates centrally). // Cache stats from the shared prefetch reader (accumulates centrally).
if r, ok := prefetchReader.(state.ReaderStater); ok { if r, ok := prefetchReader.(state.ReaderStater); ok {

View file

@ -24,14 +24,11 @@ type ProcessResultWithMetrics struct {
ExecTime time.Duration ExecTime time.Duration
PostProcessTime time.Duration PostProcessTime time.Duration
// Counts is the aggregate of per-tx, pre-tx and post-tx state-mutation // Counts is the aggregate of per-tx, pre-tx and post-tx state-mutation
// counts harvested from each worker statedb. Plain-int snapshot type; // counts. Plain-int snapshot, safe to copy.
// safe to copy.
Counts state.StateCounts Counts state.StateCounts
// Per-tx state-read durations summed across parallel workers + pre-tx // Reads is the aggregate of per-tx, pre-tx and post-tx state-read times.
// + post-tx statedbs. Sum-of-CPU-time semantics; not wall-clock. // Sum-of-CPU-time, not wall-clock.
PerTxAccountReads time.Duration Reads state.ReadDurations
PerTxStorageReads time.Duration
PerTxCodeReads time.Duration
} }
// ParallelStateProcessor is used to execute and verify blocks containing // ParallelStateProcessor is used to execute and verify blocks containing
@ -80,7 +77,7 @@ func validateStateAccesses(lastIdx int, accessList bal.AccessListReader, localAc
// performs post-tx state transition (system contracts and withdrawals) // performs post-tx state transition (system contracts and withdrawals)
// and calculates the ProcessResult, returning it to be sent on resCh // and calculates the ProcessResult, returning it to be sent on resCh
// by resultHandler // by resultHandler
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggAccountReads, aggStorageReads, aggCodeReads time.Duration) *ProcessResultWithMetrics { func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggReads state.ReadDurations) *ProcessResultWithMetrics {
tExec := time.Since(tExecStart) tExec := time.Since(tExecStart)
var requests [][]byte var requests [][]byte
tPostprocessStart := time.Now() tPostprocessStart := time.Now()
@ -179,17 +176,11 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
tPostprocess := time.Since(tPostprocessStart) tPostprocess := time.Since(tPostprocessStart)
// Fold post-tx statedb counts into the aggregate. postTxState is local and // Fold post-tx statedb counts and reads into the aggregate. postTxState is
// would otherwise be discarded; this captures system-contract reads and // local and would otherwise be discarded; this captures system-contract
// the engine.Finalize state mutations. // reads (withdrawal queue, consolidation queue) and engine.Finalize.
postTxCounts := postTxState.SnapshotCounts() aggCounts.Add(postTxState.SnapshotCounts())
aggCounts.Add(postTxCounts) aggReads.Add(postTxState.SnapshotReads())
// Fold post-tx statedb reads into the aggregate (system contracts,
// withdrawal queue, consolidation queue).
aggAccountReads += postTxState.AccountReads
aggStorageReads += postTxState.StorageReads
aggCodeReads += postTxState.CodeReads
return &ProcessResultWithMetrics{ return &ProcessResultWithMetrics{
ProcessResult: &ProcessResult{ ProcessResult: &ProcessResult{
@ -198,12 +189,10 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
Logs: allLogs, Logs: allLogs,
GasUsed: blockGasUsed, GasUsed: blockGasUsed,
}, },
PostProcessTime: tPostprocess, PostProcessTime: tPostprocess,
ExecTime: tExec, ExecTime: tExec,
Counts: aggCounts, Counts: aggCounts,
PerTxAccountReads: aggAccountReads, Reads: aggReads,
PerTxStorageReads: aggStorageReads,
PerTxCodeReads: aggCodeReads,
} }
} }
@ -220,20 +209,16 @@ type txExecResult struct {
stateReads bal.StateAccesses stateReads bal.StateAccesses
// Per-tx state-mutation counts, snapshotted from this tx's worker // Per-tx state-mutation counts and read durations, snapshotted from the
// statedb just before send. Aggregated single-threaded in resultHandler. // worker statedb just before send. Aggregated single-threaded in
// resultHandler.
counts state.StateCounts counts state.StateCounts
reads state.ReadDurations
// Per-tx state-read durations (auto-populated on the per-tx StateDB during
// execution; snapshot before the worker discards the statedb).
accountReads time.Duration
storageReads time.Duration
codeReads time.Duration
} }
// resultHandler polls until all transactions have finished executing and the // resultHandler polls until all transactions have finished executing and the
// state root calculation is complete. The result is emitted on resCh. // state root calculation is complete. The result is emitted on resCh.
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, preCounts state.StateCounts, preAR, preSR, preCR time.Duration, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, preCounts state.StateCounts, preReads state.ReadDurations, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
// 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd
// 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL)
var results []txExecResult var results []txExecResult
@ -241,15 +226,11 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba
var execErr error var execErr error
var numTxComplete int var numTxComplete int
accesses := preTxReads // Seed aggregates with the pre-tx contribution (BeaconRoot, ParentBlockHash).
// aggCounts seeds with the pre-tx contribution (BeaconRoot, ParentBlockHash); // Per-tx fold below; post-tx fold in prepareExecResult.
// per-tx counts are folded in below; post-tx is folded in prepareExecResult. accesses := preTxAccesses
aggCounts := preCounts aggCounts := preCounts
// Read durations seeded with pre-tx contribution; per-tx folded in aggReads := preReads
// below; post-tx folded in prepareExecResult.
aggAccountReads := preAR
aggStorageReads := preSR
aggCodeReads := preCR
if len(block.Transactions()) > 0 { if len(block.Transactions()) > 0 {
loop: loop:
@ -268,9 +249,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba
results = append(results, res) results = append(results, res)
accesses.Merge(res.stateReads) accesses.Merge(res.stateReads)
aggCounts.Add(res.counts) aggCounts.Add(res.counts)
aggAccountReads += res.accountReads aggReads.Add(res.reads)
aggStorageReads += res.storageReads
aggCodeReads += res.codeReads
} }
} }
if numTxComplete == len(block.Transactions()) { if numTxComplete == len(block.Transactions()) {
@ -287,7 +266,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba
} }
} }
execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggAccountReads, aggStorageReads, aggCodeReads) execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggReads)
rootCalcRes := <-stateRootCalcResCh rootCalcRes := <-stateRootCalcResCh
if execResults.ProcessResult.Error != nil { if execResults.ProcessResult.Error != nil {
@ -354,21 +333,19 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio
txRegular, txState := gp.AmsterdamDimensions() txRegular, txState := gp.AmsterdamDimensions()
return &txExecResult{ return &txExecResult{
idx: balIdx, idx: balIdx,
receipt: receipt, receipt: receipt,
execGas: receipt.GasUsed, execGas: receipt.GasUsed,
blockGas: gp.Used(), blockGas: gp.Used(),
txRegular: txRegular, txRegular: txRegular,
txState: txState, txState: txState,
stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(),
counts: db.SnapshotCounts(), counts: db.SnapshotCounts(),
accountReads: db.AccountReads, reads: db.SnapshotReads(),
storageReads: db.StorageReads,
codeReads: db.CodeReads,
} }
} }
func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, time.Duration, time.Duration, time.Duration, error) { func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, state.ReadDurations, error) {
var ( var (
header = block.Header() header = block.Header()
) )
@ -390,12 +367,12 @@ func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *
mutations.Merge(pbhMutations) mutations.Merge(pbhMutations)
reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList() reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList()
if !accessList.MutationsAt(0).Eq(mutations) { if !accessList.MutationsAt(0).Eq(mutations) {
return nil, state.StateCounts{}, 0, 0, 0, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") return nil, state.StateCounts{}, state.ReadDurations{}, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0")
} }
// Snapshot the pre-tx statedb's counts and read-times so system-contract // Snapshot the pre-tx statedb's counts and read-times so system-contract
// reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate; // reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate;
// sdb is local and would otherwise be discarded. // sdb is local and would otherwise be discarded.
return reads, sdb.SnapshotCounts(), sdb.AccountReads, sdb.StorageReads, sdb.CodeReads, nil return reads, sdb.SnapshotCounts(), sdb.SnapshotReads(), nil
} }
// Process performs EVM execution and state root computation for a block which is known // Process performs EVM execution and state root computation for a block which is known
@ -415,7 +392,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
) )
startingState := statedb.Copy() startingState := statedb.Copy()
preReads, preCounts, preAR, preSR, preCR, err := p.processBlockPreTx(block, statedb, balReader, cfg) preTxReads, preCounts, preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -425,7 +402,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
// execute transactions and state root calculation in parallel // execute transactions and state root calculation in parallel
tExecStart = time.Now() tExecStart = time.Now()
go p.resultHandler(block, preReads, preCounts, preAR, preSR, preCR, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) go p.resultHandler(block, preTxReads, preCounts, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh)
var workers errgroup.Group var workers errgroup.Group
workers.SetLimit(runtime.NumCPU()) workers.SetLimit(runtime.NumCPU())
for i, t := range block.Transactions() { for i, t := range block.Transactions() {

View file

@ -16,19 +16,28 @@
package state package state
import "time"
// ReadDurations groups the {Account, Storage, Code} state-read times that are
// aggregated across pre-tx, per-tx and post-tx statedbs in the BAL parallel
// path. Sum-of-CPU-time, not wall-clock.
type ReadDurations struct {
Account time.Duration
Storage time.Duration
Code time.Duration
}
// Add merges other into r.
func (r *ReadDurations) Add(other ReadDurations) {
r.Account += other.Account
r.Storage += other.Storage
r.Code += other.Code
}
// StateCounts holds count-only statistics gathered during a block's state // StateCounts holds count-only statistics gathered during a block's state
// transition. It is the snapshot/aggregation type: all fields are plain ints, // transition. Plain-int snapshot type, safe to copy through channels.
// safe to copy and pass by value through channels and struct fields. // Atomic counters on StateDB are converted at the snapshot boundary in
// // SnapshotCounts. Read durations live in ReadDurations (separate type).
// StateDB still uses atomic counters internally (for concurrent worker
// updates); the conversion to plain ints happens at the snapshot boundary
// in (*StateDB).SnapshotCounts. This separation keeps the live atomics
// scoped to the mutation surface and lets the rest of the pipeline use
// vet-clean value semantics.
//
// Only counts live here — time.Duration fields (AccountReads, StorageReads,
// etc.) stay on StateDB directly, since their parallel-execution semantics
// don't fit the simple Add merge pattern.
type StateCounts struct { type StateCounts struct {
AccountLoaded int // accounts retrieved from the database during the state transition AccountLoaded int // accounts retrieved from the database during the state transition
AccountUpdated int // accounts updated during the state transition AccountUpdated int // accounts updated during the state transition

View file

@ -241,6 +241,16 @@ func (s *StateDB) SnapshotCounts() StateCounts {
} }
} }
// SnapshotReads returns a value-copy of the {Account, Storage, Code} read
// durations accumulated on this StateDB.
func (s *StateDB) SnapshotReads() ReadDurations {
return ReadDurations{
Account: s.AccountReads,
Storage: s.StorageReads,
Code: s.CodeReads,
}
}
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the // state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot. // commit phase, most of the needed data is already hot.