From 13733390daab7834ee989bdf8bf2d96ed3e68e7d Mon Sep 17 00:00:00 2001 From: CPerezz Date: Fri, 1 May 2026 00:16:55 +0200 Subject: [PATCH] core: extract state.ReadDurations triple Replace the {Account, Storage, Code} time.Duration scalars threaded through ProcessResultWithMetrics, txExecResult, processBlockPreTx and resultHandler with a single ReadDurations struct + Add merge primitive. Same shape as StateCounts. Adds (*StateDB).SnapshotReads() helper at the boundary. --- core/blockchain.go | 6 +- core/parallel_state_processor.go | 101 ++++++++++++------------------- core/state/state_counts.go | 33 ++++++---- core/state/statedb.go | 10 +++ 4 files changed, 73 insertions(+), 77 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 17deeac42b..9912f4cb9c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -689,9 +689,9 @@ func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block * prefetchAccountReads, prefetchStorageReads = pr.PrefetchReadTimes() } balAccountReads, balStorageReads := stateTransition.ReadTimes() - stats.AccountReads = res.PerTxAccountReads + prefetchAccountReads + balAccountReads - stats.StorageReads = res.PerTxStorageReads + prefetchStorageReads + balStorageReads - stats.CodeReads = res.PerTxCodeReads + stats.AccountReads = res.Reads.Account + prefetchAccountReads + balAccountReads + stats.StorageReads = res.Reads.Storage + prefetchStorageReads + balStorageReads + stats.CodeReads = res.Reads.Code // Cache stats from the shared prefetch reader (accumulates centrally). if r, ok := prefetchReader.(state.ReaderStater); ok { diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 58a75ecca4..c24043a2c8 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -24,14 +24,11 @@ type ProcessResultWithMetrics struct { ExecTime time.Duration PostProcessTime time.Duration // Counts is the aggregate of per-tx, pre-tx and post-tx state-mutation - // counts harvested from each worker statedb. Plain-int snapshot type; - // safe to copy. + // counts. Plain-int snapshot, safe to copy. Counts state.StateCounts - // Per-tx state-read durations summed across parallel workers + pre-tx - // + post-tx statedbs. Sum-of-CPU-time semantics; not wall-clock. - PerTxAccountReads time.Duration - PerTxStorageReads time.Duration - PerTxCodeReads time.Duration + // Reads is the aggregate of per-tx, pre-tx and post-tx state-read times. + // Sum-of-CPU-time, not wall-clock. + Reads state.ReadDurations } // ParallelStateProcessor is used to execute and verify blocks containing @@ -80,7 +77,7 @@ func validateStateAccesses(lastIdx int, accessList bal.AccessListReader, localAc // performs post-tx state transition (system contracts and withdrawals) // and calculates the ProcessResult, returning it to be sent on resCh // by resultHandler -func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggAccountReads, aggStorageReads, aggCodeReads time.Duration) *ProcessResultWithMetrics { +func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggReads state.ReadDurations) *ProcessResultWithMetrics { tExec := time.Since(tExecStart) var requests [][]byte tPostprocessStart := time.Now() @@ -179,17 +176,11 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar tPostprocess := time.Since(tPostprocessStart) - // Fold post-tx statedb counts into the aggregate. postTxState is local and - // would otherwise be discarded; this captures system-contract reads and - // the engine.Finalize state mutations. - postTxCounts := postTxState.SnapshotCounts() - aggCounts.Add(postTxCounts) - - // Fold post-tx statedb reads into the aggregate (system contracts, - // withdrawal queue, consolidation queue). - aggAccountReads += postTxState.AccountReads - aggStorageReads += postTxState.StorageReads - aggCodeReads += postTxState.CodeReads + // Fold post-tx statedb counts and reads into the aggregate. postTxState is + // local and would otherwise be discarded; this captures system-contract + // reads (withdrawal queue, consolidation queue) and engine.Finalize. + aggCounts.Add(postTxState.SnapshotCounts()) + aggReads.Add(postTxState.SnapshotReads()) return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{ @@ -198,12 +189,10 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar Logs: allLogs, GasUsed: blockGasUsed, }, - PostProcessTime: tPostprocess, - ExecTime: tExec, - Counts: aggCounts, - PerTxAccountReads: aggAccountReads, - PerTxStorageReads: aggStorageReads, - PerTxCodeReads: aggCodeReads, + PostProcessTime: tPostprocess, + ExecTime: tExec, + Counts: aggCounts, + Reads: aggReads, } } @@ -220,20 +209,16 @@ type txExecResult struct { stateReads bal.StateAccesses - // Per-tx state-mutation counts, snapshotted from this tx's worker - // statedb just before send. Aggregated single-threaded in resultHandler. + // Per-tx state-mutation counts and read durations, snapshotted from the + // worker statedb just before send. Aggregated single-threaded in + // resultHandler. counts state.StateCounts - - // Per-tx state-read durations (auto-populated on the per-tx StateDB during - // execution; snapshot before the worker discards the statedb). - accountReads time.Duration - storageReads time.Duration - codeReads time.Duration + reads state.ReadDurations } // resultHandler polls until all transactions have finished executing and the // state root calculation is complete. The result is emitted on resCh. -func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, preCounts state.StateCounts, preAR, preSR, preCR time.Duration, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { +func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, preCounts state.StateCounts, preReads state.ReadDurations, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) var results []txExecResult @@ -241,15 +226,11 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba var execErr error var numTxComplete int - accesses := preTxReads - // aggCounts seeds with the pre-tx contribution (BeaconRoot, ParentBlockHash); - // per-tx counts are folded in below; post-tx is folded in prepareExecResult. + // Seed aggregates with the pre-tx contribution (BeaconRoot, ParentBlockHash). + // Per-tx fold below; post-tx fold in prepareExecResult. + accesses := preTxAccesses aggCounts := preCounts - // Read durations seeded with pre-tx contribution; per-tx folded in - // below; post-tx folded in prepareExecResult. - aggAccountReads := preAR - aggStorageReads := preSR - aggCodeReads := preCR + aggReads := preReads if len(block.Transactions()) > 0 { loop: @@ -268,9 +249,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba results = append(results, res) accesses.Merge(res.stateReads) aggCounts.Add(res.counts) - aggAccountReads += res.accountReads - aggStorageReads += res.storageReads - aggCodeReads += res.codeReads + aggReads.Add(res.reads) } } if numTxComplete == len(block.Transactions()) { @@ -287,7 +266,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba } } - execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggAccountReads, aggStorageReads, aggCodeReads) + execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggReads) rootCalcRes := <-stateRootCalcResCh if execResults.ProcessResult.Error != nil { @@ -354,21 +333,19 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio txRegular, txState := gp.AmsterdamDimensions() return &txExecResult{ - idx: balIdx, - receipt: receipt, - execGas: receipt.GasUsed, - blockGas: gp.Used(), - txRegular: txRegular, - txState: txState, - stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), - counts: db.SnapshotCounts(), - accountReads: db.AccountReads, - storageReads: db.StorageReads, - codeReads: db.CodeReads, + idx: balIdx, + receipt: receipt, + execGas: receipt.GasUsed, + blockGas: gp.Used(), + txRegular: txRegular, + txState: txState, + stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), + counts: db.SnapshotCounts(), + reads: db.SnapshotReads(), } } -func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, time.Duration, time.Duration, time.Duration, error) { +func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, state.ReadDurations, error) { var ( header = block.Header() ) @@ -390,12 +367,12 @@ func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb * mutations.Merge(pbhMutations) reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList() if !accessList.MutationsAt(0).Eq(mutations) { - return nil, state.StateCounts{}, 0, 0, 0, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") + return nil, state.StateCounts{}, state.ReadDurations{}, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") } // Snapshot the pre-tx statedb's counts and read-times so system-contract // reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate; // sdb is local and would otherwise be discarded. - return reads, sdb.SnapshotCounts(), sdb.AccountReads, sdb.StorageReads, sdb.CodeReads, nil + return reads, sdb.SnapshotCounts(), sdb.SnapshotReads(), nil } // Process performs EVM execution and state root computation for a block which is known @@ -415,7 +392,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st ) startingState := statedb.Copy() - preReads, preCounts, preAR, preSR, preCR, err := p.processBlockPreTx(block, statedb, balReader, cfg) + preTxReads, preCounts, preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg) if err != nil { return nil, err } @@ -425,7 +402,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st // execute transactions and state root calculation in parallel tExecStart = time.Now() - go p.resultHandler(block, preReads, preCounts, preAR, preSR, preCR, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) + go p.resultHandler(block, preTxReads, preCounts, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) var workers errgroup.Group workers.SetLimit(runtime.NumCPU()) for i, t := range block.Transactions() { diff --git a/core/state/state_counts.go b/core/state/state_counts.go index b5c109a657..457dcbf78e 100644 --- a/core/state/state_counts.go +++ b/core/state/state_counts.go @@ -16,19 +16,28 @@ package state +import "time" + +// ReadDurations groups the {Account, Storage, Code} state-read times that are +// aggregated across pre-tx, per-tx and post-tx statedbs in the BAL parallel +// path. Sum-of-CPU-time, not wall-clock. +type ReadDurations struct { + Account time.Duration + Storage time.Duration + Code time.Duration +} + +// Add merges other into r. +func (r *ReadDurations) Add(other ReadDurations) { + r.Account += other.Account + r.Storage += other.Storage + r.Code += other.Code +} + // StateCounts holds count-only statistics gathered during a block's state -// transition. It is the snapshot/aggregation type: all fields are plain ints, -// safe to copy and pass by value through channels and struct fields. -// -// StateDB still uses atomic counters internally (for concurrent worker -// updates); the conversion to plain ints happens at the snapshot boundary -// in (*StateDB).SnapshotCounts. This separation keeps the live atomics -// scoped to the mutation surface and lets the rest of the pipeline use -// vet-clean value semantics. -// -// Only counts live here — time.Duration fields (AccountReads, StorageReads, -// etc.) stay on StateDB directly, since their parallel-execution semantics -// don't fit the simple Add merge pattern. +// transition. Plain-int snapshot type, safe to copy through channels. +// Atomic counters on StateDB are converted at the snapshot boundary in +// SnapshotCounts. Read durations live in ReadDurations (separate type). type StateCounts struct { AccountLoaded int // accounts retrieved from the database during the state transition AccountUpdated int // accounts updated during the state transition diff --git a/core/state/statedb.go b/core/state/statedb.go index 3eccf62c0b..938c350a52 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -241,6 +241,16 @@ func (s *StateDB) SnapshotCounts() StateCounts { } } +// SnapshotReads returns a value-copy of the {Account, Storage, Code} read +// durations accumulated on this StateDB. +func (s *StateDB) SnapshotReads() ReadDurations { + return ReadDurations{ + Account: s.AccountReads, + Storage: s.StorageReads, + Code: s.CodeReads, + } +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot.