mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
core: clean up parallel state processor (#35143)
This commit is contained in:
parent
2bf974b6e6
commit
f773cfed30
3 changed files with 110 additions and 122 deletions
|
|
@ -433,7 +433,7 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine,
|
|||
bc.validator = NewBlockValidator(chainConfig, bc)
|
||||
bc.prefetcher = newStatePrefetcher(chainConfig, bc.hc)
|
||||
bc.processor = NewStateProcessor(bc.hc)
|
||||
bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc.GetVMConfig())
|
||||
bc.parallelProcessor = *NewParallelStateProcessor(bc.hc, bc.GetVMConfig())
|
||||
|
||||
genesisHeader := bc.GetHeaderByNumber(0)
|
||||
if genesisHeader == nil {
|
||||
|
|
|
|||
|
|
@ -24,6 +24,11 @@ type ProcessResultWithMetrics struct {
|
|||
PostProcessTime time.Duration
|
||||
}
|
||||
|
||||
// errResult wraps an error into a new ProcessResultWithMetrics instance
|
||||
func errResult(err error) *ProcessResultWithMetrics {
|
||||
return &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: err}}
|
||||
}
|
||||
|
||||
// ParallelStateProcessor is used to execute and verify blocks containing
|
||||
// access lists.
|
||||
type ParallelStateProcessor struct {
|
||||
|
|
@ -32,11 +37,23 @@ type ParallelStateProcessor struct {
|
|||
}
|
||||
|
||||
// NewParallelStateProcessor returns a new ParallelStateProcessor instance.
|
||||
func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) ParallelStateProcessor {
|
||||
res := NewStateProcessor(chain)
|
||||
return ParallelStateProcessor{
|
||||
res,
|
||||
vmConfig,
|
||||
func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) *ParallelStateProcessor {
|
||||
return &ParallelStateProcessor{
|
||||
StateProcessor: NewStateProcessor(chain),
|
||||
vmCfg: vmConfig,
|
||||
}
|
||||
}
|
||||
|
||||
// execVMConfig returns the subset of the configured VM options that is safe to
|
||||
// reuse across the parallel per-transaction and post-transaction executions.
|
||||
// Only the fields explicitly copied here are propagated (mirroring the original
|
||||
// per-tx behaviour); notably the full caller-supplied config is used only for
|
||||
// pre-execution in processBlockPreTx.
|
||||
func (p *ParallelStateProcessor) execVMConfig() vm.Config {
|
||||
return vm.Config{
|
||||
NoBaseFee: p.vmCfg.NoBaseFee,
|
||||
EnablePreimageRecording: p.vmCfg.EnablePreimageRecording,
|
||||
ExtraEips: slices.Clone(p.vmCfg.ExtraEips),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -44,21 +61,17 @@ func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) Parallel
|
|||
// performs post-tx state transition (system contracts and withdrawals)
|
||||
// and calculates the ProcessResult, returning it to be sent on resCh
|
||||
// by resultHandler
|
||||
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, preTxBal *bal.ConstructionBlockAccessList, prepared *bal.AccessListReader, statedb *state.StateDB, results []txExecResult) *ProcessResultWithMetrics {
|
||||
func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, preTxBAL *bal.ConstructionBlockAccessList, accessList *bal.AccessListReader, statedb *state.StateDB, results []txExecResult) *ProcessResultWithMetrics {
|
||||
tExec := time.Since(tExecStart)
|
||||
tPostprocessStart := time.Now()
|
||||
header := block.Header()
|
||||
|
||||
vmContext := NewEVMBlockContext(header, p.chain, nil)
|
||||
// The post-execution changes are recorded at the BAL index immediately
|
||||
// following the last transaction.
|
||||
lastBALIdx := len(block.Transactions()) + 1
|
||||
postTxState := statedb.WithReader(state.NewReaderWithPreparedAccessList(statedb.Reader(), prepared, lastBALIdx))
|
||||
postTxState := statedb.WithReader(state.NewReaderWithAccessList(statedb.Reader(), accessList, lastBALIdx))
|
||||
|
||||
cfg := vm.Config{
|
||||
NoBaseFee: p.vmCfg.NoBaseFee,
|
||||
EnablePreimageRecording: p.vmCfg.EnablePreimageRecording,
|
||||
ExtraEips: slices.Clone(p.vmCfg.ExtraEips),
|
||||
}
|
||||
evm := vm.NewEVM(vmContext, postTxState, p.chainConfig(), cfg)
|
||||
evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), postTxState, p.chainConfig(), p.execVMConfig())
|
||||
|
||||
// 1. order the receipts by tx index
|
||||
// 2. correctly calculate the cumulative gas used per receipt, returning bad block error if it goes over the allowed
|
||||
|
|
@ -71,10 +84,10 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
|
|||
sumRegular uint64
|
||||
sumState uint64
|
||||
cumulativeReceipt uint64 // cumulative receipt gas (what users pay)
|
||||
)
|
||||
|
||||
var allLogs []*types.Log
|
||||
var allReceipts []*types.Receipt
|
||||
allLogs []*types.Log
|
||||
allReceipts []*types.Receipt
|
||||
)
|
||||
for _, result := range results {
|
||||
sumRegular += result.txRegular
|
||||
sumState += result.txState
|
||||
|
|
@ -87,24 +100,19 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
|
|||
// Block gas = max(sum_regular, sum_state) per EIP-8037.
|
||||
blockGasUsed := max(sumRegular, sumState)
|
||||
if blockGasUsed > header.GasLimit {
|
||||
return &ProcessResultWithMetrics{
|
||||
ProcessResult: &ProcessResult{Error: fmt.Errorf("gas limit exceeded")},
|
||||
}
|
||||
return errResult(fmt.Errorf("gas limit exceeded"))
|
||||
}
|
||||
|
||||
requests, postBal, err := PostExecution(context.Background(), p.chainConfig(), block.Number(), block.Time(), allLogs, evm, uint32(len(block.Transactions())+1))
|
||||
requests, postBAL, err := PostExecution(context.Background(), p.chainConfig(), block.Number(), block.Time(), allLogs, evm, uint32(lastBALIdx))
|
||||
if err != nil {
|
||||
return &ProcessResultWithMetrics{
|
||||
ProcessResult: &ProcessResult{Error: err},
|
||||
}
|
||||
return errResult(err)
|
||||
}
|
||||
|
||||
p.chain.Engine().Finalize(p.chain, block.Header(), evm.StateDB, block.Body(), uint32(len(block.Transactions()))+1, postBal)
|
||||
p.chain.Engine().Finalize(p.chain, block.Header(), evm.StateDB, block.Body(), uint32(lastBALIdx), postBAL)
|
||||
|
||||
blockAccessList := bal.NewConstructionBlockAccessList()
|
||||
blockAccessList.Merge(preTxBal)
|
||||
blockAccessList.Merge(postBal)
|
||||
|
||||
blockAccessList.Merge(preTxBAL)
|
||||
blockAccessList.Merge(postBAL)
|
||||
for _, res := range results {
|
||||
blockAccessList.Merge(res.blockAccessList)
|
||||
}
|
||||
|
|
@ -112,9 +120,7 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
|
|||
// TODO: do we move validation to ValidateState?
|
||||
if block.AccessList().Hash() != blockAccessList.ToEncodingObj().Hash() {
|
||||
// TODO: expose json string method on encoding block access list and log it here
|
||||
return &ProcessResultWithMetrics{
|
||||
ProcessResult: &ProcessResult{Error: fmt.Errorf("invalid block access list: mismatch between local and remote block access list")},
|
||||
}
|
||||
return errResult(fmt.Errorf("invalid block access list: mismatch between local and remote block access list"))
|
||||
}
|
||||
|
||||
tPostprocess := time.Since(tPostprocessStart)
|
||||
|
|
@ -133,11 +139,9 @@ func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStar
|
|||
}
|
||||
|
||||
type txExecResult struct {
|
||||
idx int // transaction index
|
||||
receipt *types.Receipt
|
||||
err error // non-EVM error which would render the block invalid
|
||||
blockGas uint64
|
||||
execGas uint64
|
||||
receipt *types.Receipt
|
||||
err error // non-EVM error which would render the block invalid
|
||||
execGas uint64 // gas reported on the receipt (what the user pays)
|
||||
|
||||
// Per-tx dimensional gas for Amsterdam 2D gas accounting (EIP-8037).
|
||||
txRegular uint64
|
||||
|
|
@ -151,38 +155,40 @@ type txExecResult struct {
|
|||
func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxBAL *bal.ConstructionBlockAccessList, prepared *bal.AccessListReader, statedb *state.StateDB, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) {
|
||||
// 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd
|
||||
// 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL)
|
||||
var results []txExecResult
|
||||
var cumulativeStateGas, cumulativeRegularGas uint64
|
||||
var execErr error
|
||||
var numTxComplete int
|
||||
var (
|
||||
results []txExecResult
|
||||
cumulativeStateGas uint64
|
||||
cumulativeRegularGas uint64
|
||||
execErr error
|
||||
)
|
||||
|
||||
if len(block.Transactions()) > 0 {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case res := <-txResCh:
|
||||
numTxComplete++
|
||||
if execErr == nil {
|
||||
// short-circuit if invalid block was detected
|
||||
if res.err != nil {
|
||||
execErr = res.err
|
||||
} else if bottleneck := max(cumulativeRegularGas+res.txRegular, cumulativeStateGas+res.txState); bottleneck > block.GasLimit() {
|
||||
execErr = fmt.Errorf("block used too much gas in bottleneck dimension: %d. block gas limit is %d", bottleneck, block.GasLimit())
|
||||
} else {
|
||||
cumulativeStateGas += res.txState
|
||||
results = append(results, res)
|
||||
}
|
||||
}
|
||||
if numTxComplete == len(block.Transactions()) {
|
||||
break loop
|
||||
if numTx := len(block.Transactions()); numTx > 0 {
|
||||
for completed := 0; completed < numTx; completed++ {
|
||||
res := <-txResCh
|
||||
if execErr != nil {
|
||||
// A block-invalidating result was already seen; keep draining so
|
||||
// the worker goroutines don't block on their sends.
|
||||
continue
|
||||
}
|
||||
switch {
|
||||
case res.err != nil:
|
||||
execErr = res.err
|
||||
default:
|
||||
bottleneck := max(cumulativeRegularGas+res.txRegular, cumulativeStateGas+res.txState)
|
||||
if bottleneck > block.GasLimit() {
|
||||
execErr = fmt.Errorf("block used too much gas in bottleneck dimension: %d. block gas limit is %d", bottleneck, block.GasLimit())
|
||||
continue
|
||||
}
|
||||
cumulativeRegularGas += res.txRegular
|
||||
cumulativeStateGas += res.txState
|
||||
results = append(results, res)
|
||||
}
|
||||
}
|
||||
|
||||
if execErr != nil {
|
||||
// Drain stateRootCalcResCh so calcAndVerifyRoot goroutine can exit.
|
||||
// Drain stateRootCalcResCh so the calcAndVerifyRoot goroutine can exit.
|
||||
<-stateRootCalcResCh
|
||||
resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: execErr}}
|
||||
resCh <- errResult(execErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
@ -190,11 +196,12 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxBAL *bal
|
|||
execResults := p.prepareExecResult(block, tExecStart, preTxBAL, prepared, statedb, results)
|
||||
rootCalcRes := <-stateRootCalcResCh
|
||||
|
||||
if execResults.ProcessResult.Error != nil {
|
||||
switch {
|
||||
case execResults.ProcessResult.Error != nil:
|
||||
resCh <- execResults
|
||||
} else if rootCalcRes.err != nil {
|
||||
resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: rootCalcRes.err}}
|
||||
} else {
|
||||
case rootCalcRes.err != nil:
|
||||
resCh <- errResult(rootCalcRes.err)
|
||||
default:
|
||||
execResults.StateTransitionMetrics = rootCalcRes.metrics
|
||||
resCh <- execResults
|
||||
}
|
||||
|
|
@ -213,107 +220,88 @@ func (p *ParallelStateProcessor) calcAndVerifyRoot(block *types.Block, stateTran
|
|||
res := stateRootCalculationResult{
|
||||
metrics: stateTransition.Metrics(),
|
||||
}
|
||||
|
||||
if root != block.Root() {
|
||||
res.err = fmt.Errorf("state root mismatch. local: %x. remote: %x", root, block.Root())
|
||||
}
|
||||
resCh <- res
|
||||
}
|
||||
|
||||
// execTx executes single transaction returning a result which includes state accessed/modified
|
||||
// execTx executes a single transaction returning a result which includes state accessed/modified.
|
||||
func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transaction, balIdx int, db *state.StateDB, signer types.Signer) *txExecResult {
|
||||
header := block.Header()
|
||||
context := NewEVMBlockContext(header, p.chain, nil)
|
||||
|
||||
cfg := vm.Config{
|
||||
NoBaseFee: p.vmCfg.NoBaseFee,
|
||||
EnablePreimageRecording: p.vmCfg.EnablePreimageRecording,
|
||||
ExtraEips: slices.Clone(p.vmCfg.ExtraEips),
|
||||
}
|
||||
evm := vm.NewEVM(context, db, p.chainConfig(), cfg)
|
||||
evmContext := NewEVMBlockContext(header, p.chain, nil)
|
||||
evm := vm.NewEVM(evmContext, db, p.chainConfig(), p.execVMConfig())
|
||||
|
||||
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err)
|
||||
return &txExecResult{err: err}
|
||||
return &txExecResult{err: fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err)}
|
||||
}
|
||||
gp := NewGasPool(block.GasLimit())
|
||||
sender, err := signer.Sender(tx)
|
||||
if err != nil {
|
||||
// TODO: can this even happen at this stage?
|
||||
err = fmt.Errorf("could not recover sender for tx at bal idx %d: %v\n", balIdx, err)
|
||||
return &txExecResult{err: fmt.Errorf("could not recover sender for tx at bal idx %d: %w", balIdx, err)}
|
||||
}
|
||||
|
||||
gp := NewGasPool(block.GasLimit())
|
||||
// TODO: make precompiled addresses be resolvable from chain config + block
|
||||
db.Prepare(evm.GetRules(), sender, block.Coinbase(), tx.To(), vm.PrecompiledAddressesCancun, tx.AccessList())
|
||||
|
||||
db.SetTxContext(tx.Hash(), balIdx-1, uint32(balIdx))
|
||||
|
||||
receipt, txBAL, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, evm)
|
||||
receipt, txBAL, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), evmContext.Time, tx, evm)
|
||||
if err != nil {
|
||||
err := fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err)
|
||||
return &txExecResult{err: err}
|
||||
return &txExecResult{err: fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err)}
|
||||
}
|
||||
|
||||
return &txExecResult{
|
||||
idx: balIdx,
|
||||
receipt: receipt,
|
||||
execGas: receipt.GasUsed,
|
||||
blockGas: gp.Used(),
|
||||
txRegular: gp.cumulativeRegular,
|
||||
txState: gp.cumulativeState,
|
||||
blockAccessList: txBAL,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*bal.ConstructionBlockAccessList, error) {
|
||||
var (
|
||||
header = block.Header()
|
||||
)
|
||||
vmContext := NewEVMBlockContext(header, p.chain, nil)
|
||||
evm := vm.NewEVM(vmContext, statedb, p.chainConfig(), cfg)
|
||||
|
||||
accessList := PreExecution(context.Background(), block.BeaconRoot(), block.ParentHash(), p.chainConfig(), evm, block.Number(), block.Time())
|
||||
return accessList, nil
|
||||
func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, cfg vm.Config) *bal.ConstructionBlockAccessList {
|
||||
header := block.Header()
|
||||
evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), statedb, p.chainConfig(), cfg)
|
||||
return PreExecution(context.Background(), block.BeaconRoot(), block.ParentHash(), p.chainConfig(), evm, block.Number(), block.Time())
|
||||
}
|
||||
|
||||
// Process performs EVM execution and state root computation for a block which is known
|
||||
// to contain an access list.
|
||||
func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *state.BALStateTransition, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) {
|
||||
header := block.Header()
|
||||
signer := types.MakeSigner(p.chainConfig(), header.Number, header.Time)
|
||||
|
||||
var (
|
||||
header = block.Header()
|
||||
resCh = make(chan *ProcessResultWithMetrics)
|
||||
signer = types.MakeSigner(p.chainConfig(), header.Number, header.Time)
|
||||
rootCalcResultCh = make(chan stateRootCalculationResult)
|
||||
txResCh = make(chan txExecResult)
|
||||
|
||||
pStart = time.Now()
|
||||
tExecStart time.Time
|
||||
tPreprocess time.Duration // time to create a set of prestates for parallel transaction execution
|
||||
)
|
||||
|
||||
// Pre-transaction processing: system-contract updates and the pre-tx BAL.
|
||||
pStart := time.Now()
|
||||
startingState := statedb.Copy()
|
||||
prepared := stateTransition.PreparedAccessList()
|
||||
preTxBal, err := p.processBlockPreTx(block, statedb, cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
preTxBAL := p.processBlockPreTx(block, statedb, cfg)
|
||||
tPreprocess := time.Since(pStart)
|
||||
|
||||
// compute the reads/mutations at the last bal index
|
||||
tPreprocess = time.Since(pStart)
|
||||
// Execute transactions and the state-root calculation in parallel.
|
||||
tExecStart := time.Now()
|
||||
go p.resultHandler(block, preTxBAL, prepared, statedb, tExecStart, txResCh, rootCalcResultCh, resCh)
|
||||
|
||||
// execute transactions and state root calculation in parallel
|
||||
tExecStart = time.Now()
|
||||
go p.resultHandler(block, preTxBal, prepared, statedb, tExecStart, txResCh, rootCalcResultCh, resCh)
|
||||
// Workers execute transactions concurrently against per-tx state copies.
|
||||
// Each worker reports completion (and any block-invalidating error) on
|
||||
// txResCh, which resultHandler drains. Worker errors therefore flow through
|
||||
// the channel rather than the errgroup, so the group is used purely to bound
|
||||
// concurrency and Wait() is intentionally not called.
|
||||
var workers errgroup.Group
|
||||
workers.SetLimit(runtime.NumCPU())
|
||||
for i, t := range block.Transactions() {
|
||||
tx := t
|
||||
idx := i
|
||||
sdb := startingState.Copy()
|
||||
for i, tx := range block.Transactions() {
|
||||
balIdx := i + 1
|
||||
prestate := startingState.Copy()
|
||||
workers.Go(func() error {
|
||||
startingState := sdb.WithReader(state.NewReaderWithPreparedAccessList(statedb.Reader(), prepared, idx+1))
|
||||
res := p.execTx(block, tx, idx+1, startingState, signer)
|
||||
txResCh <- *res
|
||||
prestate = prestate.WithReader(state.NewReaderWithAccessList(statedb.Reader(), prepared, balIdx))
|
||||
txResCh <- *p.execTx(block, tx, balIdx, prestate, signer)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -220,10 +220,10 @@ type ReaderWithBlockLevelAccessList struct {
|
|||
TxIndex int
|
||||
}
|
||||
|
||||
// NewReaderWithPreparedAccessList wraps a base reader with a shared, already
|
||||
// NewReaderWithAccessList wraps a base reader with a shared, already
|
||||
// preprocessed access list. This is the cheap constructor used on the hot path:
|
||||
// the prepared list is built once per block and borrowed by every per-tx reader.
|
||||
func NewReaderWithPreparedAccessList(base Reader, prepared *bal.AccessListReader, txIndex int) *ReaderWithBlockLevelAccessList {
|
||||
func NewReaderWithAccessList(base Reader, prepared *bal.AccessListReader, txIndex int) *ReaderWithBlockLevelAccessList {
|
||||
return &ReaderWithBlockLevelAccessList{
|
||||
Reader: base,
|
||||
prepared: prepared,
|
||||
|
|
@ -232,10 +232,10 @@ func NewReaderWithPreparedAccessList(base Reader, prepared *bal.AccessListReader
|
|||
}
|
||||
|
||||
// NewReaderWithBlockLevelAccessList wraps a base reader with a raw access list,
|
||||
// preprocessing it on the spot. Prefer NewReaderWithPreparedAccessList when the
|
||||
// preprocessing it on the spot. Prefer NewReaderWithAccessList when the
|
||||
// prepared list can be built once and shared across multiple readers.
|
||||
func NewReaderWithBlockLevelAccessList(base Reader, accessList bal.BlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList {
|
||||
return NewReaderWithPreparedAccessList(base, bal.NewAccessListReader(accessList), txIndex)
|
||||
return NewReaderWithAccessList(base, bal.NewAccessListReader(accessList), txIndex)
|
||||
}
|
||||
|
||||
// Account implements Reader, returning the account with the specific address.
|
||||
|
|
|
|||
Loading…
Reference in a new issue