core: aggregate per-tx state-read durations through parallel pipeline

This commit is contained in:
CPerezz 2026-04-30 12:20:15 +02:00
parent d419d91c44
commit 6730ab31e5
No known key found for this signature in database
GPG key ID: 62045F34B97177DD

View file

@ -23,6 +23,15 @@ type ProcessResultWithMetrics struct {
// the time it took to execute all txs in the block // the time it took to execute all txs in the block
ExecTime time.Duration ExecTime time.Duration
PostProcessTime time.Duration PostProcessTime time.Duration
// Counts is the aggregate of per-tx, pre-tx and post-tx state-mutation
// counts harvested from each worker statedb. Plain-int snapshot type;
// safe to copy.
Counts state.StateCounts
// Per-tx state-read durations summed across parallel workers + pre-tx
// + post-tx statedbs. Sum-of-CPU-time semantics; not wall-clock.
PerTxAccountReads time.Duration
PerTxStorageReads time.Duration
PerTxCodeReads time.Duration
} }
// ParallelStateProcessor is used to execute and verify blocks containing // ParallelStateProcessor is used to execute and verify blocks containing
@ -71,7 +80,7 @@ func validateStateAccesses(lastIdx int, accessList bal.AccessListReader, localAc
// performs post-tx state transition (system contracts and withdrawals) // performs post-tx state transition (system contracts and withdrawals)
// and calculates the ProcessResult, returning it to be sent on resCh // and calculates the ProcessResult, returning it to be sent on resCh
// by resultHandler // by resultHandler
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult) *ProcessResultWithMetrics { func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggAccountReads, aggStorageReads, aggCodeReads time.Duration) *ProcessResultWithMetrics {
tExec := time.Since(tExecStart) tExec := time.Since(tExecStart)
var requests [][]byte var requests [][]byte
tPostprocessStart := time.Now() tPostprocessStart := time.Now()
@ -170,6 +179,18 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
tPostprocess := time.Since(tPostprocessStart) tPostprocess := time.Since(tPostprocessStart)
// Fold post-tx statedb counts into the aggregate. postTxState is local and
// would otherwise be discarded; this captures system-contract reads and
// the engine.Finalize state mutations.
postTxCounts := postTxState.SnapshotCounts()
aggCounts.Add(&postTxCounts)
// Fold post-tx statedb reads into the aggregate (system contracts,
// withdrawal queue, consolidation queue).
aggAccountReads += postTxState.AccountReads
aggStorageReads += postTxState.StorageReads
aggCodeReads += postTxState.CodeReads
return &ProcessResultWithMetrics{ return &ProcessResultWithMetrics{
ProcessResult: &ProcessResult{ ProcessResult: &ProcessResult{
Receipts: allReceipts, Receipts: allReceipts,
@ -177,8 +198,12 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
Logs: allLogs, Logs: allLogs,
GasUsed: blockGasUsed, GasUsed: blockGasUsed,
}, },
PostProcessTime: tPostprocess, PostProcessTime: tPostprocess,
ExecTime: tExec, ExecTime: tExec,
Counts: aggCounts,
PerTxAccountReads: aggAccountReads,
PerTxStorageReads: aggStorageReads,
PerTxCodeReads: aggCodeReads,
} }
} }
@ -194,11 +219,21 @@ type txExecResult struct {
txState uint64 txState uint64
stateReads bal.StateAccesses stateReads bal.StateAccesses
// Per-tx state-mutation counts, snapshotted from this tx's worker
// statedb just before send. Aggregated single-threaded in resultHandler.
counts state.StateCounts
// Per-tx state-read durations (auto-populated on the per-tx StateDB during
// execution; snapshot before the worker discards the statedb).
accountReads time.Duration
storageReads time.Duration
codeReads time.Duration
} }
// resultHandler polls until all transactions have finished executing and the // resultHandler polls until all transactions have finished executing and the
// state root calculation is complete. The result is emitted on resCh. // state root calculation is complete. The result is emitted on resCh.
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, preCounts state.StateCounts, preAR, preSR, preCR time.Duration, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
// 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd
// 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL)
var results []txExecResult var results []txExecResult
@ -207,6 +242,14 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba
var numTxComplete int var numTxComplete int
accesses := preTxReads accesses := preTxReads
// aggCounts seeds with the pre-tx contribution (BeaconRoot, ParentBlockHash);
// per-tx counts are folded in below; post-tx is folded in prepareExecResult.
aggCounts := preCounts
// Read durations seeded with pre-tx contribution; per-tx folded in
// below; post-tx folded in prepareExecResult.
aggAccountReads := preAR
aggStorageReads := preSR
aggCodeReads := preCR
if len(block.Transactions()) > 0 { if len(block.Transactions()) > 0 {
loop: loop:
@ -224,6 +267,10 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba
cumulativeStateGas += res.txState cumulativeStateGas += res.txState
results = append(results, res) results = append(results, res)
accesses.Merge(res.stateReads) accesses.Merge(res.stateReads)
aggCounts.Add(&res.counts)
aggAccountReads += res.accountReads
aggStorageReads += res.storageReads
aggCodeReads += res.codeReads
} }
} }
if numTxComplete == len(block.Transactions()) { if numTxComplete == len(block.Transactions()) {
@ -240,7 +287,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba
} }
} }
execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results) execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggAccountReads, aggStorageReads, aggCodeReads)
rootCalcRes := <-stateRootCalcResCh rootCalcRes := <-stateRootCalcResCh
if execResults.ProcessResult.Error != nil { if execResults.ProcessResult.Error != nil {
@ -307,17 +354,21 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio
txRegular, txState := gp.AmsterdamDimensions() txRegular, txState := gp.AmsterdamDimensions()
return &txExecResult{ return &txExecResult{
idx: balIdx, idx: balIdx,
receipt: receipt, receipt: receipt,
execGas: receipt.GasUsed, execGas: receipt.GasUsed,
blockGas: gp.Used(), blockGas: gp.Used(),
txRegular: txRegular, txRegular: txRegular,
txState: txState, txState: txState,
stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(),
counts: db.SnapshotCounts(),
accountReads: db.AccountReads,
storageReads: db.StorageReads,
codeReads: db.CodeReads,
} }
} }
func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, error) { func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, time.Duration, time.Duration, time.Duration, error) {
var ( var (
header = block.Header() header = block.Header()
) )
@ -339,9 +390,12 @@ func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *
mutations.Merge(pbhMutations) mutations.Merge(pbhMutations)
reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList() reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList()
if !accessList.MutationsAt(0).Eq(mutations) { if !accessList.MutationsAt(0).Eq(mutations) {
return nil, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") return nil, state.StateCounts{}, 0, 0, 0, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0")
} }
return reads, nil // Snapshot the pre-tx statedb's counts and read-times so system-contract
// reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate;
// sdb is local and would otherwise be discarded.
return reads, sdb.SnapshotCounts(), sdb.AccountReads, sdb.StorageReads, sdb.CodeReads, nil
} }
// Process performs EVM execution and state root computation for a block which is known // Process performs EVM execution and state root computation for a block which is known
@ -361,7 +415,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
) )
startingState := statedb.Copy() startingState := statedb.Copy()
preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg) preReads, preCounts, preAR, preSR, preCR, err := p.processBlockPreTx(block, statedb, balReader, cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -371,7 +425,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
// execute transactions and state root calculation in parallel // execute transactions and state root calculation in parallel
tExecStart = time.Now() tExecStart = time.Now()
go p.resultHandler(block, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) go p.resultHandler(block, preReads, preCounts, preAR, preSR, preCR, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh)
var workers errgroup.Group var workers errgroup.Group
workers.SetLimit(runtime.NumCPU()) workers.SetLimit(runtime.NumCPU())
for i, t := range block.Transactions() { for i, t := range block.Transactions() {