From 1d942bddf319fe10cb675025b45a4f8a3fb4a1a2 Mon Sep 17 00:00:00 2001 From: MariusVanDerWijden Date: Thu, 5 Feb 2026 14:55:54 +0100 Subject: [PATCH] core: clean up parallal state processor --- core/parallel_state_processor.go | 57 ++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index a1c07bfd20..15e81f09e4 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -46,7 +46,7 @@ func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) Parallel // performs post-tx state transition (system contracts and withdrawals) // and calculates the ProcessResult, returning it to be sent on resCh // by resultHandler -func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateReads *bal.StateAccesses, tExecStart time.Time, postTxState *state.StateDB, receipts types.Receipts) *ProcessResultWithMetrics { +func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateReads *bal.StateAccesses, tExecStart time.Time, postTxState *state.StateDB, results []txExecResult) *ProcessResultWithMetrics { tExec := time.Since(tExecStart) var requests [][]byte tPostprocessStart := time.Now() @@ -73,21 +73,32 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateR // 1. order the receipts by tx index // 2. correctly calculate the cumulative gas used per receipt, returning bad block error if it goes over the allowed - slices.SortFunc(receipts, func(a, b *types.Receipt) int { - return cmp.Compare(a.TransactionIndex, b.TransactionIndex) + slices.SortFunc(results, func(a, b txExecResult) int { + return cmp.Compare(a.receipt.TransactionIndex, b.receipt.TransactionIndex) }) - var cumulativeGasUsed uint64 + var ( + // We are maintaining two counters here: + // one counts all the cumulativeGas which includes refunds + // while the other counts only the usedGas which excludes refunds after Amsterdam + // We need the cumulativeGas for receipts and the usedGas for the block gas limit + cumulativeGas = uint64(0) + usedGas = uint64(0) + ) + var allLogs []*types.Log - for _, receipt := range receipts { - receipt.CumulativeGasUsed = cumulativeGasUsed + receipt.GasUsed - cumulativeGasUsed += receipt.GasUsed - if receipt.CumulativeGasUsed > header.GasLimit { + var allReceipts []*types.Receipt + for _, result := range results { + cumulativeGas += result.cumulativeGas + usedGas += result.usedGas + result.receipt.CumulativeGasUsed = cumulativeGas + if result.receipt.CumulativeGasUsed > header.GasLimit { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: fmt.Errorf("gas limit exceeded")}, } } - allLogs = append(allLogs, receipt.Logs...) + allLogs = append(allLogs, result.receipt.Logs...) + allReceipts = append(allReceipts, result.receipt) } // Read requests if Prague is enabled. @@ -145,10 +156,10 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateR return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{ - Receipts: receipts, + Receipts: allReceipts, Requests: requests, Logs: allLogs, - GasUsed: cumulativeGasUsed, + GasUsed: usedGas, }, PostProcessTime: tPostprocess, ExecTime: tExec, @@ -156,9 +167,11 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateR } type txExecResult struct { - idx int // transaction index - receipt *types.Receipt - err error // non-EVM error which would render the block invalid + idx int // transaction index + receipt *types.Receipt + err error // non-EVM error which would render the block invalid + cumulativeGas uint64 + usedGas uint64 stateReads bal.StateAccesses } @@ -168,7 +181,7 @@ type txExecResult struct { func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateReads bal.StateAccesses, postTxState *state.StateDB, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) - var receipts []*types.Receipt + var results []txExecResult gp := new(GasPool) gp.SetGas(block.GasLimit()) var execErr error @@ -188,7 +201,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateRea if err := gp.SubGas(res.receipt.GasUsed); err != nil { execErr = err } else { - receipts = append(receipts, res.receipt) + results = append(results, res) allReads.Merge(res.stateReads) } } @@ -206,7 +219,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateRea } } - execResults := p.prepareExecResult(block, &allReads, tExecStart, postTxState, receipts) + execResults := p.prepareExecResult(block, &allReads, tExecStart, postTxState, results) rootCalcRes := <-stateRootCalcResCh if execResults.ProcessResult.Error != nil { @@ -271,7 +284,7 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio gp := new(GasPool) gp.SetGas(block.GasLimit()) db.SetTxContext(tx.Hash(), txIdx) - receipt, _, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, 0, evm) + receipt, cumulativeGas, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, 0, evm) if err != nil { err := fmt.Errorf("could not apply tx %d [%v]: %w", txIdx, tx.Hash().Hex(), err) return &txExecResult{err: err} @@ -283,9 +296,11 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio } return &txExecResult{ - idx: txIdx, - receipt: receipt, - stateReads: accesses, + idx: txIdx, + receipt: receipt, + stateReads: accesses, + cumulativeGas: cumulativeGas, + usedGas: receipt.GasUsed, } }