diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index ea768775ae..60e30a44f9 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -23,6 +23,15 @@ type ProcessResultWithMetrics struct { // the time it took to execute all txs in the block ExecTime time.Duration PostProcessTime time.Duration + // Counts is the aggregate of per-tx, pre-tx and post-tx state-mutation + // counts harvested from each worker statedb. Plain-int snapshot type; + // safe to copy. + Counts state.StateCounts + // Per-tx state-read durations summed across parallel workers + pre-tx + // + post-tx statedbs. Sum-of-CPU-time semantics; not wall-clock. + PerTxAccountReads time.Duration + PerTxStorageReads time.Duration + PerTxCodeReads time.Duration } // ParallelStateProcessor is used to execute and verify blocks containing @@ -71,7 +80,7 @@ func validateStateAccesses(lastIdx int, accessList bal.AccessListReader, localAc // performs post-tx state transition (system contracts and withdrawals) // and calculates the ProcessResult, returning it to be sent on resCh // by resultHandler -func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult) *ProcessResultWithMetrics { +func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggAccountReads, aggStorageReads, aggCodeReads time.Duration) *ProcessResultWithMetrics { tExec := time.Since(tExecStart) var requests [][]byte tPostprocessStart := time.Now() @@ -170,6 +179,18 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar tPostprocess := time.Since(tPostprocessStart) + // Fold post-tx statedb counts into the aggregate. postTxState is local and + // would otherwise be discarded; this captures system-contract reads and + // the engine.Finalize state mutations. + postTxCounts := postTxState.SnapshotCounts() + aggCounts.Add(&postTxCounts) + + // Fold post-tx statedb reads into the aggregate (system contracts, + // withdrawal queue, consolidation queue). + aggAccountReads += postTxState.AccountReads + aggStorageReads += postTxState.StorageReads + aggCodeReads += postTxState.CodeReads + return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{ Receipts: allReceipts, @@ -177,8 +198,12 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar Logs: allLogs, GasUsed: blockGasUsed, }, - PostProcessTime: tPostprocess, - ExecTime: tExec, + PostProcessTime: tPostprocess, + ExecTime: tExec, + Counts: aggCounts, + PerTxAccountReads: aggAccountReads, + PerTxStorageReads: aggStorageReads, + PerTxCodeReads: aggCodeReads, } } @@ -194,11 +219,21 @@ type txExecResult struct { txState uint64 stateReads bal.StateAccesses + + // Per-tx state-mutation counts, snapshotted from this tx's worker + // statedb just before send. Aggregated single-threaded in resultHandler. + counts state.StateCounts + + // Per-tx state-read durations (auto-populated on the per-tx StateDB during + // execution; snapshot before the worker discards the statedb). + accountReads time.Duration + storageReads time.Duration + codeReads time.Duration } // resultHandler polls until all transactions have finished executing and the // state root calculation is complete. The result is emitted on resCh. -func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { +func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads bal.StateAccesses, preCounts state.StateCounts, preAR, preSR, preCR time.Duration, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) var results []txExecResult @@ -207,6 +242,14 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba var numTxComplete int accesses := preTxReads + // aggCounts seeds with the pre-tx contribution (BeaconRoot, ParentBlockHash); + // per-tx counts are folded in below; post-tx is folded in prepareExecResult. + aggCounts := preCounts + // Read durations seeded with pre-tx contribution; per-tx folded in + // below; post-tx folded in prepareExecResult. + aggAccountReads := preAR + aggStorageReads := preSR + aggCodeReads := preCR if len(block.Transactions()) > 0 { loop: @@ -224,6 +267,10 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba cumulativeStateGas += res.txState results = append(results, res) accesses.Merge(res.stateReads) + aggCounts.Add(&res.counts) + aggAccountReads += res.accountReads + aggStorageReads += res.storageReads + aggCodeReads += res.codeReads } } if numTxComplete == len(block.Transactions()) { @@ -240,7 +287,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxReads ba } } - execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results) + execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggAccountReads, aggStorageReads, aggCodeReads) rootCalcRes := <-stateRootCalcResCh if execResults.ProcessResult.Error != nil { @@ -307,17 +354,21 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio txRegular, txState := gp.AmsterdamDimensions() return &txExecResult{ - idx: balIdx, - receipt: receipt, - execGas: receipt.GasUsed, - blockGas: gp.Used(), - txRegular: txRegular, - txState: txState, - stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), + idx: balIdx, + receipt: receipt, + execGas: receipt.GasUsed, + blockGas: gp.Used(), + txRegular: txRegular, + txState: txState, + stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), + counts: db.SnapshotCounts(), + accountReads: db.AccountReads, + storageReads: db.StorageReads, + codeReads: db.CodeReads, } } -func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, error) { +func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, time.Duration, time.Duration, time.Duration, error) { var ( header = block.Header() ) @@ -339,9 +390,12 @@ func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb * mutations.Merge(pbhMutations) reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList() if !accessList.MutationsAt(0).Eq(mutations) { - return nil, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") + return nil, state.StateCounts{}, 0, 0, 0, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") } - return reads, nil + // Snapshot the pre-tx statedb's counts and read-times so system-contract + // reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate; + // sdb is local and would otherwise be discarded. + return reads, sdb.SnapshotCounts(), sdb.AccountReads, sdb.StorageReads, sdb.CodeReads, nil } // Process performs EVM execution and state root computation for a block which is known @@ -361,7 +415,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st ) startingState := statedb.Copy() - preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg) + preReads, preCounts, preAR, preSR, preCR, err := p.processBlockPreTx(block, statedb, balReader, cfg) if err != nil { return nil, err } @@ -371,7 +425,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st // execute transactions and state root calculation in parallel tExecStart = time.Now() - go p.resultHandler(block, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) + go p.resultHandler(block, preReads, preCounts, preAR, preSR, preCR, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) var workers errgroup.Group workers.SetLimit(runtime.NumCPU()) for i, t := range block.Transactions() {