From aa13b208b1e768e99bf6881154191ab4ba2acbd3 Mon Sep 17 00:00:00 2001 From: CPerezz Date: Fri, 1 May 2026 10:40:40 +0200 Subject: [PATCH] core: deduplicate CodeLoaded/CodeLoadBytes for BAL blocks The previous follow-up note: per-tx + pre-tx + post-tx StateDBs each have their own stateObjects, so summing CodeLoaded/CodeLoadBytes over-counts contracts whose code body was fetched by multiple phases. Fix: snapshot per-StateDB the {address: codeLen} map of contracts whose s.code is populated, plumb through the existing aggregation pipeline, dedupe by address in resultHandler/prepareExecResult. The merged map's size and value-sum become CodeLoaded and CodeLoadBytes respectively, overriding the per-tx-summed values at the wiring site. Empirical: a 3-tx block touching the same set of system contracts now reports code=4, code_bytes=1098 (matches single-tx baseline) instead of code=12, code_bytes=3294 under the prior over-count. --- core/blockchain.go | 9 ++--- core/parallel_state_processor.go | 57 ++++++++++++++++++++++++++------ core/state/statedb.go | 19 +++++++++++ 3 files changed, 70 insertions(+), 15 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 8791a0b43b..82e9c35719 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -653,16 +653,17 @@ func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block * var stats ExecuteStats // Counts: write counts come from the BAL state transition; read counts - // for accounts/storage come from the BAL access list itself (deduplicated). - // CodeLoaded/CodeLoadBytes still sum per-tx worker contributions because - // the BAL doesn't track code-fetch events distinctly — accept slight - // over-counting there. + // for accounts/storage come from the BAL access list (deduplicated); + // code-load counts come from a deduplicated address set tracked across + // all phase StateDBs by the parallel processor. stats.StateCounts = res.Counts stats.StateCounts.Add(stateTransition.WriteCounts()) if al := block.AccessList(); al != nil { stats.StateCounts.AccountLoaded = al.UniqueAccountCount() stats.StateCounts.StorageLoaded = al.UniqueStorageSlotCount() } + stats.StateCounts.CodeLoaded = res.CodeLoaded + stats.StateCounts.CodeLoadBytes = res.CodeLoadBytes // Time durations under parallel execution use wall-clock semantics. // Per-tx duration sums (CPU-time) are intentionally not plumbed: they diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 409d70cdb2..e65cee16f9 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -7,6 +7,7 @@ import ( "slices" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types/bal" @@ -31,6 +32,11 @@ type ProcessResultWithMetrics struct { // Reads is the sum of per-StateDB read times across pre-tx, per-tx and // post-tx phases. Sum-of-CPU-time, not wall-clock. Reads state.ReadDurations + // CodeLoaded is the deduplicated count of unique contract addresses whose + // code body was fetched during the block (across all phase StateDBs). + // CodeLoadBytes is the sum of those code lengths. + CodeLoaded int + CodeLoadBytes int } // ParallelStateProcessor is used to execute and verify blocks containing @@ -79,7 +85,7 @@ func validateStateAccesses(lastIdx int, accessList bal.AccessListReader, localAc // performs post-tx state transition (system contracts and withdrawals) // and calculates the ProcessResult, returning it to be sent on resCh // by resultHandler -func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggReads state.ReadDurations) *ProcessResultWithMetrics { +func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggReads state.ReadDurations, aggCodeLoads map[common.Address]int) *ProcessResultWithMetrics { tExec := time.Since(tExecStart) var requests [][]byte tPostprocessStart := time.Now() @@ -180,9 +186,20 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar // Fold post-tx statedb counts and reads into the aggregate. postTxState is // local and would otherwise be discarded; this captures system-contract - // reads (withdrawal queue, consolidation queue) and engine.Finalize. + // activity (withdrawal queue, consolidation queue) and engine.Finalize. aggCounts.Add(postTxState.SnapshotCounts()) aggReads.Add(postTxState.SnapshotReads()) + for addr, l := range postTxState.SnapshotCodeLoads() { + if _, ok := aggCodeLoads[addr]; !ok { + aggCodeLoads[addr] = l + } + } + + codeLoaded := len(aggCodeLoads) + var codeLoadBytes int + for _, l := range aggCodeLoads { + codeLoadBytes += l + } return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{ @@ -195,6 +212,8 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar ExecTime: tExec, Counts: aggCounts, Reads: aggReads, + CodeLoaded: codeLoaded, + CodeLoadBytes: codeLoadBytes, } } @@ -216,11 +235,14 @@ type txExecResult struct { // resultHandler. counts state.StateCounts reads state.ReadDurations + // codeLoads is addr→codeLen for contracts whose code body was fetched + // in this tx. Deduped across all phases in resultHandler. + codeLoads map[common.Address]int } // resultHandler polls until all transactions have finished executing and the // state root calculation is complete. The result is emitted on resCh. -func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, preCounts state.StateCounts, preReads state.ReadDurations, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { +func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, preCounts state.StateCounts, preReads state.ReadDurations, preCodeLoads map[common.Address]int, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) var results []txExecResult @@ -233,6 +255,13 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses accesses := preTxAccesses aggCounts := preCounts aggReads := preReads + // Dedup'd map of contract addresses whose code body was fetched by any + // phase StateDB. Address-keyed so multiple phases adding the same contract + // only count it once. + aggCodeLoads := make(map[common.Address]int) + for addr, l := range preCodeLoads { + aggCodeLoads[addr] = l + } if len(block.Transactions()) > 0 { loop: @@ -252,6 +281,11 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses accesses.Merge(res.stateReads) aggCounts.Add(res.counts) aggReads.Add(res.reads) + for addr, l := range res.codeLoads { + if _, ok := aggCodeLoads[addr]; !ok { + aggCodeLoads[addr] = l + } + } } } if numTxComplete == len(block.Transactions()) { @@ -268,7 +302,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses } } - execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggReads) + execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggReads, aggCodeLoads) rootCalcRes := <-stateRootCalcResCh if execResults.ProcessResult.Error != nil { @@ -344,10 +378,11 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(), counts: db.SnapshotCounts(), reads: db.SnapshotReads(), + codeLoads: db.SnapshotCodeLoads(), } } -func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, state.ReadDurations, error) { +func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, state.ReadDurations, map[common.Address]int, error) { var ( header = block.Header() ) @@ -369,12 +404,12 @@ func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb * mutations.Merge(pbhMutations) reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList() if !accessList.MutationsAt(0).Eq(mutations) { - return nil, state.StateCounts{}, state.ReadDurations{}, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") + return nil, state.StateCounts{}, state.ReadDurations{}, nil, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0") } - // Snapshot the pre-tx statedb's counts and read-times so system-contract - // reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate; + // Snapshot the pre-tx statedb's counts/reads/code-loads so system-contract + // activity (BeaconRoot, ParentBlockHash) contributes to the aggregate; // sdb is local and would otherwise be discarded. - return reads, sdb.SnapshotCounts(), sdb.SnapshotReads(), nil + return reads, sdb.SnapshotCounts(), sdb.SnapshotReads(), sdb.SnapshotCodeLoads(), nil } // Process performs EVM execution and state root computation for a block which is known @@ -394,7 +429,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st ) startingState := statedb.Copy() - preTxReads, preCounts, preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg) + preTxReads, preCounts, preReads, preCodeLoads, err := p.processBlockPreTx(block, statedb, balReader, cfg) if err != nil { return nil, err } @@ -404,7 +439,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st // execute transactions and state root calculation in parallel tExecStart = time.Now() - go p.resultHandler(block, preTxReads, preCounts, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) + go p.resultHandler(block, preTxReads, preCounts, preReads, preCodeLoads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh) var workers errgroup.Group workers.SetLimit(runtime.NumCPU()) for i, t := range block.Transactions() { diff --git a/core/state/statedb.go b/core/state/statedb.go index 938c350a52..6719182e13 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -251,6 +251,25 @@ func (s *StateDB) SnapshotReads() ReadDurations { } } +// SnapshotCodeLoads returns the addresses whose contract code body was +// fetched during this StateDB's lifetime, mapped to byte length. Used by the +// BAL parallel pipeline to deduplicate code-load events across phase StateDBs. +func (s *StateDB) SnapshotCodeLoads() map[common.Address]int { + if len(s.stateObjects) == 0 { + return nil + } + var m map[common.Address]int + for addr, obj := range s.stateObjects { + if l := len(obj.code); l > 0 { + if m == nil { + m = make(map[common.Address]int) + } + m[addr] = l + } + } + return m +} + // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot.