mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-08 07:58:40 +00:00
core: deduplicate CodeLoaded/CodeLoadBytes for BAL blocks
The previous follow-up note: per-tx + pre-tx + post-tx StateDBs each
have their own stateObjects, so summing CodeLoaded/CodeLoadBytes
over-counts contracts whose code body was fetched by multiple phases.
Fix: snapshot per-StateDB the {address: codeLen} map of contracts whose
s.code is populated, plumb through the existing aggregation pipeline,
dedupe by address in resultHandler/prepareExecResult. The merged map's
size and value-sum become CodeLoaded and CodeLoadBytes respectively,
overriding the per-tx-summed values at the wiring site.
Empirical: a 3-tx block touching the same set of system contracts now
reports code=4, code_bytes=1098 (matches single-tx baseline) instead
of code=12, code_bytes=3294 under the prior over-count.
This commit is contained in:
parent
3d135baa36
commit
aa13b208b1
3 changed files with 70 additions and 15 deletions
|
|
@ -653,16 +653,17 @@ func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *
|
|||
var stats ExecuteStats
|
||||
|
||||
// Counts: write counts come from the BAL state transition; read counts
|
||||
// for accounts/storage come from the BAL access list itself (deduplicated).
|
||||
// CodeLoaded/CodeLoadBytes still sum per-tx worker contributions because
|
||||
// the BAL doesn't track code-fetch events distinctly — accept slight
|
||||
// over-counting there.
|
||||
// for accounts/storage come from the BAL access list (deduplicated);
|
||||
// code-load counts come from a deduplicated address set tracked across
|
||||
// all phase StateDBs by the parallel processor.
|
||||
stats.StateCounts = res.Counts
|
||||
stats.StateCounts.Add(stateTransition.WriteCounts())
|
||||
if al := block.AccessList(); al != nil {
|
||||
stats.StateCounts.AccountLoaded = al.UniqueAccountCount()
|
||||
stats.StateCounts.StorageLoaded = al.UniqueStorageSlotCount()
|
||||
}
|
||||
stats.StateCounts.CodeLoaded = res.CodeLoaded
|
||||
stats.StateCounts.CodeLoadBytes = res.CodeLoadBytes
|
||||
|
||||
// Time durations under parallel execution use wall-clock semantics.
|
||||
// Per-tx duration sums (CPU-time) are intentionally not plumbed: they
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/core/types/bal"
|
||||
|
|
@ -31,6 +32,11 @@ type ProcessResultWithMetrics struct {
|
|||
// Reads is the sum of per-StateDB read times across pre-tx, per-tx and
|
||||
// post-tx phases. Sum-of-CPU-time, not wall-clock.
|
||||
Reads state.ReadDurations
|
||||
// CodeLoaded is the deduplicated count of unique contract addresses whose
|
||||
// code body was fetched during the block (across all phase StateDBs).
|
||||
// CodeLoadBytes is the sum of those code lengths.
|
||||
CodeLoaded int
|
||||
CodeLoadBytes int
|
||||
}
|
||||
|
||||
// ParallelStateProcessor is used to execute and verify blocks containing
|
||||
|
|
@ -79,7 +85,7 @@ func validateStateAccesses(lastIdx int, accessList bal.AccessListReader, localAc
|
|||
// performs post-tx state transition (system contracts and withdrawals)
|
||||
// and calculates the ProcessResult, returning it to be sent on resCh
|
||||
// by resultHandler
|
||||
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggReads state.ReadDurations) *ProcessResultWithMetrics {
|
||||
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, accesses bal.StateAccesses, statedb *state.StateDB, prefetchReader state.Reader, results []txExecResult, aggCounts state.StateCounts, aggReads state.ReadDurations, aggCodeLoads map[common.Address]int) *ProcessResultWithMetrics {
|
||||
tExec := time.Since(tExecStart)
|
||||
var requests [][]byte
|
||||
tPostprocessStart := time.Now()
|
||||
|
|
@ -180,9 +186,20 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
|
|||
|
||||
// Fold post-tx statedb counts and reads into the aggregate. postTxState is
|
||||
// local and would otherwise be discarded; this captures system-contract
|
||||
// reads (withdrawal queue, consolidation queue) and engine.Finalize.
|
||||
// activity (withdrawal queue, consolidation queue) and engine.Finalize.
|
||||
aggCounts.Add(postTxState.SnapshotCounts())
|
||||
aggReads.Add(postTxState.SnapshotReads())
|
||||
for addr, l := range postTxState.SnapshotCodeLoads() {
|
||||
if _, ok := aggCodeLoads[addr]; !ok {
|
||||
aggCodeLoads[addr] = l
|
||||
}
|
||||
}
|
||||
|
||||
codeLoaded := len(aggCodeLoads)
|
||||
var codeLoadBytes int
|
||||
for _, l := range aggCodeLoads {
|
||||
codeLoadBytes += l
|
||||
}
|
||||
|
||||
return &ProcessResultWithMetrics{
|
||||
ProcessResult: &ProcessResult{
|
||||
|
|
@ -195,6 +212,8 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
|
|||
ExecTime: tExec,
|
||||
Counts: aggCounts,
|
||||
Reads: aggReads,
|
||||
CodeLoaded: codeLoaded,
|
||||
CodeLoadBytes: codeLoadBytes,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -216,11 +235,14 @@ type txExecResult struct {
|
|||
// resultHandler.
|
||||
counts state.StateCounts
|
||||
reads state.ReadDurations
|
||||
// codeLoads is addr→codeLen for contracts whose code body was fetched
|
||||
// in this tx. Deduped across all phases in resultHandler.
|
||||
codeLoads map[common.Address]int
|
||||
}
|
||||
|
||||
// resultHandler polls until all transactions have finished executing and the
|
||||
// state root calculation is complete. The result is emitted on resCh.
|
||||
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, preCounts state.StateCounts, preReads state.ReadDurations, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
|
||||
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses bal.StateAccesses, preCounts state.StateCounts, preReads state.ReadDurations, preCodeLoads map[common.Address]int, statedb *state.StateDB, prefetchReader state.Reader, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
|
||||
// 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd
|
||||
// 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL)
|
||||
var results []txExecResult
|
||||
|
|
@ -233,6 +255,13 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses
|
|||
accesses := preTxAccesses
|
||||
aggCounts := preCounts
|
||||
aggReads := preReads
|
||||
// Dedup'd map of contract addresses whose code body was fetched by any
|
||||
// phase StateDB. Address-keyed so multiple phases adding the same contract
|
||||
// only count it once.
|
||||
aggCodeLoads := make(map[common.Address]int)
|
||||
for addr, l := range preCodeLoads {
|
||||
aggCodeLoads[addr] = l
|
||||
}
|
||||
|
||||
if len(block.Transactions()) > 0 {
|
||||
loop:
|
||||
|
|
@ -252,6 +281,11 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses
|
|||
accesses.Merge(res.stateReads)
|
||||
aggCounts.Add(res.counts)
|
||||
aggReads.Add(res.reads)
|
||||
for addr, l := range res.codeLoads {
|
||||
if _, ok := aggCodeLoads[addr]; !ok {
|
||||
aggCodeLoads[addr] = l
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if numTxComplete == len(block.Transactions()) {
|
||||
|
|
@ -268,7 +302,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxAccesses
|
|||
}
|
||||
}
|
||||
|
||||
execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggReads)
|
||||
execResults := p.prepareExecResult(block, tExecStart, accesses, statedb, prefetchReader, results, aggCounts, aggReads, aggCodeLoads)
|
||||
rootCalcRes := <-stateRootCalcResCh
|
||||
|
||||
if execResults.ProcessResult.Error != nil {
|
||||
|
|
@ -344,10 +378,11 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio
|
|||
stateReads: db.Reader().(state.StateReaderTracker).GetStateAccessList(),
|
||||
counts: db.SnapshotCounts(),
|
||||
reads: db.SnapshotReads(),
|
||||
codeLoads: db.SnapshotCodeLoads(),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, state.ReadDurations, error) {
|
||||
func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, prefetchReader state.Reader, cfg vm.Config) (bal.StateAccesses, state.StateCounts, state.ReadDurations, map[common.Address]int, error) {
|
||||
var (
|
||||
header = block.Header()
|
||||
)
|
||||
|
|
@ -369,12 +404,12 @@ func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *
|
|||
mutations.Merge(pbhMutations)
|
||||
reads := readerWithTracker.(state.StateReaderTracker).GetStateAccessList()
|
||||
if !accessList.MutationsAt(0).Eq(mutations) {
|
||||
return nil, state.StateCounts{}, state.ReadDurations{}, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0")
|
||||
return nil, state.StateCounts{}, state.ReadDurations{}, nil, fmt.Errorf("invalid block access list: mismatch between local/remote access list mutations at idx 0")
|
||||
}
|
||||
// Snapshot the pre-tx statedb's counts and read-times so system-contract
|
||||
// reads/writes (BeaconRoot, ParentBlockHash) contribute to the aggregate;
|
||||
// Snapshot the pre-tx statedb's counts/reads/code-loads so system-contract
|
||||
// activity (BeaconRoot, ParentBlockHash) contributes to the aggregate;
|
||||
// sdb is local and would otherwise be discarded.
|
||||
return reads, sdb.SnapshotCounts(), sdb.SnapshotReads(), nil
|
||||
return reads, sdb.SnapshotCounts(), sdb.SnapshotReads(), sdb.SnapshotCodeLoads(), nil
|
||||
}
|
||||
|
||||
// Process performs EVM execution and state root computation for a block which is known
|
||||
|
|
@ -394,7 +429,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
|
|||
)
|
||||
|
||||
startingState := statedb.Copy()
|
||||
preTxReads, preCounts, preReads, err := p.processBlockPreTx(block, statedb, balReader, cfg)
|
||||
preTxReads, preCounts, preReads, preCodeLoads, err := p.processBlockPreTx(block, statedb, balReader, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -404,7 +439,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *st
|
|||
|
||||
// execute transactions and state root calculation in parallel
|
||||
tExecStart = time.Now()
|
||||
go p.resultHandler(block, preTxReads, preCounts, preReads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh)
|
||||
go p.resultHandler(block, preTxReads, preCounts, preReads, preCodeLoads, statedb, balReader, tExecStart, txResCh, rootCalcResultCh, resCh)
|
||||
var workers errgroup.Group
|
||||
workers.SetLimit(runtime.NumCPU())
|
||||
for i, t := range block.Transactions() {
|
||||
|
|
|
|||
|
|
@ -251,6 +251,25 @@ func (s *StateDB) SnapshotReads() ReadDurations {
|
|||
}
|
||||
}
|
||||
|
||||
// SnapshotCodeLoads returns the addresses whose contract code body was
|
||||
// fetched during this StateDB's lifetime, mapped to byte length. Used by the
|
||||
// BAL parallel pipeline to deduplicate code-load events across phase StateDBs.
|
||||
func (s *StateDB) SnapshotCodeLoads() map[common.Address]int {
|
||||
if len(s.stateObjects) == 0 {
|
||||
return nil
|
||||
}
|
||||
var m map[common.Address]int
|
||||
for addr, obj := range s.stateObjects {
|
||||
if l := len(obj.code); l > 0 {
|
||||
if m == nil {
|
||||
m = make(map[common.Address]int)
|
||||
}
|
||||
m[addr] = l
|
||||
}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
|
||||
// state trie concurrently while the state is mutated so that when we reach the
|
||||
// commit phase, most of the needed data is already hot.
|
||||
|
|
|
|||
Loading…
Reference in a new issue