package core import ( "cmp" "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types/bal" "github.com/ethereum/go-ethereum/core/vm" "golang.org/x/sync/errgroup" "slices" "time" ) // ProcessResultWithMetrics wraps ProcessResult with some metrics that are // emitted when executing blocks containing access lists. type ProcessResultWithMetrics struct { ProcessResult *ProcessResult PreProcessTime time.Duration StateTransitionMetrics *state.BALStateTransitionMetrics // the time it took to execute all txs in the block ExecTime time.Duration PostProcessTime time.Duration } // ParallelStateProcessor is used to execute and verify blocks containing // access lists. type ParallelStateProcessor struct { *StateProcessor vmCfg *vm.Config } // NewParallelStateProcessor returns a new ParallelStateProcessor instance. func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) ParallelStateProcessor { res := NewStateProcessor(chain) return ParallelStateProcessor{ res, vmConfig, } } // called by resultHandler when all transactions have successfully executed. // performs post-tx state transition (system contracts and withdrawals) // and calculates the ProcessResult, returning it to be sent on resCh // by resultHandler func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, allStateReads *bal.StateAccesses, tExecStart time.Time, postTxState *state.StateDB, receipts types.Receipts) *ProcessResultWithMetrics { tExec := time.Since(tExecStart) var requests [][]byte tPostprocessStart := time.Now() header := block.Header() balTracer, hooks := NewBlockAccessListTracer() tracingStateDB := state.NewHookedState(postTxState, hooks) context := NewEVMBlockContext(header, p.chain, nil) postTxState.SetAccessListIndex(len(block.Transactions()) + 1) cfg := vm.Config{ Tracer: hooks, NoBaseFee: p.vmCfg.NoBaseFee, EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, ExtraEips: slices.Clone(p.vmCfg.ExtraEips), StatelessSelfValidation: p.vmCfg.StatelessSelfValidation, EnableWitnessStats: p.vmCfg.EnableWitnessStats, } cfg.Tracer = hooks evm := vm.NewEVM(context, tracingStateDB, p.chainConfig(), cfg) // 1. order the receipts by tx index // 2. correctly calculate the cumulative gas used per receipt, returning bad block error if it goes over the allowed slices.SortFunc(receipts, func(a, b *types.Receipt) int { return cmp.Compare(a.TransactionIndex, b.TransactionIndex) }) var cumulativeGasUsed uint64 var allLogs []*types.Log for _, receipt := range receipts { receipt.CumulativeGasUsed = cumulativeGasUsed + receipt.GasUsed cumulativeGasUsed += receipt.GasUsed if receipt.CumulativeGasUsed > header.GasLimit { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: fmt.Errorf("gas limit exceeded")}, } } allLogs = append(allLogs, receipt.Logs...) } // Read requests if Prague is enabled. if p.chainConfig().IsPrague(block.Number(), block.Time()) { requests = [][]byte{} // EIP-6110 if err := ParseDepositLogs(&requests, allLogs, p.chainConfig()); err != nil { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: err}, } } // EIP-7002 err := ProcessWithdrawalQueue(&requests, evm) if err != nil { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: err}, } } // EIP-7251 err = ProcessConsolidationQueue(&requests, evm) if err != nil { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: err}, } } } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) p.chain.Engine().Finalize(p.chain, header, tracingStateDB, block.Body()) // invoke FinaliseIdxChanges so that withdrawals are accounted for in the state diff postTxState.Finalise(true) balTracer.OnBlockFinalization() diff, stateReads := balTracer.builder.FinalizedIdxChanges() allStateReads.Merge(stateReads) balIdx := len(block.Transactions()) + 1 if err := postTxState.BlockAccessList().ValidateStateDiff(balIdx, diff); err != nil { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: err}, } } if err := postTxState.BlockAccessList().ValidateStateReads(*allStateReads); err != nil { return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{Error: err}, } } tPostprocess := time.Since(tPostprocessStart) return &ProcessResultWithMetrics{ ProcessResult: &ProcessResult{ Receipts: receipts, Requests: requests, Logs: allLogs, GasUsed: cumulativeGasUsed, }, PostProcessTime: tPostprocess, ExecTime: tExec, } } type txExecResult struct { idx int // transaction index receipt *types.Receipt err error // non-EVM error which would render the block invalid stateReads bal.StateAccesses } // resultHandler polls until all transactions have finished executing and the // state root calculation is complete. The result is emitted on resCh. func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateReads bal.StateAccesses, postTxState *state.StateDB, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) var receipts []*types.Receipt gp := new(GasPool) gp.SetGas(block.GasLimit()) var execErr error var numTxComplete int allReads := make(bal.StateAccesses) allReads.Merge(preTxStateReads) if len(block.Transactions()) > 0 { loop: for { select { case res := <-txResCh: if execErr == nil { if res.err != nil { execErr = res.err } else { if err := gp.SubGas(res.receipt.GasUsed); err != nil { execErr = err } else { receipts = append(receipts, res.receipt) allReads.Merge(res.stateReads) } } } numTxComplete++ if numTxComplete == len(block.Transactions()) { break loop } } } if execErr != nil { resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: execErr}} return } } execResults := p.prepareExecResult(block, &allReads, tExecStart, postTxState, receipts) rootCalcRes := <-stateRootCalcResCh if execResults.ProcessResult.Error != nil { resCh <- execResults } else if rootCalcRes.err != nil { resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: rootCalcRes.err}} } else { // &{20.39677ms 0s 1.149668ms 735.295µs 0s 0s 0s 0s} execResults.StateTransitionMetrics = rootCalcRes.metrics resCh <- execResults } } type stateRootCalculationResult struct { err error metrics *state.BALStateTransitionMetrics root common.Hash } // calcAndVerifyRoot performs the post-state root hash calculation, verifying // it against what is reported by the block and returning a result on resCh. func (p *ParallelStateProcessor) calcAndVerifyRoot(preState *state.StateDB, block *types.Block, stateTransition *state.BALStateTransition, resCh chan stateRootCalculationResult) { // calculate and apply the block state modifications //root, prestateLoadTime, rootCalcTime := preState.BlockAccessList().StateRoot(preState) root := stateTransition.IntermediateRoot(false) res := stateRootCalculationResult{ // TODO: I think we can remove the root from this struct metrics: stateTransition.Metrics(), } // TODO: validate state root in block validator? if root != block.Root() { res.err = fmt.Errorf("state root mismatch. local: %x. remote: %x", root, block.Root()) } resCh <- res } // execTx executes single transaction returning a result which includes state accessed/modified func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transaction, txIdx int, db *state.StateDB, signer types.Signer) *txExecResult { header := block.Header() balTracer, hooks := NewBlockAccessListTracer() tracingStateDB := state.NewHookedState(db, hooks) context := NewEVMBlockContext(header, p.chain, nil) cfg := vm.Config{ Tracer: hooks, NoBaseFee: p.vmCfg.NoBaseFee, EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, ExtraEips: slices.Clone(p.vmCfg.ExtraEips), StatelessSelfValidation: p.vmCfg.StatelessSelfValidation, EnableWitnessStats: p.vmCfg.EnableWitnessStats, } cfg.Tracer = hooks evm := vm.NewEVM(context, tracingStateDB, p.chainConfig(), cfg) msg, err := TransactionToMessage(tx, signer, header.BaseFee) if err != nil { err = fmt.Errorf("could not apply tx %d [%v]: %w", txIdx, tx.Hash().Hex(), err) return &txExecResult{err: err} } gp := new(GasPool) gp.SetGas(block.GasLimit()) db.SetTxContext(tx.Hash(), txIdx) var gasUsed uint64 receipt, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, &gasUsed, evm) if err != nil { err := fmt.Errorf("could not apply tx %d [%v]: %w", txIdx, tx.Hash().Hex(), err) return &txExecResult{err: err} } diff, accesses := balTracer.builder.FinalizedIdxChanges() if err := db.BlockAccessList().ValidateStateDiff(txIdx+1, diff); err != nil { return &txExecResult{err: err} } return &txExecResult{ idx: txIdx, receipt: receipt, stateReads: accesses, } } // Process performs EVM execution and state root computation for a block which is known // to contain an access list. func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *state.BALStateTransition, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) { var ( header = block.Header() resCh = make(chan *ProcessResultWithMetrics) signer = types.MakeSigner(p.chainConfig(), header.Number, header.Time) rootCalcResultCh = make(chan stateRootCalculationResult) context vm.BlockContext txResCh = make(chan txExecResult) pStart = time.Now() tExecStart time.Time tPreprocess time.Duration // time to create a set of prestates for parallel transaction execution ) balTracer, hooks := NewBlockAccessListTracer() tracingStateDB := state.NewHookedState(statedb, hooks) // TODO: figure out exactly why we need to set the hooks on the TracingStateDB and the vm.Config cfg.Tracer = hooks context = NewEVMBlockContext(header, p.chain, nil) evm := vm.NewEVM(context, tracingStateDB, p.chainConfig(), cfg) if beaconRoot := block.BeaconRoot(); beaconRoot != nil { ProcessBeaconBlockRoot(*beaconRoot, evm) } if p.chainConfig().IsPrague(block.Number(), block.Time()) || p.chainConfig().IsVerkle(block.Number(), block.Time()) { ProcessParentBlockHash(block.ParentHash(), evm) } balTracer.OnPreTxExecutionDone() diff, stateReads := balTracer.builder.FinalizedIdxChanges() if err := statedb.BlockAccessList().ValidateStateDiff(0, diff); err != nil { return nil, err } // compute the post-tx state prestate (before applying final block system calls and eip-4895 withdrawals) // the post-tx state transition is verified by resultHandler postTxState := statedb.Copy() tPreprocess = time.Since(pStart) // execute transactions and state root calculation in parallel tExecStart = time.Now() go p.resultHandler(block, stateReads, postTxState, tExecStart, txResCh, rootCalcResultCh, resCh) var workers errgroup.Group startingState := statedb.Copy() for i, tx := range block.Transactions() { tx := tx i := i workers.Go(func() error { res := p.execTx(block, tx, i, startingState.Copy(), signer) txResCh <- *res return nil }) } go p.calcAndVerifyRoot(statedb, block, stateTransition, rootCalcResultCh) res := <-resCh if res.ProcessResult.Error != nil { return nil, res.ProcessResult.Error } // TODO: remove preprocess metric ? res.PreProcessTime = tPreprocess return res, nil }