core: clean up parallal state processor

This commit is contained in:
MariusVanDerWijden 2026-02-05 14:55:54 +01:00
parent 97d384f1bb
commit 1d942bddf3

View file

@ -46,7 +46,7 @@ func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) Parallel
// performs post-tx state transition (system contracts and withdrawals)
// and calculates the ProcessResult, returning it to be sent on resCh
// by resultHandler
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateReads *bal.StateAccesses, tExecStart time.Time, postTxState *state.StateDB, receipts types.Receipts) *ProcessResultWithMetrics {
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateReads *bal.StateAccesses, tExecStart time.Time, postTxState *state.StateDB, results []txExecResult) *ProcessResultWithMetrics {
tExec := time.Since(tExecStart)
var requests [][]byte
tPostprocessStart := time.Now()
@ -73,21 +73,32 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateR
// 1. order the receipts by tx index
// 2. correctly calculate the cumulative gas used per receipt, returning bad block error if it goes over the allowed
slices.SortFunc(receipts, func(a, b *types.Receipt) int {
return cmp.Compare(a.TransactionIndex, b.TransactionIndex)
slices.SortFunc(results, func(a, b txExecResult) int {
return cmp.Compare(a.receipt.TransactionIndex, b.receipt.TransactionIndex)
})
var cumulativeGasUsed uint64
var (
// We are maintaining two counters here:
// one counts all the cumulativeGas which includes refunds
// while the other counts only the usedGas which excludes refunds after Amsterdam
// We need the cumulativeGas for receipts and the usedGas for the block gas limit
cumulativeGas = uint64(0)
usedGas = uint64(0)
)
var allLogs []*types.Log
for _, receipt := range receipts {
receipt.CumulativeGasUsed = cumulativeGasUsed + receipt.GasUsed
cumulativeGasUsed += receipt.GasUsed
if receipt.CumulativeGasUsed > header.GasLimit {
var allReceipts []*types.Receipt
for _, result := range results {
cumulativeGas += result.cumulativeGas
usedGas += result.usedGas
result.receipt.CumulativeGasUsed = cumulativeGas
if result.receipt.CumulativeGasUsed > header.GasLimit {
return &ProcessResultWithMetrics{
ProcessResult: &ProcessResult{Error: fmt.Errorf("gas limit exceeded")},
}
}
allLogs = append(allLogs, receipt.Logs...)
allLogs = append(allLogs, result.receipt.Logs...)
allReceipts = append(allReceipts, result.receipt)
}
// Read requests if Prague is enabled.
@ -145,10 +156,10 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateR
return &ProcessResultWithMetrics{
ProcessResult: &ProcessResult{
Receipts: receipts,
Receipts: allReceipts,
Requests: requests,
Logs: allLogs,
GasUsed: cumulativeGasUsed,
GasUsed: usedGas,
},
PostProcessTime: tPostprocess,
ExecTime: tExec,
@ -156,9 +167,11 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateR
}
type txExecResult struct {
idx int // transaction index
receipt *types.Receipt
err error // non-EVM error which would render the block invalid
idx int // transaction index
receipt *types.Receipt
err error // non-EVM error which would render the block invalid
cumulativeGas uint64
usedGas uint64
stateReads bal.StateAccesses
}
@ -168,7 +181,7 @@ type txExecResult struct {
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateReads bal.StateAccesses, postTxState *state.StateDB, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
// 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd
// 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL)
var receipts []*types.Receipt
var results []txExecResult
gp := new(GasPool)
gp.SetGas(block.GasLimit())
var execErr error
@ -188,7 +201,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateRea
if err := gp.SubGas(res.receipt.GasUsed); err != nil {
execErr = err
} else {
receipts = append(receipts, res.receipt)
results = append(results, res)
allReads.Merge(res.stateReads)
}
}
@ -206,7 +219,7 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateRea
}
}
execResults := p.prepareExecResult(block, &allReads, tExecStart, postTxState, receipts)
execResults := p.prepareExecResult(block, &allReads, tExecStart, postTxState, results)
rootCalcRes := <-stateRootCalcResCh
if execResults.ProcessResult.Error != nil {
@ -271,7 +284,7 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio
gp := new(GasPool)
gp.SetGas(block.GasLimit())
db.SetTxContext(tx.Hash(), txIdx)
receipt, _, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, 0, evm)
receipt, cumulativeGas, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, 0, evm)
if err != nil {
err := fmt.Errorf("could not apply tx %d [%v]: %w", txIdx, tx.Hash().Hex(), err)
return &txExecResult{err: err}
@ -283,9 +296,11 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio
}
return &txExecResult{
idx: txIdx,
receipt: receipt,
stateReads: accesses,
idx: txIdx,
receipt: receipt,
stateReads: accesses,
cumulativeGas: cumulativeGas,
usedGas: receipt.GasUsed,
}
}