diff --git a/core/blockchain.go b/core/blockchain.go index a81d435fea..d8cdd237b7 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -439,7 +439,7 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, bc.validator = NewBlockValidator(chainConfig, bc) bc.prefetcher = newStatePrefetcher(chainConfig, bc.hc) bc.processor = NewStateProcessor(bc.hc) - bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc.GetVMConfig()) + bc.parallelProcessor = *NewParallelStateProcessor(bc.hc, bc.GetVMConfig()) genesisHeader := bc.GetHeaderByNumber(0) if genesisHeader == nil { diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index be2af36dbf..4c4d09b747 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -24,6 +24,11 @@ type ProcessResultWithMetrics struct { PostProcessTime time.Duration } +// errResult wraps an error into a new ProcessResultWithMetrics instance +func errResult(err error) *ProcessResultWithMetrics { + return &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: err}} +} + // ParallelStateProcessor is used to execute and verify blocks containing // access lists. type ParallelStateProcessor struct { @@ -32,11 +37,23 @@ type ParallelStateProcessor struct { } // NewParallelStateProcessor returns a new ParallelStateProcessor instance. -func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) ParallelStateProcessor { - res := NewStateProcessor(chain) - return ParallelStateProcessor{ - res, - vmConfig, +func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) *ParallelStateProcessor { + return &ParallelStateProcessor{ + StateProcessor: NewStateProcessor(chain), + vmCfg: vmConfig, + } +} + +// execVMConfig returns the subset of the configured VM options that is safe to +// reuse across the parallel per-transaction and post-transaction executions. +// Only the fields explicitly copied here are propagated (mirroring the original +// per-tx behaviour); notably the full caller-supplied config is used only for +// pre-execution in processBlockPreTx. +func (p *ParallelStateProcessor) execVMConfig() vm.Config { + return vm.Config{ + NoBaseFee: p.vmCfg.NoBaseFee, + EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, + ExtraEips: slices.Clone(p.vmCfg.ExtraEips), } } @@ -44,21 +61,17 @@ func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) Parallel // performs post-tx state transition (system contracts and withdrawals) // and calculates the ProcessResult, returning it to be sent on resCh // by resultHandler -func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, preTxBal *bal.ConstructionBlockAccessList, prepared *bal.AccessListReader, statedb *state.StateDB, results []txExecResult) *ProcessResultWithMetrics { +func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, preTxBAL *bal.ConstructionBlockAccessList, accessList *bal.AccessListReader, statedb *state.StateDB, results []txExecResult) *ProcessResultWithMetrics { tExec := time.Since(tExecStart) tPostprocessStart := time.Now() header := block.Header() - vmContext := NewEVMBlockContext(header, p.chain, nil) + // The post-execution changes are recorded at the BAL index immediately + // following the last transaction. lastBALIdx := len(block.Transactions()) + 1 - postTxState := statedb.WithReader(state.NewReaderWithPreparedAccessList(statedb.Reader(), prepared, lastBALIdx)) + postTxState := statedb.WithReader(state.NewReaderWithAccessList(statedb.Reader(), accessList, lastBALIdx)) - cfg := vm.Config{ - NoBaseFee: p.vmCfg.NoBaseFee, - EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, - ExtraEips: slices.Clone(p.vmCfg.ExtraEips), - } - evm := vm.NewEVM(vmContext, postTxState, p.chainConfig(), cfg) + evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), postTxState, p.chainConfig(), p.execVMConfig()) // 1. order the receipts by tx index // 2. correctly calculate the cumulative gas used per receipt, returning bad block error if it goes over the allowed @@ -71,10 +84,10 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar sumRegular uint64 sumState uint64 cumulativeReceipt uint64 // cumulative receipt gas (what users pay) - ) - var allLogs []*types.Log - var allReceipts []*types.Receipt + allLogs []*types.Log + allReceipts []*types.Receipt + ) for _, result := range results { sumRegular += result.txRegular sumState += result.txState @@ -87,24 +100,19 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar // Block gas = max(sum_regular, sum_state) per EIP-8037. blockGasUsed := max(sumRegular, sumState) if blockGasUsed > header.GasLimit { - return &ProcessResultWithMetrics{ - ProcessResult: &ProcessResult{Error: fmt.Errorf("gas limit exceeded")}, - } + return errResult(fmt.Errorf("gas limit exceeded")) } - requests, postBal, err := PostExecution(context.Background(), p.chainConfig(), block.Number(), block.Time(), allLogs, evm, uint32(len(block.Transactions())+1)) + requests, postBAL, err := PostExecution(context.Background(), p.chainConfig(), block.Number(), block.Time(), allLogs, evm, uint32(lastBALIdx)) if err != nil { - return &ProcessResultWithMetrics{ - ProcessResult: &ProcessResult{Error: err}, - } + return errResult(err) } - p.chain.Engine().Finalize(p.chain, block.Header(), evm.StateDB, block.Body(), uint32(len(block.Transactions()))+1, postBal) + p.chain.Engine().Finalize(p.chain, block.Header(), evm.StateDB, block.Body(), uint32(lastBALIdx), postBAL) blockAccessList := bal.NewConstructionBlockAccessList() - blockAccessList.Merge(preTxBal) - blockAccessList.Merge(postBal) - + blockAccessList.Merge(preTxBAL) + blockAccessList.Merge(postBAL) for _, res := range results { blockAccessList.Merge(res.blockAccessList) } @@ -112,9 +120,7 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar // TODO: do we move validation to ValidateState? if block.AccessList().Hash() != blockAccessList.ToEncodingObj().Hash() { // TODO: expose json string method on encoding block access list and log it here - return &ProcessResultWithMetrics{ - ProcessResult: &ProcessResult{Error: fmt.Errorf("invalid block access list: mismatch between local and remote block access list")}, - } + return errResult(fmt.Errorf("invalid block access list: mismatch between local and remote block access list")) } tPostprocess := time.Since(tPostprocessStart) @@ -133,11 +139,9 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar } type txExecResult struct { - idx int // transaction index - receipt *types.Receipt - err error // non-EVM error which would render the block invalid - blockGas uint64 - execGas uint64 + receipt *types.Receipt + err error // non-EVM error which would render the block invalid + execGas uint64 // gas reported on the receipt (what the user pays) // Per-tx dimensional gas for Amsterdam 2D gas accounting (EIP-8037). txRegular uint64 @@ -151,38 +155,40 @@ type txExecResult struct { func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxBAL *bal.ConstructionBlockAccessList, prepared *bal.AccessListReader, statedb *state.StateDB, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) - var results []txExecResult - var cumulativeStateGas, cumulativeRegularGas uint64 - var execErr error - var numTxComplete int + var ( + results []txExecResult + cumulativeStateGas uint64 + cumulativeRegularGas uint64 + execErr error + ) - if len(block.Transactions()) > 0 { - loop: - for { - select { - case res := <-txResCh: - numTxComplete++ - if execErr == nil { - // short-circuit if invalid block was detected - if res.err != nil { - execErr = res.err - } else if bottleneck := max(cumulativeRegularGas+res.txRegular, cumulativeStateGas+res.txState); bottleneck > block.GasLimit() { - execErr = fmt.Errorf("block used too much gas in bottleneck dimension: %d. block gas limit is %d", bottleneck, block.GasLimit()) - } else { - cumulativeStateGas += res.txState - results = append(results, res) - } - } - if numTxComplete == len(block.Transactions()) { - break loop + if numTx := len(block.Transactions()); numTx > 0 { + for completed := 0; completed < numTx; completed++ { + res := <-txResCh + if execErr != nil { + // A block-invalidating result was already seen; keep draining so + // the worker goroutines don't block on their sends. + continue + } + switch { + case res.err != nil: + execErr = res.err + default: + bottleneck := max(cumulativeRegularGas+res.txRegular, cumulativeStateGas+res.txState) + if bottleneck > block.GasLimit() { + execErr = fmt.Errorf("block used too much gas in bottleneck dimension: %d. block gas limit is %d", bottleneck, block.GasLimit()) + continue } + cumulativeRegularGas += res.txRegular + cumulativeStateGas += res.txState + results = append(results, res) } } if execErr != nil { - // Drain stateRootCalcResCh so calcAndVerifyRoot goroutine can exit. + // Drain stateRootCalcResCh so the calcAndVerifyRoot goroutine can exit. <-stateRootCalcResCh - resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: execErr}} + resCh <- errResult(execErr) return } } @@ -190,11 +196,12 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxBAL *bal execResults := p.prepareExecResult(block, tExecStart, preTxBAL, prepared, statedb, results) rootCalcRes := <-stateRootCalcResCh - if execResults.ProcessResult.Error != nil { + switch { + case execResults.ProcessResult.Error != nil: resCh <- execResults - } else if rootCalcRes.err != nil { - resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: rootCalcRes.err}} - } else { + case rootCalcRes.err != nil: + resCh <- errResult(rootCalcRes.err) + default: execResults.StateTransitionMetrics = rootCalcRes.metrics resCh <- execResults } @@ -213,107 +220,88 @@ func (p *ParallelStateProcessor) calcAndVerifyRoot(block *types.Block, stateTran res := stateRootCalculationResult{ metrics: stateTransition.Metrics(), } - if root != block.Root() { res.err = fmt.Errorf("state root mismatch. local: %x. remote: %x", root, block.Root()) } resCh <- res } -// execTx executes single transaction returning a result which includes state accessed/modified +// execTx executes a single transaction returning a result which includes state accessed/modified. func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transaction, balIdx int, db *state.StateDB, signer types.Signer) *txExecResult { header := block.Header() - context := NewEVMBlockContext(header, p.chain, nil) - - cfg := vm.Config{ - NoBaseFee: p.vmCfg.NoBaseFee, - EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, - ExtraEips: slices.Clone(p.vmCfg.ExtraEips), - } - evm := vm.NewEVM(context, db, p.chainConfig(), cfg) + evmContext := NewEVMBlockContext(header, p.chain, nil) + evm := vm.NewEVM(evmContext, db, p.chainConfig(), p.execVMConfig()) msg, err := TransactionToMessage(tx, signer, header.BaseFee) if err != nil { - err = fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err) - return &txExecResult{err: err} + return &txExecResult{err: fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err)} } - gp := NewGasPool(block.GasLimit()) sender, err := signer.Sender(tx) if err != nil { - // TODO: can this even happen at this stage? - err = fmt.Errorf("could not recover sender for tx at bal idx %d: %v\n", balIdx, err) + return &txExecResult{err: fmt.Errorf("could not recover sender for tx at bal idx %d: %w", balIdx, err)} } + + gp := NewGasPool(block.GasLimit()) // TODO: make precompiled addresses be resolvable from chain config + block db.Prepare(evm.GetRules(), sender, block.Coinbase(), tx.To(), vm.PrecompiledAddressesCancun, tx.AccessList()) - db.SetTxContext(tx.Hash(), balIdx-1, uint32(balIdx)) - receipt, txBAL, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, evm) + receipt, txBAL, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), evmContext.Time, tx, evm) if err != nil { - err := fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err) - return &txExecResult{err: err} + return &txExecResult{err: fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err)} } return &txExecResult{ - idx: balIdx, receipt: receipt, execGas: receipt.GasUsed, - blockGas: gp.Used(), txRegular: gp.cumulativeRegular, txState: gp.cumulativeState, blockAccessList: txBAL, } } -func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*bal.ConstructionBlockAccessList, error) { - var ( - header = block.Header() - ) - vmContext := NewEVMBlockContext(header, p.chain, nil) - evm := vm.NewEVM(vmContext, statedb, p.chainConfig(), cfg) - - accessList := PreExecution(context.Background(), block.BeaconRoot(), block.ParentHash(), p.chainConfig(), evm, block.Number(), block.Time()) - return accessList, nil +func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, cfg vm.Config) *bal.ConstructionBlockAccessList { + header := block.Header() + evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), statedb, p.chainConfig(), cfg) + return PreExecution(context.Background(), block.BeaconRoot(), block.ParentHash(), p.chainConfig(), evm, block.Number(), block.Time()) } // Process performs EVM execution and state root computation for a block which is known // to contain an access list. func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *state.BALStateTransition, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) { + header := block.Header() + signer := types.MakeSigner(p.chainConfig(), header.Number, header.Time) + var ( - header = block.Header() resCh = make(chan *ProcessResultWithMetrics) - signer = types.MakeSigner(p.chainConfig(), header.Number, header.Time) rootCalcResultCh = make(chan stateRootCalculationResult) txResCh = make(chan txExecResult) - - pStart = time.Now() - tExecStart time.Time - tPreprocess time.Duration // time to create a set of prestates for parallel transaction execution ) + // Pre-transaction processing: system-contract updates and the pre-tx BAL. + pStart := time.Now() startingState := statedb.Copy() prepared := stateTransition.PreparedAccessList() - preTxBal, err := p.processBlockPreTx(block, statedb, cfg) - if err != nil { - return nil, err - } + preTxBAL := p.processBlockPreTx(block, statedb, cfg) + tPreprocess := time.Since(pStart) - // compute the reads/mutations at the last bal index - tPreprocess = time.Since(pStart) + // Execute transactions and the state-root calculation in parallel. + tExecStart := time.Now() + go p.resultHandler(block, preTxBAL, prepared, statedb, tExecStart, txResCh, rootCalcResultCh, resCh) - // execute transactions and state root calculation in parallel - tExecStart = time.Now() - go p.resultHandler(block, preTxBal, prepared, statedb, tExecStart, txResCh, rootCalcResultCh, resCh) + // Workers execute transactions concurrently against per-tx state copies. + // Each worker reports completion (and any block-invalidating error) on + // txResCh, which resultHandler drains. Worker errors therefore flow through + // the channel rather than the errgroup, so the group is used purely to bound + // concurrency and Wait() is intentionally not called. var workers errgroup.Group workers.SetLimit(runtime.NumCPU()) - for i, t := range block.Transactions() { - tx := t - idx := i - sdb := startingState.Copy() + for i, tx := range block.Transactions() { + balIdx := i + 1 + prestate := startingState.Copy() workers.Go(func() error { - startingState := sdb.WithReader(state.NewReaderWithPreparedAccessList(statedb.Reader(), prepared, idx+1)) - res := p.execTx(block, tx, idx+1, startingState, signer) - txResCh <- *res + prestate = prestate.WithReader(state.NewReaderWithAccessList(statedb.Reader(), prepared, balIdx)) + txResCh <- *p.execTx(block, tx, balIdx, prestate, signer) return nil }) } diff --git a/core/state/reader_eip_7928.go b/core/state/reader_eip_7928.go index b89636870d..72d9672b19 100644 --- a/core/state/reader_eip_7928.go +++ b/core/state/reader_eip_7928.go @@ -220,10 +220,10 @@ type ReaderWithBlockLevelAccessList struct { TxIndex int } -// NewReaderWithPreparedAccessList wraps a base reader with a shared, already +// NewReaderWithAccessList wraps a base reader with a shared, already // preprocessed access list. This is the cheap constructor used on the hot path: // the prepared list is built once per block and borrowed by every per-tx reader. -func NewReaderWithPreparedAccessList(base Reader, prepared *bal.AccessListReader, txIndex int) *ReaderWithBlockLevelAccessList { +func NewReaderWithAccessList(base Reader, prepared *bal.AccessListReader, txIndex int) *ReaderWithBlockLevelAccessList { return &ReaderWithBlockLevelAccessList{ Reader: base, prepared: prepared, @@ -232,10 +232,10 @@ func NewReaderWithPreparedAccessList(base Reader, prepared *bal.AccessListReader } // NewReaderWithBlockLevelAccessList wraps a base reader with a raw access list, -// preprocessing it on the spot. Prefer NewReaderWithPreparedAccessList when the +// preprocessing it on the spot. Prefer NewReaderWithAccessList when the // prepared list can be built once and shared across multiple readers. func NewReaderWithBlockLevelAccessList(base Reader, accessList bal.BlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList { - return NewReaderWithPreparedAccessList(base, bal.NewAccessListReader(accessList), txIndex) + return NewReaderWithAccessList(base, bal.NewAccessListReader(accessList), txIndex) } // Account implements Reader, returning the account with the specific address.