diff --git a/cmd/evm/blockrunner.go b/cmd/evm/blockrunner.go index c6fac5396e..070bdf4c55 100644 --- a/cmd/evm/blockrunner.go +++ b/cmd/evm/blockrunner.go @@ -117,7 +117,7 @@ func runBlockTest(ctx *cli.Context, fname string) ([]testResult, error) { test := tests[name] result := &testResult{Name: name, Pass: true} var finalRoot *common.Hash - if err := test.Run(false, rawdb.PathScheme, ctx.Bool(WitnessCrossCheckFlag.Name), tracer, func(res error, chain *core.BlockChain) { + if err := test.Run(false, rawdb.PathScheme, ctx.Bool(WitnessCrossCheckFlag.Name), true, tracer, func(res error, chain *core.BlockChain) { if ctx.Bool(DumpFlag.Name) { if s, _ := chain.State(); s != nil { result.State = dump(s) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index c02e307bdc..a6f42ee327 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -20,6 +20,7 @@ import ( "bufio" "errors" "fmt" + "github.com/ethereum/go-ethereum/core/types/bal" "os" "reflect" "runtime" @@ -241,6 +242,28 @@ func makeFullNode(ctx *cli.Context) *node.Node { cfg.Eth.OverrideUBT = &v } + if ctx.IsSet(utils.BALExecutionModeFlag.Name) { + val := ctx.String(utils.BALExecutionModeFlag.Name) + switch val { + case utils.BalExecutionModeOptimized: + cfg.Eth.BALExecutionMode = bal.BALExecutionOptimized + case utils.BalExecutionModeNoBatchIO: + cfg.Eth.BALExecutionMode = bal.BALExecutionNoBatchIO + case utils.BalExecutionModeSequential: + cfg.Eth.BALExecutionMode = bal.BALExecutionSequential + default: + utils.Fatalf("invalid option for --bal.executionmode: %s. acceptable values are full|nobatchio|sequential", val) + } + } + cfg.Eth.BlockingPrefetch = ctx.Bool(utils.BlockingPrefetchFlag.Name) + + prefetchWorkers := ctx.Uint(utils.PrefetchWorkersFlag.Name) + if ctx.IsSet(utils.PrefetchWorkersFlag.Name) && prefetchWorkers == 0 { + prefetchWorkers = uint(runtime.NumCPU()) + log.Warn(fmt.Sprintf("invalid value for --bal.prefetchworkers. got 0. sanitizing to %d", prefetchWorkers)) + } + cfg.Eth.PrefetchWorkers = prefetchWorkers + // Start metrics export if enabled. utils.SetupMetrics(&cfg.Metrics) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 0f88dbbcf1..36942189af 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -96,6 +96,8 @@ var ( utils.LightKDFFlag, utils.EthRequiredBlocksFlag, utils.BALExecutionModeFlag, + utils.PrefetchWorkersFlag, + utils.BlockingPrefetchFlag, utils.LegacyWhitelistFlag, // deprecated utils.CacheFlag, utils.CacheDatabaseFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index ad70d48c2d..6601818674 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -28,6 +28,7 @@ import ( "net/http" "os" "path/filepath" + "runtime" godebug "runtime/debug" "strconv" "strings" @@ -248,6 +249,17 @@ var ( Usage: "EIP-7928 block-access-list execution mode (no-op placeholder)", Category: flags.EthCategory, } + PrefetchWorkersFlag = &cli.UintFlag{ + Name: "bal.prefetchworkers", + Usage: "The number of concurrent state loading tasks to perform when prefetching BAL state. Default to the number of cpus", + Value: uint(runtime.NumCPU()), + Category: flags.MiscCategory, + } + BlockingPrefetchFlag = &cli.BoolFlag{ + Name: "bal.blockingprefetch", + Usage: "only relevant when executing in parallel with a BAL: if true, the prefetcher will block tx/state-root calculation until all scheduled fetching tasks have completed.", + Category: flags.MiscCategory, + } BloomFilterSizeFlag = &cli.Uint64Flag{ Name: "bloomfilter.size", Usage: "Megabytes of memory allocated to bloom-filter for pruning", @@ -1119,6 +1131,12 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server. } ) +const ( + BalExecutionModeOptimized = "full" + BalExecutionModeNoBatchIO = "nobatchio" + BalExecutionModeSequential = "sequential" +) + var ( // TestnetFlags is the flag group of all built-in supported testnets. TestnetFlags = []cli.Flag{ diff --git a/core/block_validator.go b/core/block_validator.go index 962fffb82a..f2df77a27d 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -19,9 +19,9 @@ package core import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie" @@ -143,9 +143,14 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { return nil } +type StateRootSource interface { + IntermediateRoot(deleteEmptyObjects bool) common.Hash + Error() error +} + // ValidateState validates the various changes that happen after a state transition, // such as amount of used gas, the receipt roots and the state root itself. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, res *ProcessResult, stateless bool) error { +func (v *BlockValidator) ValidateState(block *types.Block, state StateRootSource, res *ProcessResult, stateless bool) error { if res == nil { return errors.New("nil ProcessResult value") } @@ -201,8 +206,8 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD } // Validate the state root against the received state root and throw // an error if they don't match. - if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { - return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error()) + if root := state.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { + return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, state.Error()) } return nil } diff --git a/core/blockchain.go b/core/blockchain.go index eb4ea7ebc1..29eca4800d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "github.com/ethereum/go-ethereum/core/types/bal" "io" "math/big" "runtime" @@ -221,6 +222,10 @@ type BlockChainConfig struct { // Execution configs StatelessSelfValidation bool // Generate execution witnesses and self-check against them (testing purpose) EnableWitnessStats bool // Whether trie access statistics collection is enabled + + BALExecutionMode bal.BALExecutionMode + BlockingPrefetch bool + PrefetchWorkers int } // DefaultConfig returns the default config. @@ -360,12 +365,13 @@ type BlockChain struct { stopping atomic.Bool // false if chain is running, true when stopped procInterrupt atomic.Bool // interrupt signaler for block processing - engine consensus.Engine - validator Validator // Block and state validator interface - prefetcher Prefetcher - processor Processor // Block transaction processor interface - logger *tracing.Hooks - stateSizer *state.SizeTracker // State size tracking + engine consensus.Engine + validator Validator // Block and state validator interface + prefetcher Prefetcher + processor Processor // Block transaction processor interface + parallelProcessor ParallelStateProcessor // block processor for use with access lists + logger *tracing.Hooks + stateSizer *state.SizeTracker // State size tracking lastForkReadyAlert time.Time // Last time there was a fork readiness print out slowBlockThreshold time.Duration // Block execution time threshold beyond which detailed statistics will be logged @@ -427,6 +433,7 @@ func NewBlockChain(db ethdb.Database, genesis *Genesis, engine consensus.Engine, bc.validator = NewBlockValidator(chainConfig, bc) bc.prefetcher = newStatePrefetcher(chainConfig, bc.hc) bc.processor = NewStateProcessor(bc.hc) + bc.parallelProcessor = NewParallelStateProcessor(bc.hc, bc.GetVMConfig()) genesisHeader := bc.GetHeaderByNumber(0) if genesisHeader == nil { @@ -1642,7 +1649,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { // writeBlockWithState writes block, metadata and corresponding state data to the // database. -func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, statedb *state.StateDB) error { +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, statedb state.Committer) error { if !bc.HasHeader(block.ParentHash(), block.NumberU64()-1) { return consensus.ErrUnknownAncestor } @@ -1756,7 +1763,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. -func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state state.Committer, emitHeadEvent bool) (status WriteStatus, err error) { if err := bc.writeBlockWithState(block, receipts, state); err != nil { return NonStatTy, err } @@ -2111,16 +2118,133 @@ type ExecuteConfig struct { EnableWitnessStats bool } +func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *types.Block, setHead bool) (procRes *blockProcessingResult, blockEndErr error) { + var ( + startTime = time.Now() + procTime time.Duration + statedb *state.StateDB + ) + + sdb := state.NewMPTDatabase(bc.triedb, bc.codedb).WithSnapshot(bc.snaps) + + useAsyncReads := bc.cfg.BALExecutionMode != bal.BALExecutionNoBatchIO + al := block.AccessList() + accessListReader := bal.NewAccessListReader(*al) + prefetchReader, err := sdb.ReaderWithPrefetch(parentRoot, accessListReader.StorageKeys(useAsyncReads), bc.cfg.PrefetchWorkers, bc.cfg.BlockingPrefetch) + if err != nil { + return nil, err + } + + stateTransition, err := state.NewBALStateTransition(block, prefetchReader, sdb, parentRoot) + if err != nil { + return nil, err + } + statedb, err = state.NewWithReader(parentRoot, sdb, prefetchReader) + if err != nil { + return nil, err + } + + if bc.logger != nil && bc.logger.OnBlockStart != nil { + bc.logger.OnBlockStart(tracing.BlockEvent{ + Block: block, + Finalized: bc.CurrentFinalBlock(), + Safe: bc.CurrentSafeBlock(), + }) + } + if bc.logger != nil && bc.logger.OnBlockEnd != nil { + defer func() { + bc.logger.OnBlockEnd(blockEndErr) + }() + } + + res, err := bc.parallelProcessor.Process(block, stateTransition, statedb, bc.cfg.VmConfig) + if err != nil { + return nil, err + } + + if err := bc.validator.ValidateState(block, stateTransition, res.ProcessResult, false); err != nil { + return nil, err + } + + procTime = time.Since(startTime) + writeStart := time.Now() + // Write the block to the chain and get the status. + var ( + //wstart = time.Now() + status WriteStatus + ) + if !setHead { + // Don't set the head, only insert the block + err = bc.writeBlockWithState(block, res.ProcessResult.Receipts, stateTransition) + } else { + status, err = bc.writeBlockAndSetHead(block, res.ProcessResult.Receipts, res.ProcessResult.Logs, stateTransition, false) + } + if err != nil { + return nil, err + } + writeTime := time.Since(writeStart) + var stats ExecuteStats + + wc := stateTransition.WrittenCounts() + d := stateTransition.Deletions() + //codeLoaded, codeLoadBytes := prefetchReader.(state.CodeLoadTracker).CodeLoads() + //stats.AccountLoaded = al.UniqueAccountCount() + stats.AccountUpdated = wc.Accounts - d.Accounts + stats.AccountDeleted = d.Accounts + //stats.StorageLoaded = al.UniqueStorageSlotCount() + stats.StorageUpdated = wc.StorageSlots - d.Storage + stats.StorageDeleted = d.Storage + //stats.CodeLoaded = codeLoaded + //stats.CodeLoadBytes = codeLoadBytes + stats.CodeUpdated = wc.Codes + stats.CodeUpdateBytes = wc.CodeBytes + + //stats.ExecWall = res.ExecTime + //stats.PostProcess = res.PostProcessTime + + if m := res.StateTransitionMetrics; m != nil { + stats.AccountHashes = m.AccountUpdate + m.StateUpdate + m.StateHash + stats.AccountCommits = m.AccountCommits + stats.StorageCommits = m.StorageCommits + stats.DatabaseCommit = m.TrieDBCommits + //stats.Prefetch = m.StatePrefetch + } + //stats.Prefetch = prefetchReader.(state.PrefetcherMetricer).Metrics().Elapsed + + stats.StateReadCacheStats = prefetchReader.(state.ReaderStater).GetStats() + + elapsed := time.Since(startTime) + 1 // prevent zero division + stats.TotalTime = elapsed + stats.MgasPerSecond = float64(res.ProcessResult.GasUsed) * 1000 / float64(elapsed) + stats.BlockWrite = writeTime + + // TODO: reinstate + //stats.balTransitionStats = res.StateTransitionMetrics + + return &blockProcessingResult{ + usedGas: res.ProcessResult.GasUsed, + procTime: procTime, + status: status, + witness: nil, + stats: &stats, + }, nil +} + // ProcessBlock executes and validates the given block. If there was no error // it writes the block and associated state to database. func (bc *BlockChain) ProcessBlock(ctx context.Context, parentRoot common.Hash, block *types.Block, config ExecuteConfig) (result *blockProcessingResult, blockEndErr error) { var ( - err error - startTime = time.Now() - statedb *state.StateDB - interrupt atomic.Bool - sdb state.Database + err error + startTime = time.Now() + statedb *state.StateDB + interrupt atomic.Bool + sdb state.Database + blockHasAccessList = block.AccessList() != nil ) + + if blockHasAccessList && bc.cfg.BALExecutionMode != bal.BALExecutionSequential { + return bc.processBlockWithAccessList(parentRoot, block, config.WriteHead) + } defer interrupt.Store(true) // terminate the prefetch at the end if bc.chainConfig.IsUBT(block.Number(), block.Time()) { diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go new file mode 100644 index 0000000000..3407aabad7 --- /dev/null +++ b/core/parallel_state_processor.go @@ -0,0 +1,329 @@ +package core + +import ( + "cmp" + "context" + "fmt" + "runtime" + "slices" + "time" + + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" + "github.com/ethereum/go-ethereum/core/vm" + "golang.org/x/sync/errgroup" +) + +// ProcessResultWithMetrics wraps ProcessResult with timing breakdown for BAL block processing. +type ProcessResultWithMetrics struct { + ProcessResult *ProcessResult + PreProcessTime time.Duration + StateTransitionMetrics *state.BALStateTransitionMetrics + ExecTime time.Duration + PostProcessTime time.Duration +} + +// ParallelStateProcessor is used to execute and verify blocks containing +// access lists. +type ParallelStateProcessor struct { + *StateProcessor + vmCfg *vm.Config +} + +// NewParallelStateProcessor returns a new ParallelStateProcessor instance. +func NewParallelStateProcessor(chain *HeaderChain, vmConfig *vm.Config) ParallelStateProcessor { + res := NewStateProcessor(chain) + return ParallelStateProcessor{ + res, + vmConfig, + } +} + +// called by resultHandler when all transactions have successfully executed. +// performs post-tx state transition (system contracts and withdrawals) +// and calculates the ProcessResult, returning it to be sent on resCh +// by resultHandler +func (p *ParallelStateProcessor) prepareExecResult(block *types.Block, tExecStart time.Time, preTxBal *bal.ConstructionBlockAccessList, statedb *state.StateDB, results []txExecResult) *ProcessResultWithMetrics { + tExec := time.Since(tExecStart) + tPostprocessStart := time.Now() + header := block.Header() + + vmContext := NewEVMBlockContext(header, p.chain, nil) + lastBALIdx := len(block.Transactions()) + 1 + postTxState := statedb.WithReader(state.NewReaderWithBlockLevelAccessList(statedb.Reader(), *block.AccessList(), lastBALIdx)) + + cfg := vm.Config{ + NoBaseFee: p.vmCfg.NoBaseFee, + EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, + ExtraEips: slices.Clone(p.vmCfg.ExtraEips), + } + evm := vm.NewEVM(vmContext, postTxState, p.chainConfig(), cfg) + + // 1. order the receipts by tx index + // 2. correctly calculate the cumulative gas used per receipt, returning bad block error if it goes over the allowed + slices.SortFunc(results, func(a, b txExecResult) int { + return cmp.Compare(a.receipt.TransactionIndex, b.receipt.TransactionIndex) + }) + + var ( + // Per-dimension cumulative sums for 2D block gas (EIP-8037). + sumRegular uint64 + sumState uint64 + cumulativeReceipt uint64 // cumulative receipt gas (what users pay) + ) + + var allLogs []*types.Log + var allReceipts []*types.Receipt + for _, result := range results { + sumRegular += result.txRegular + sumState += result.txState + + cumulativeReceipt += result.execGas + result.receipt.CumulativeGasUsed = cumulativeReceipt + allLogs = append(allLogs, result.receipt.Logs...) + allReceipts = append(allReceipts, result.receipt) + } + // Block gas = max(sum_regular, sum_state) per EIP-8037. + blockGasUsed := max(sumRegular, sumState) + if blockGasUsed > header.GasLimit { + return &ProcessResultWithMetrics{ + ProcessResult: &ProcessResult{Error: fmt.Errorf("gas limit exceeded")}, + } + } + + requests, postBal, err := PostExecution(context.Background(), p.chainConfig(), block.Number(), block.Time(), allLogs, evm, uint32(len(block.Transactions())+1)) + if err != nil { + return &ProcessResultWithMetrics{ + ProcessResult: &ProcessResult{Error: err}, + } + } + + p.chain.Engine().Finalize(p.chain, block.Header(), evm.StateDB, block.Body(), uint32(len(block.Transactions()))+1, postBal) + + blockAccessList := bal.NewConstructionBlockAccessList() + blockAccessList.Merge(preTxBal) + blockAccessList.Merge(postBal) + + for _, res := range results { + blockAccessList.Merge(res.blockAccessList) + } + + // TODO: do we move validation to ValidateState? + if block.AccessList().Hash() != blockAccessList.ToEncodingObj().Hash() { + // TODO: expose json string method on encoding block access list and log it here + return &ProcessResultWithMetrics{ + ProcessResult: &ProcessResult{Error: fmt.Errorf("invalid block access list: mismatch between local and remote block access list")}, + } + } + + tPostprocess := time.Since(tPostprocessStart) + + return &ProcessResultWithMetrics{ + ProcessResult: &ProcessResult{ + Receipts: allReceipts, + Requests: requests, + Logs: allLogs, + GasUsed: blockGasUsed, + Bal: blockAccessList, + }, + PostProcessTime: tPostprocess, + ExecTime: tExec, + } +} + +type txExecResult struct { + idx int // transaction index + receipt *types.Receipt + err error // non-EVM error which would render the block invalid + blockGas uint64 + execGas uint64 + + // Per-tx dimensional gas for Amsterdam 2D gas accounting (EIP-8037). + txRegular uint64 + txState uint64 + + blockAccessList *bal.ConstructionBlockAccessList +} + +// resultHandler polls until all transactions have finished executing and the +// state root calculation is complete. The result is emitted on resCh. +func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxBAL *bal.ConstructionBlockAccessList, statedb *state.StateDB, tExecStart time.Time, txResCh <-chan txExecResult, stateRootCalcResCh <-chan stateRootCalculationResult, resCh chan *ProcessResultWithMetrics) { + // 1. if the block has transactions, receive the execution results from all of them and return an error on resCh if any txs err'd + // 2. once all txs are executed, compute the post-tx state transition and produce the ProcessResult sending it on resCh (or an error if the post-tx state didn't match what is reported in the BAL) + var results []txExecResult + var cumulativeStateGas, cumulativeRegularGas uint64 + var execErr error + var numTxComplete int + + if len(block.Transactions()) > 0 { + loop: + for { + select { + case res := <-txResCh: + numTxComplete++ + if execErr == nil { + // short-circuit if invalid block was detected + if res.err != nil { + execErr = res.err + } else if bottleneck := max(cumulativeRegularGas+res.txRegular, cumulativeStateGas+res.txState); bottleneck > block.GasLimit() { + execErr = fmt.Errorf("block used too much gas in bottleneck dimension: %d. block gas limit is %d", bottleneck, block.GasLimit()) + } else { + cumulativeStateGas += res.txState + results = append(results, res) + } + } + if numTxComplete == len(block.Transactions()) { + break loop + } + } + } + + if execErr != nil { + // Drain stateRootCalcResCh so calcAndVerifyRoot goroutine can exit. + <-stateRootCalcResCh + resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: execErr}} + return + } + } + + execResults := p.prepareExecResult(block, tExecStart, preTxBAL, statedb, results) + rootCalcRes := <-stateRootCalcResCh + + if execResults.ProcessResult.Error != nil { + resCh <- execResults + } else if rootCalcRes.err != nil { + resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: rootCalcRes.err}} + } else { + execResults.StateTransitionMetrics = rootCalcRes.metrics + resCh <- execResults + } +} + +type stateRootCalculationResult struct { + err error + metrics *state.BALStateTransitionMetrics +} + +// calcAndVerifyRoot performs the post-state root hash calculation, verifying +// it against what is reported by the block and returning a result on resCh. +func (p *ParallelStateProcessor) calcAndVerifyRoot(block *types.Block, stateTransition *state.BALStateTransition, resCh chan stateRootCalculationResult) { + root := stateTransition.IntermediateRoot(false) + + res := stateRootCalculationResult{ + metrics: stateTransition.Metrics(), + } + + if root != block.Root() { + res.err = fmt.Errorf("state root mismatch. local: %x. remote: %x", root, block.Root()) + } + resCh <- res +} + +// execTx executes single transaction returning a result which includes state accessed/modified +func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transaction, balIdx int, db *state.StateDB, signer types.Signer) *txExecResult { + header := block.Header() + context := NewEVMBlockContext(header, p.chain, nil) + + cfg := vm.Config{ + NoBaseFee: p.vmCfg.NoBaseFee, + EnablePreimageRecording: p.vmCfg.EnablePreimageRecording, + ExtraEips: slices.Clone(p.vmCfg.ExtraEips), + } + evm := vm.NewEVM(context, db, p.chainConfig(), cfg) + + msg, err := TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + err = fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err) + return &txExecResult{err: err} + } + gp := NewGasPool(block.GasLimit()) + sender, err := signer.Sender(tx) + if err != nil { + // TODO: can this even happen at this stage? + err = fmt.Errorf("could not recover sender for tx at bal idx %d: %v\n", balIdx, err) + } + // TODO: make precompiled addresses be resolvable from chain config + block + db.Prepare(evm.GetRules(), sender, block.Coinbase(), tx.To(), vm.PrecompiledAddressesCancun, tx.AccessList()) + + db.SetTxContext(tx.Hash(), balIdx-1, uint32(balIdx)) + + receipt, txBAL, err := ApplyTransactionWithEVM(msg, gp, db, block.Number(), block.Hash(), context.Time, tx, evm) + if err != nil { + err := fmt.Errorf("could not apply tx %d [%v]: %w", balIdx, tx.Hash().Hex(), err) + return &txExecResult{err: err} + } + + return &txExecResult{ + idx: balIdx, + receipt: receipt, + execGas: receipt.GasUsed, + blockGas: gp.Used(), + txRegular: gp.cumulativeRegular, + txState: gp.cumulativeState, + blockAccessList: txBAL, + } +} + +func (p *ParallelStateProcessor) processBlockPreTx(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*bal.ConstructionBlockAccessList, error) { + var ( + header = block.Header() + ) + vmContext := NewEVMBlockContext(header, p.chain, nil) + evm := vm.NewEVM(vmContext, statedb, p.chainConfig(), cfg) + + accessList := PreExecution(context.Background(), block.BeaconRoot(), block.ParentHash(), p.chainConfig(), evm, block.Number(), block.Time()) + return accessList, nil +} + +// Process performs EVM execution and state root computation for a block which is known +// to contain an access list. +func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *state.BALStateTransition, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) { + var ( + header = block.Header() + resCh = make(chan *ProcessResultWithMetrics) + signer = types.MakeSigner(p.chainConfig(), header.Number, header.Time) + rootCalcResultCh = make(chan stateRootCalculationResult) + txResCh = make(chan txExecResult) + + pStart = time.Now() + tExecStart time.Time + tPreprocess time.Duration // time to create a set of prestates for parallel transaction execution + ) + + startingState := statedb.Copy() + preTxBal, err := p.processBlockPreTx(block, statedb, cfg) + if err != nil { + return nil, err + } + + // compute the reads/mutations at the last bal index + tPreprocess = time.Since(pStart) + + // execute transactions and state root calculation in parallel + tExecStart = time.Now() + go p.resultHandler(block, preTxBal, statedb, tExecStart, txResCh, rootCalcResultCh, resCh) + var workers errgroup.Group + workers.SetLimit(runtime.NumCPU()) + for i, t := range block.Transactions() { + tx := t + idx := i + sdb := startingState.Copy() + workers.Go(func() error { + startingState := sdb.WithReader(state.NewReaderWithBlockLevelAccessList(statedb.Reader(), *block.AccessList(), idx+1)) + res := p.execTx(block, tx, idx+1, startingState, signer) + txResCh <- *res + return nil + }) + } + + go p.calcAndVerifyRoot(block, stateTransition, rootCalcResultCh) + + res := <-resCh + if res.ProcessResult.Error != nil { + return nil, res.ProcessResult.Error + } + // TODO: remove preprocess metric ? + res.PreProcessTime = tPreprocess + return res, nil +} diff --git a/core/state/bal_state_transition.go b/core/state/bal_state_transition.go new file mode 100644 index 0000000000..de643ab0d1 --- /dev/null +++ b/core/state/bal_state_transition.go @@ -0,0 +1,530 @@ +package state + +import ( + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/trie/trienode" + "github.com/holiman/uint256" + "golang.org/x/sync/errgroup" +) + +// BALStateTransition is responsible for performing the state root update +// and commit for EIP 7928 access-list-containing blocks. An instance of +// this object is only used for a single block. +type BALStateTransition struct { + accessList bal.AccessListReader + written bal.WrittenCounts + db Database + reader Reader + stateTrie Trie + parentRoot common.Hash + + // the computed state root of the block + rootHash common.Hash + // the state modifications performed by the block + diffs bal.StateMutations + + // a map of common.Address -> *types.StateAccount containing the block + // prestate of all accounts that will be modified + prestates sync.Map + + postStates map[common.Address]*types.StateAccount + // a map of common.Address -> Trie containing the account tries for all + // accounts with mutated storage + tries sync.Map //map[common.Address]Trie + deletions map[common.Address]struct{} + + // Deletion counters; not derivable from the BAL alone (selfdestruct vs + // balance drain is indistinguishable without prestate). + accountDeleted int + storageDeleted atomic.Int64 + + stateUpdate *StateUpdate + + metrics BALStateTransitionMetrics + maxBALIdx int + + err error +} + +func (s *BALStateTransition) Metrics() *BALStateTransitionMetrics { + return &s.metrics +} + +// DeletionCounts holds per-block deletion counters for accounts/storage +type DeletionCounts struct { + Accounts int + Storage int +} + +func (s *BALStateTransition) Deletions() DeletionCounts { + return DeletionCounts{ + Accounts: s.accountDeleted, + Storage: int(s.storageDeleted.Load()), + } +} + +type BALStateTransitionMetrics struct { + // trie hashing metrics + AccountUpdate time.Duration + StatePrefetch time.Duration + StateUpdate time.Duration + StateHash time.Duration + + // commit metrics + AccountCommits time.Duration + StorageCommits time.Duration + SnapshotCommits time.Duration + TrieDBCommits time.Duration + TotalCommitTime time.Duration +} + +func NewBALStateTransition(block *types.Block, prefetchReader Reader, db Database, parentRoot common.Hash) (*BALStateTransition, error) { + stateTrie, err := db.OpenTrie(parentRoot) + if err != nil { + return nil, err + } + + return &BALStateTransition{ + accessList: bal.NewAccessListReader(*block.AccessList()), + written: block.AccessList().WrittenCounts(), + db: db, + reader: prefetchReader, + stateTrie: stateTrie, + parentRoot: parentRoot, + rootHash: common.Hash{}, + diffs: make(bal.StateMutations), + prestates: sync.Map{}, + postStates: make(map[common.Address]*types.StateAccount), + tries: sync.Map{}, + deletions: make(map[common.Address]struct{}), + stateUpdate: nil, + maxBALIdx: len(block.Transactions()) + 1, + }, nil +} + +// WrittenCounts returns the cached BAL write counts (computed once per block). +func (s *BALStateTransition) WrittenCounts() bal.WrittenCounts { + return s.written +} + +func (s *BALStateTransition) Error() error { + return s.err +} + +func (s *BALStateTransition) setError(err error) { + if s.err == nil { + s.err = err + } +} + +// isAccountDeleted checks whether the state account was deleted in this block. Post selfdestruct-removal, +// deletions can only occur if an account which has a balance becomes the target of a CREATE2 initcode +// which calls SENDALL, clearing the account and marking it for deletion. +func isAccountDeleted(prestate *types.StateAccount, mutations bal.AccountMutations) bool { + // TODO: figure out how to simplify this method + if mutations.Code != nil && len(mutations.Code) != 0 { + return false + } + if mutations.Nonce != nil && *mutations.Nonce != 0 { + return false + } + if mutations.StorageWrites != nil && len(mutations.StorageWrites) > 0 { + return false + } + if mutations.Balance != nil { + if mutations.Balance.IsZero() { + if prestate.Nonce != 0 || prestate.Balance.IsZero() || common.BytesToHash(prestate.CodeHash) != types.EmptyCodeHash { + return false + } + // consider an empty account with storage to be deleted, so we don't check root here + return true + } + } + return false +} + +// updateAccount applies the block state mutations to a given account returning +// the updated state account and new code (if the account code changed) +func (s *BALStateTransition) updateAccount(addr common.Address) (*types.StateAccount, []byte) { + a, _ := s.prestates.Load(addr) + acct := a.(*types.StateAccount) + + acct, diff := acct.Copy(), s.diffs[addr] + code := diff.Code + + if diff.Nonce != nil { + acct.Nonce = *diff.Nonce + } + if diff.Balance != nil { + acct.Balance = new(uint256.Int).Set(diff.Balance) + } + if tr, ok := s.tries.Load(addr); ok { + acct.Root = tr.(Trie).Hash() + } + return acct, code +} + +func (s *BALStateTransition) commitAccount(addr common.Address) (*AccountUpdate, *trienode.NodeSet, error) { + op := &AccountUpdate{ + Address: addr, + Data: s.postStates[addr], // TODO: cache the updated state account somewhere + } + var prestate *types.StateAccount + if ps, exist := s.prestates.Load(addr); exist { + op.Origin = ps.(*types.StateAccount) + } + + if s.diffs[addr].Code != nil { + code := ContractCode{ + Hash: crypto.Keccak256Hash(s.diffs[addr].Code), + Blob: s.diffs[addr].Code, + } + if prestate == nil { + code.OriginHash = types.EmptyCodeHash + } else { + code.OriginHash = common.BytesToHash(prestate.CodeHash) + } + op.Code = &code + } + + if len(s.diffs[addr].StorageWrites) == 0 { + return op, nil, nil + } + + op.Storages = make(map[common.Hash]common.Hash) + op.StoragesOriginByHash = make(map[common.Hash]common.Hash) + op.StoragesOriginByKey = make(map[common.Hash]common.Hash) + + for key, value := range s.diffs[addr].StorageWrites { + hash := crypto.Keccak256Hash(key[:]) + op.Storages[hash] = value + origin, err := s.reader.Storage(addr, key) + if err != nil { + return nil, nil, err + } + op.StoragesOriginByHash[hash] = origin + op.StoragesOriginByKey[key] = origin + } + tr, _ := s.tries.Load(addr) + root, nodes := tr.(Trie).Commit(false) + s.postStates[addr].Root = root + return op, nodes, nil +} + +// CommitWithUpdate flushes mutated trie nodes and state accounts to disk. +func (s *BALStateTransition) CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *StateUpdate, error) { + // 1) create a stateUpdate object + // Commit objects to the trie, measuring the elapsed time + var ( + //commitStart = time.Now() + accountTrieNodesUpdated int + accountTrieNodesDeleted int + storageTrieNodesUpdated int + storageTrieNodesDeleted int + + lock sync.Mutex // protect two maps below + nodes = trienode.NewMergedNodeSet() // aggregated trie nodes + updates = make(map[common.Hash]*AccountUpdate, len(s.diffs)) // aggregated account updates + + // merge aggregates the dirty trie nodes into the global set. + // + // Given that some accounts may be destroyed and then recreated within + // the same block, it's possible that a node set with the same owner + // may already exist. In such cases, these two sets are combined, with + // the later one overwriting the previous one if any nodes are modified + // or deleted in both sets. + // + // merge run concurrently across all the state objects and account trie. + merge = func(set *trienode.NodeSet) error { + if set == nil { + return nil + } + lock.Lock() + defer lock.Unlock() + + updates, deletes := set.Size() + if set.Owner == (common.Hash{}) { + accountTrieNodesUpdated += updates + accountTrieNodesDeleted += deletes + } else { + storageTrieNodesUpdated += updates + storageTrieNodesDeleted += deletes + } + return nodes.Merge(set) + } + ) + + destructedPrestates := make(map[common.Address]*types.StateAccount) + s.prestates.Range(func(key, value any) bool { + addr := key.(common.Address) + acct := value.(*types.StateAccount) + destructedPrestates[addr] = acct + return true + }) + + deletes, delNodes, err := handleDestruction(s.db, s.stateTrie, s.parentRoot, noStorageWiping, slices.Values(s.accessList.AllDestructions()), destructedPrestates) + if err != nil { + return common.Hash{}, nil, err + } + for _, set := range delNodes { + if err := merge(set); err != nil { + return common.Hash{}, nil, err + } + } + + // Handle all state updates afterwards, concurrently to one another to shave + // off some milliseconds from the commit operation. Also accumulate the code + // writes to run in parallel with the computations. + var ( + start = time.Now() + root common.Hash + workers errgroup.Group + ) + // Schedule the account trie first since that will be the biggest, so give + // it the most time to crunch. + // + // TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain + // heads, which seems excessive given that it doesn't do hashing, it just + // shuffles some data. For comparison, the *hashing* at chain head is 2-3ms. + // We need to investigate what's happening as it seems something's wonky. + // Obviously it's not an end of the world issue, just something the original + // code didn't anticipate for. + workers.Go(func() error { + // Write the account trie changes, measuring the amount of wasted time + newroot, set := s.stateTrie.Commit(true) + root = newroot + + if err := merge(set); err != nil { + return err + } + s.metrics.AccountCommits = time.Since(start) + return nil + }) + + // Schedule each of the storage tries that need to be updated, so they can + // run concurrently to one another. + // + // TODO(karalabe): Experimentally, the account commit takes approximately the + // same time as all the storage commits combined, so we could maybe only have + // 2 threads in total. But that kind of depends on the account commit being + // more expensive than it should be, so let's fix that and revisit this todo. + for addr, _ := range s.diffs { + if _, isDeleted := s.deletions[addr]; isDeleted { + continue + } + + address := addr + // Run the storage updates concurrently to one another + workers.Go(func() error { + // Write any storage changes in the state object to its storage trie + update, set, err := s.commitAccount(address) + if err != nil { + return err + } + + if err := merge(set); err != nil { + return err + } + lock.Lock() + updates[crypto.Keccak256Hash(address[:])] = update + s.metrics.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime + lock.Unlock() + return nil + }) + } + // Wait for everything to finish and update the metrics + if err := workers.Wait(); err != nil { + return common.Hash{}, nil, err + } + + storageDeleted := s.storageDeleted.Load() + accountUpdatedMeter.Mark(int64(s.written.Accounts - s.accountDeleted)) + storageUpdatedMeter.Mark(int64(s.written.StorageSlots) - storageDeleted) + accountDeletedMeter.Mark(int64(s.accountDeleted)) + storageDeletedMeter.Mark(storageDeleted) + accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated)) + accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted)) + storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated)) + storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted)) + + storageKeyType := StorageKeyHashed + if noStorageWiping { + storageKeyType = StorageKeyPlain + } + update := NewStateUpdate(storageKeyType, s.parentRoot, root, block, deletes, updates, nodes) + + if err := s.db.Commit(update); err != nil { + return common.Hash{}, nil, err + } + // TODO: fix the following metrics: + /* + snapshotCommits, trieDBCommits, err := flushStateUpdate(s.db, block, ret) + if err != nil { + return common.Hash{}, nil, err + } + + s.metrics.SnapshotCommits, s.metrics.TrieDBCommits = snapshotCommits, trieDBCommits + s.metrics.TotalCommitTime = time.Since(commitStart) + */ + return root, update, nil +} + +func (s *BALStateTransition) Commit(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, error) { + hash, _, err := s.CommitWithUpdate(block, deleteEmptyObjects, noStorageWiping) + return hash, err +} + +// IntermediateRoot applies block state mutations and computes the updated state +// trie root. +func (s *BALStateTransition) IntermediateRoot(_ bool) common.Hash { + if s.rootHash != (common.Hash{}) { + return s.rootHash + } + + // State root calculation proceeds as follows: + + // 1 (a): load the origin storage values for all slots which were modified during the block (this is needed for computing the stateUpdate) + // 1 (b): update each mutated account, producing the post-block state object by applying the state mutations to the prestate (retrieved in 1a). + // 1 (c): prefetch the intermediate trie nodes of the mutated state set from the account trie. + // + // 2: compute the post-state root of the account trie + // + // Steps 1/2 are performed sequentially, with steps 1a-d performed in parallel + + start := time.Now() + + var wg sync.WaitGroup + + s.diffs = *s.accessList.Mutations(s.maxBALIdx + 1) + + for addr, d := range s.diffs { + wg.Add(1) + address := addr + diff := d + go func() { + defer wg.Done() + + // 1 (b): update each mutated account, producing the post-block state object by applying the state mutations to the prestate (retrieved in 1a). + acct, err := s.reader.Account(address) + if err != nil { + s.setError(err) + return + } + + if acct == nil { + acct = types.NewEmptyStateAccount() + } + s.prestates.Store(address, acct) + + if len(diff.StorageWrites) > 0 { + tr, err := s.db.OpenStorageTrie(s.parentRoot, address, acct.Root, s.stateTrie) + if err != nil { + s.setError(err) + return + } + s.tries.Store(address, tr) + + var ( + updateKeys, updateValues [][]byte + deleteKeys [][]byte + ) + for key, val := range diff.StorageWrites { + if val != (common.Hash{}) { + updateKeys = append(updateKeys, key[:]) + updateValues = append(updateValues, common.TrimLeftZeroes(val[:])) + } else { + deleteKeys = append(deleteKeys, key[:]) + } + } + + if err := tr.UpdateStorageBatch(address, updateKeys, updateValues); err != nil { + s.setError(err) + return + } + + for _, key := range deleteKeys { + if err := tr.DeleteStorage(address, key); err != nil { + s.setError(err) + return + } + } + + hashStart := time.Now() + tr.Hash() + s.metrics.StateHash = time.Since(hashStart) + } + }() + } + + wg.Add(1) + // 1 (c): prefetch the intermediate trie nodes of the mutated state set from the account trie. + go func() { + defer wg.Done() + prefetchStart := time.Now() + var prefetchAddrs []common.Address + for addr, _ := range s.diffs { + prefetchAddrs = append(prefetchAddrs, addr) + } + if err := s.stateTrie.PrefetchAccount(prefetchAddrs); err != nil { + s.setError(err) + return + } + s.metrics.StatePrefetch = time.Since(prefetchStart) + }() + + wg.Wait() + s.metrics.AccountUpdate = time.Since(start) + + // 2: compute the post-state root of the account trie + stateUpdateStart := time.Now() + for mutatedAddr, _ := range s.diffs { + p, _ := s.prestates.Load(mutatedAddr) + prestate := p.(*types.StateAccount) + + isDeleted := isAccountDeleted(prestate, s.diffs[mutatedAddr]) + if isDeleted { + if err := s.stateTrie.DeleteAccount(mutatedAddr); err != nil { + s.setError(err) + return common.Hash{} + } + s.deletions[mutatedAddr] = struct{}{} + s.accountDeleted++ + } else { + acct, code := s.updateAccount(mutatedAddr) + + if code != nil { + codeHash := crypto.Keccak256Hash(code) + acct.CodeHash = codeHash.Bytes() + if err := s.stateTrie.UpdateContractCode(mutatedAddr, codeHash, code); err != nil { + s.setError(err) + return common.Hash{} + } + } + if err := s.stateTrie.UpdateAccount(mutatedAddr, acct, len(code)); err != nil { + s.setError(err) + return common.Hash{} + } + s.postStates[mutatedAddr] = acct + } + } + + s.metrics.StateUpdate = time.Since(stateUpdateStart) + + stateTrieHashStart := time.Now() + s.rootHash = s.stateTrie.Hash() + s.metrics.StateHash = time.Since(stateTrieHashStart) + return s.rootHash +} + +func (s *BALStateTransition) Preimages() map[common.Hash][]byte { + // TODO: implement this + return make(map[common.Hash][]byte) +} diff --git a/core/state/database.go b/core/state/database.go index 3b1e627f28..3eebd18bac 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -54,6 +54,10 @@ type Database interface { // Reader returns a state reader associated with the specified state root. Reader(root common.Hash) (Reader, error) + // ReaderWithPrefetch returns a reader which asynchronously fetches block + // access list state in the background. + ReaderWithPrefetch(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int, block bool) (Reader, error) + // Iteratee returns a state iteratee associated with the specified state root, // through which the account iterator and storage iterator can be created. Iteratee(root common.Hash) (Iteratee, error) @@ -107,12 +111,18 @@ type Trie interface { // in the trie with provided address. UpdateAccount(address common.Address, account *types.StateAccount, codeLen int) error + // UpdateAccountBatch attempts to update a list accounts in the batch manner. + UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, codeLengths []int) error + // UpdateStorage associates key with value in the trie. If value has length zero, // any existing value is deleted from the trie. The value bytes must not be modified // by the caller while they are stored in the trie. If a node was not found in the // database, a trie.MissingNodeError is returned. UpdateStorage(addr common.Address, key, value []byte) error + // UpdateStorageBatch attempts to update a list storages in the batch manner. + UpdateStorageBatch(_ common.Address, keys [][]byte, values [][]byte) error + // DeleteAccount abstracts an account deletion from the trie. DeleteAccount(address common.Address) error diff --git a/core/state/database_history.go b/core/state/database_history.go index fbf4ab5f9c..ddc6d79238 100644 --- a/core/state/database_history.go +++ b/core/state/database_history.go @@ -223,6 +223,10 @@ type HistoricDB struct { codedb *CodeDB } +func (db *HistoricDB) ReaderWithPrefetch(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int, block bool) (Reader, error) { + panic("not implemented") +} + // Type returns the trie type of the underlying database. func (db *HistoricDB) Type() DatabaseType { // TODO(rjl493456442) support UBT in the future diff --git a/core/state/database_mpt.go b/core/state/database_mpt.go index 42c5f2e5ef..5e7d278232 100644 --- a/core/state/database_mpt.go +++ b/core/state/database_mpt.go @@ -185,3 +185,22 @@ func (db *MPTDatabase) Commit(update *StateUpdate) error { func (db *MPTDatabase) Iteratee(root common.Hash) (Iteratee, error) { return newStateIteratee(true, root, db.triedb, db.snap) } + +func (db *MPTDatabase) ReaderWithPrefetch(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int, block bool) (Reader, error) { + base, err := db.StateReader(stateRoot) + if err != nil { + return nil, err + } + // Construct the state reader with native cache and associated statistics + r := newStateReaderWithStats(newStateReaderWithCache(base)) + + // Construct the state reader with background prefetching + pr := newPrefetchStateReader(r, accessList, threads) + if block { + if err := pr.Wait(); err != nil { + panic("this should unreachable") + } + } + + return newReaderWithPrefetch(db.codedb.Reader(), pr, pr), nil +} diff --git a/core/state/database_ubt.go b/core/state/database_ubt.go index 16579f6d6a..5dbbab4ae1 100644 --- a/core/state/database_ubt.go +++ b/core/state/database_ubt.go @@ -80,6 +80,10 @@ func (db *UBTDatabase) Reader(stateRoot common.Hash) (Reader, error) { return newReader(db.codedb.Reader(), sr), nil } +func (db *UBTDatabase) ReaderWithPrefetch(stateRoot common.Hash, accessList map[common.Address][]common.Hash, threads int, block bool) (Reader, error) { + panic("not implemented") +} + // ReadersWithCacheStats creates a pair of state readers that share the same // underlying state reader and internal state cache, while maintaining separate // statistics respectively. diff --git a/core/state/reader.go b/core/state/reader.go index be07cec0f9..c485af96ed 100644 --- a/core/state/reader.go +++ b/core/state/reader.go @@ -560,6 +560,7 @@ func (r *stateReaderWithStats) GetStateStats() StateReaderStats { type reader struct { ContractCodeReader StateReader + PrefetcherMetricer } // newReader constructs a reader with the supplied code reader and state reader. @@ -570,6 +571,14 @@ func newReader(codeReader ContractCodeReader, stateReader StateReader) *reader { } } +func newReaderWithPrefetch(codeReader ContractCodeReader, stateReader StateReader, metricer PrefetcherMetricer) *reader { + return &reader{ + ContractCodeReader: codeReader, + StateReader: stateReader, + PrefetcherMetricer: metricer, + } +} + // GetCodeStats returns the statistics of code access. func (r *reader) GetCodeStats() ContractCodeReaderStats { if stater, ok := r.ContractCodeReader.(ContractCodeReaderStater); ok { diff --git a/core/state/reader_eip_7928.go b/core/state/reader_eip_7928.go index ff315ac5eb..72727d35c3 100644 --- a/core/state/reader_eip_7928.go +++ b/core/state/reader_eip_7928.go @@ -16,14 +16,6 @@ package state -import ( - "sync" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/types/bal" -) - // The EIP27928 reader utilizes a hierarchical architecture to optimize state // access during block execution: // @@ -39,15 +31,13 @@ import ( // This layer provides a "unified view" by merging the pre-transition state // with mutated states from preceding transactions in the block. // -// - Tracking Layer: Finally, the readerTracker wraps the execution reader to -// capture all state reads made during a specific transaction. These individual -// reads are subsequently merged to construct a comprehensive access list -// for the entire block. -// // The architecture can be illustrated by the diagram below: -// + +// [ Block Level Access List ] <────────────────┐ +// ▲ │ (Merge) +// │ │ // ┌──────────────┴──────────────┐ ┌──────────────┴──────────────┐ -// │ ReaderWithBlockLevelAL │ │ ReaderWithBlockLevelAL │ +// │ ReaderWithBlockLevelAL │ │ ReaderWithBlockLevelAL │ (Unified View) // │ (Pre-state + Mutations) │ │ (Pre-state + Mutations) │ // └──────────────┬──────────────┘ └──────────────┬──────────────┘ // │ │ @@ -63,11 +53,16 @@ import ( // │ (State & Contract Code) │ // └─────────────────────────────┘ -// Note: The block producer, which is responsible for generating the block -// along with the block-level access list, does not maintain the internal -// hierarchy (e.g., PrefetchStateReader or ReaderWithBlockLevelAL). -// Instead, it directly utilizes the readerTracker, wrapped around the -// base reader, to construct the access list. +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/crypto" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" +) type fetchTask struct { addr common.Address @@ -78,16 +73,27 @@ func (t *fetchTask) weight() int { return 1 + len(t.slots) } type prefetchStateReader struct { StateReader - tasks []*fetchTask nThreads int done chan struct{} term chan struct{} closeOnce sync.Once + start time.Time + metrics PrefetchMetrics } -// nolint:unused -func newPrefetchStateReader(reader StateReader, accessList map[common.Address][]common.Hash, nThreads int) *prefetchStateReader { +type PrefetchMetrics struct { + // the total amount of time it took to complete the scheduled workload + Elapsed time.Duration +} + +// PrefetcherMetricer is an object that can expose metrics related to the state +// prefetching. +type PrefetcherMetricer interface { + Metrics() PrefetchMetrics +} + +func newPrefetchStateReader(reader StateReader, accessList bal.StorageKeys, nThreads int) *prefetchStateReader { tasks := make([]*fetchTask, 0, len(accessList)) for addr, slots := range accessList { tasks = append(tasks, &fetchTask{ @@ -105,11 +111,16 @@ func newPrefetchStateReaderInternal(reader StateReader, tasks []*fetchTask, nThr nThreads: nThreads, done: make(chan struct{}), term: make(chan struct{}), + start: time.Now(), } go r.prefetch() return r } +func (r *prefetchStateReader) Metrics() PrefetchMetrics { + return r.metrics +} + func (r *prefetchStateReader) Close() { r.closeOnce.Do(func() { close(r.term) @@ -127,7 +138,10 @@ func (r *prefetchStateReader) Wait() error { } func (r *prefetchStateReader) prefetch() { - defer close(r.done) + defer func() { + r.metrics = PrefetchMetrics{time.Since(r.start)} + close(r.done) + }() if len(r.tasks) == 0 { return @@ -198,50 +212,88 @@ func (r *prefetchStateReader) process(start, limit int) { // prior to TxIndex. type ReaderWithBlockLevelAccessList struct { Reader - AccessList *bal.ConstructionBlockAccessList + AccessList bal.AccessListReader TxIndex int } -// NewReaderWithBlockLevelAccessList constructs a reader for accessing states -// with the mutations made by transactions prior to txIndex. -// -// The txIndex refers to the call frame as such: -// - 0 for pre‑execution system contract calls. -// - 1 … n for transactions (in block order). -// - n + 1 for post‑execution system contract calls. -func NewReaderWithBlockLevelAccessList(base Reader, accessList *bal.ConstructionBlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList { +func NewReaderWithBlockLevelAccessList(base Reader, accessList bal.BlockAccessList, txIndex int) *ReaderWithBlockLevelAccessList { return &ReaderWithBlockLevelAccessList{ Reader: base, - AccessList: accessList, + AccessList: bal.NewAccessListReader(accessList), TxIndex: txIndex, } } // Account implements Reader, returning the account with the specific address. -func (r *ReaderWithBlockLevelAccessList) Account(addr common.Address) (*types.StateAccount, error) { - panic("implement me") +func (r *ReaderWithBlockLevelAccessList) Account(addr common.Address) (acct *types.StateAccount, err error) { + acct, err = r.Reader.Account(addr) + if err != nil { + return nil, err + } + + mut := r.AccessList.AccountMutations(addr, r.TxIndex) + if mut == nil { + return + } + + if acct == nil { + acct = types.NewEmptyStateAccount() + } else { + // the account returned by the underlying reader is a reference + // copy it to avoid mutating the reader's instance + acct = acct.Copy() + } + + if mut.Balance != nil { + acct.Balance = mut.Balance + } + if mut.Code != nil { + codeHash := crypto.Keccak256Hash(mut.Code) + acct.CodeHash = codeHash[:] + } + if mut.Nonce != nil { + acct.Nonce = *mut.Nonce + } + return } // Storage implements Reader, returning the storage slot with the specific // address and slot key. func (r *ReaderWithBlockLevelAccessList) Storage(addr common.Address, slot common.Hash) (common.Hash, error) { - panic("implement me") + val := r.AccessList.Storage(addr, slot, r.TxIndex) + if val != nil { + return *val, nil + } + return r.Reader.Storage(addr, slot) } // Has implements Reader, returning the flag indicating whether the contract // code with specified address and hash exists or not. func (r *ReaderWithBlockLevelAccessList) Has(addr common.Address, codeHash common.Hash) bool { - panic("implement me") + mut := r.AccessList.AccountMutations(addr, r.TxIndex) + if mut != nil && mut.Code != nil { + return crypto.Keccak256Hash(mut.Code) == codeHash + } + return r.Reader.Has(addr, codeHash) } // Code implements Reader, returning the contract code with specified address // and hash. -func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) ([]byte, error) { - panic("implement me") +func (r *ReaderWithBlockLevelAccessList) Code(addr common.Address, codeHash common.Hash) []byte { + mut := r.AccessList.AccountMutations(addr, r.TxIndex) + if mut != nil && mut.Code != nil && crypto.Keccak256Hash(mut.Code) == codeHash { + // TODO: need to copy here? + return mut.Code + } + return r.Reader.Code(addr, codeHash) } // CodeSize implements Reader, returning the contract code size with specified // address and hash. -func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) (int, error) { - panic("implement me") +func (r *ReaderWithBlockLevelAccessList) CodeSize(addr common.Address, codeHash common.Hash) int { + mut := r.AccessList.AccountMutations(addr, r.TxIndex) + if mut != nil && mut.Code != nil && crypto.Keccak256Hash(mut.Code) == codeHash { + return len(mut.Code) + } + return r.Reader.CodeSize(addr, codeHash) } diff --git a/core/state/statedb.go b/core/state/statedb.go index 15c175fd30..79f4f2b530 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -21,6 +21,7 @@ import ( "bytes" "errors" "fmt" + "iter" "maps" "slices" "sort" @@ -182,6 +183,13 @@ func New(root common.Hash, db Database) (*StateDB, error) { return NewWithReader(root, db, reader) } +// WithReader returns a copy of the statedb instance with the specified reader. +func (s *StateDB) WithReader(reader Reader) *StateDB { + cpy := s.Copy() + cpy.reader = reader + return cpy +} + // NewWithReader creates a new state for the specified state root. Unlike New, // this function accepts an additional Reader which is bound to the given root. func NewWithReader(root common.Hash, db Database, reader Reader) (*StateDB, error) { @@ -1122,12 +1130,15 @@ func (s *StateDB) clearJournalAndRefund() { // deleteStorage is designed to delete the storage trie of a designated account. func (s *StateDB) deleteStorage(addrHash common.Hash, root common.Hash) (map[common.Hash]common.Hash, map[common.Hash]common.Hash, *trienode.NodeSet, error) { + return deleteStorage(s.db, s.originalRoot, addrHash, root) +} +func deleteStorage(db Database, originalRoot common.Hash, addrHash common.Hash, root common.Hash) (map[common.Hash]common.Hash, map[common.Hash]common.Hash, *trienode.NodeSet, error) { var ( nodes = trienode.NewNodeSet(addrHash) // the set for trie node mutations (value is nil) storages = make(map[common.Hash]common.Hash) // the set for storage mutations (value is nil) storageOrigins = make(map[common.Hash]common.Hash) // the set for tracking the original value of slot ) - iteratee, err := s.db.Iteratee(s.originalRoot) + iteratee, err := db.Iteratee(originalRoot) if err != nil { return nil, nil, nil, err } @@ -1556,3 +1567,72 @@ func (s *StateDB) Witness() *stateless.Witness { func (s *StateDB) AccessEvents() *AccessEvents { return s.accessEvents } + +// handleDestruction processes all destruction markers and deletes the account +// and associated storage slots if necessary. There are four potential scenarios +// as following: +// +// (a) the account was not existent and be marked as destructed +// (b) the account was not existent and be marked as destructed, +// however, it's resurrected later in the same block. +// (c) the account was existent and be marked as destructed +// (d) the account was existent and be marked as destructed, +// however it's resurrected later in the same block. +// +// In case (a), nothing needs be deleted, nil to nil transition can be ignored. +// In case (b), nothing needs be deleted, nil is used as the original value for +// newly created account and storages +// In case (c), **original** account along with its storages should be deleted, +// with their values be tracked as original value. +// In case (d), **original** account along with its storages should be deleted, +// with their values be tracked as original value. +func handleDestruction(db Database, trie Trie, root common.Hash, noStorageWiping bool, destructions iter.Seq[common.Address], prestates map[common.Address]*types.StateAccount) (map[common.Hash]*AccountDelete, []*trienode.NodeSet, error) { + var ( + nodes []*trienode.NodeSet + deletes = make(map[common.Hash]*AccountDelete) + ) + for addr := range destructions { + prestate := prestates[addr] + // The account was non-existent, and it's marked as destructed in the scope + // of block. It can be either case (a) or (b) and will be interpreted as + // null->null state transition. + // - for (a), skip it without doing anything + // - for (b), the resurrected account with nil as original will be handled afterwards + if prestate == nil { + continue + } + // The account was existent, it can be either case (c) or (d). + addrHash := crypto.Keccak256Hash(addr.Bytes()) + op := &AccountDelete{ + Address: addr, + Origin: prestate, + } + deletes[addrHash] = op + + // Short circuit if the origin storage was empty. + if prestate.Root == types.EmptyRootHash || db.TrieDB().IsUBT() { + continue + } + if noStorageWiping { + return nil, nil, fmt.Errorf("unexpected storage wiping, %x", addr) + } + // Remove storage slots belonging to the account. + storages, storagesOrigin, set, err := deleteStorage(db, prestate.Root, addrHash, root) + if err != nil { + return nil, nil, fmt.Errorf("failed to delete storage, err: %w", err) + } + op.Storages = storages + op.StoragesOrigin = storagesOrigin + + // Aggregate the associated trie node changes. + nodes = append(nodes, set) + } + return deletes, nodes, nil +} + +// TODO: find better location for this +type Committer interface { + Commit(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, error) + CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *StateUpdate, error) + Preimages() map[common.Hash][]byte +} diff --git a/core/state_transition.go b/core/state_transition.go index b8ca6714fa..fdd4b7d663 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -327,7 +327,7 @@ func ApplyMessage(evm *vm.EVM, msg *Message, gp *GasPool) (*ExecutionResult, err return newStateTransition(evm, msg, gp).execute() } -// stateTransition represents a state transition. +// StateTransition represents a state transition. // // == The State Transitioning Model // @@ -349,7 +349,7 @@ func ApplyMessage(evm *vm.EVM, msg *Message, gp *GasPool) (*ExecutionResult, err // // 5. Run Script section // 6. Derive new state root -type stateTransition struct { +type StateTransition struct { gp *GasPool msg *Message initialBudget vm.GasBudget @@ -359,8 +359,8 @@ type stateTransition struct { } // newStateTransition initialises and returns a new state transition object. -func newStateTransition(evm *vm.EVM, msg *Message, gp *GasPool) *stateTransition { - return &stateTransition{ +func newStateTransition(evm *vm.EVM, msg *Message, gp *GasPool) *StateTransition { + return &StateTransition{ gp: gp, evm: evm, msg: msg, @@ -369,7 +369,7 @@ func newStateTransition(evm *vm.EVM, msg *Message, gp *GasPool) *stateTransition } // to returns the recipient of the message. -func (st *stateTransition) to() common.Address { +func (st *StateTransition) to() common.Address { if st.msg == nil || st.msg.To == nil /* contract creation */ { return common.Address{} } @@ -394,7 +394,7 @@ func (st *stateTransition) to() common.Address { // - Amsterdam+ (EIP-8037): two-dimensional budget. Regular gas is // capped at `MaxTxGas` (EIP-7825, 16_777_216); any excess from // `msg.GasLimit` above that cap becomes the state-gas reservoir. -func (st *stateTransition) buyGas() error { +func (st *StateTransition) buyGas() error { mgval := new(uint256.Int).SetUint64(st.msg.GasLimit) _, overflow := mgval.MulOverflow(mgval, st.msg.GasPrice) if overflow { @@ -482,7 +482,7 @@ func (st *stateTransition) buyGas() error { // // The SkipNonceChecks / SkipTransactionChecks / NoBaseFee flags bypass // subsets of these checks for simulation paths (eth_call, eth_estimateGas). -func (st *stateTransition) preCheck() error { +func (st *StateTransition) preCheck() error { // Only check transactions that are not fake msg := st.msg if !msg.SkipNonceChecks { @@ -581,7 +581,7 @@ func (st *stateTransition) preCheck() error { // reserveBlockGasBudget checks if the remaining gas budget in the block pool is // sufficient for including this transaction. -func (st *stateTransition) reserveBlockGasBudget(rules params.Rules, gasLimit uint64, intrinsicCost vm.GasCosts) error { +func (st *StateTransition) reserveBlockGasBudget(rules params.Rules, gasLimit uint64, intrinsicCost vm.GasCosts) error { var err error if rules.IsAmsterdam { // EIP-8037 per-tx 2D block-inclusion check. For each dimension, @@ -620,7 +620,7 @@ func (st *stateTransition) reserveBlockGasBudget(rules params.Rules, gasLimit ui // // If a consensus error is encountered, it is returned directly with a // nil EVM execution result. -func (st *stateTransition) execute() (*ExecutionResult, error) { +func (st *StateTransition) execute() (*ExecutionResult, error) { // The state-transition pipeline below runs in stages. Each stage may // abort with a consensus error before the EVM is invoked: // @@ -867,7 +867,7 @@ func (st *stateTransition) execute() (*ExecutionResult, error) { } // validateAuthorization validates an EIP-7702 authorization against the state. -func (st *stateTransition) validateAuthorization(auth *types.SetCodeAuthorization) (authority common.Address, err error) { +func (st *StateTransition) validateAuthorization(auth *types.SetCodeAuthorization) (authority common.Address, err error) { // Verify chain ID is null or equal to current chain ID. if !auth.ChainID.IsZero() && auth.ChainID.CmpBig(st.evm.ChainConfig().ChainID) != 0 { return authority, ErrAuthorizationWrongChainID @@ -904,7 +904,7 @@ func (st *stateTransition) validateAuthorization(auth *types.SetCodeAuthorizatio // // Invalid authorizations are silently skipped (their auth-base intrinsic // state gas remains charged, matching the pre-existing behavior). -func (st *stateTransition) applyAuthorizations(rules params.Rules, auths []types.SetCodeAuthorization) { +func (st *StateTransition) applyAuthorizations(rules params.Rules, auths []types.SetCodeAuthorization) { if len(auths) == 0 { return } @@ -938,7 +938,7 @@ func (st *stateTransition) applyAuthorizations(rules params.Rules, auths []types // authority "first creation" budget). The caller is expected to do an // end-of-loop pass over this set and refund any entry whose final delegation // state ended up empty. -func (st *stateTransition) applyAuthorization(rules params.Rules, auth *types.SetCodeAuthorization, authBilledCreations map[common.Address]struct{}) error { +func (st *StateTransition) applyAuthorization(rules params.Rules, auth *types.SetCodeAuthorization, authBilledCreations map[common.Address]struct{}) error { authority, err := st.validateAuthorization(auth) if err != nil { return err @@ -1004,7 +1004,7 @@ func (st *stateTransition) applyAuthorization(rules params.Rules, auth *types.Se } // calcRefund computes refund counter, capped to a refund quotient. -func (st *stateTransition) calcRefund() uint64 { +func (st *StateTransition) calcRefund() uint64 { var refund uint64 if !st.evm.ChainConfig().IsLondon(st.evm.Context.BlockNumber) { // Before EIP-3529: refunds were capped to gasUsed / 2 @@ -1026,7 +1026,7 @@ func (st *stateTransition) calcRefund() uint64 { } // returnGas returns ETH for remaining gas, exchanged at the original rate. -func (st *stateTransition) returnGas() uint64 { +func (st *StateTransition) returnGas() uint64 { gas := st.gasRemaining.RegularGas + st.gasRemaining.StateGas remaining := uint256.NewInt(gas) remaining.Mul(remaining, st.msg.GasPrice) @@ -1039,11 +1039,11 @@ func (st *stateTransition) returnGas() uint64 { } // gasUsed returns the amount of gas used up by the state transition. -func (st *stateTransition) gasUsed() uint64 { +func (st *StateTransition) gasUsed() uint64 { return st.gasRemaining.Used(st.initialBudget) } // blobGasUsed returns the amount of blob gas used by the message. -func (st *stateTransition) blobGasUsed() uint64 { +func (st *StateTransition) blobGasUsed() uint64 { return uint64(len(st.msg.BlobHashes) * params.BlobTxBlobGasPerBlob) } diff --git a/core/types.go b/core/types.go index edbfc43db3..32f22bc534 100644 --- a/core/types.go +++ b/core/types.go @@ -34,7 +34,7 @@ type Validator interface { ValidateBody(block *types.Block) error // ValidateState validates the given statedb and optionally the process result. - ValidateState(block *types.Block, state *state.StateDB, res *ProcessResult, stateless bool) error + ValidateState(block *types.Block, state StateRootSource, res *ProcessResult, stateless bool) error } // Prefetcher is an interface for pre-caching transaction signatures and state. @@ -63,4 +63,6 @@ type ProcessResult struct { // BAL is only meaningful for post-Amsterdam blocks. Please ensure // fork validation is performed before accessing it. Bal *bal.ConstructionBlockAccessList + + Error error } diff --git a/core/types/bal/bal.go b/core/types/bal/bal.go index 2eb5fe93cd..9845916765 100644 --- a/core/types/bal/bal.go +++ b/core/types/bal/bal.go @@ -18,6 +18,7 @@ package bal import ( "bytes" + "encoding/json" "maps" "github.com/ethereum/go-ethereum/common" @@ -223,3 +224,137 @@ func (b *ConstructionBlockAccessList) Copy() *ConstructionBlockAccessList { } return res } + +type StorageMutations map[common.Hash]common.Hash + +// AccountMutations contains mutations that were made to an account across +// one or more access list indices. +type AccountMutations struct { + Balance *uint256.Int `json:"Balance,omitempty"` + Nonce *uint64 `json:"Nonce,omitempty"` + Code []byte `json:"Code,omitempty"` + StorageWrites StorageMutations `json:"StorageWrites,omitempty"` +} + +// String returns a human-readable JSON representation of the account mutations. +func (a *AccountMutations) String() string { + var res bytes.Buffer + enc := json.NewEncoder(&res) + enc.SetIndent("", " ") + enc.Encode(a) + return res.String() +} + +// Copy returns a deep-copy of the instance. +func (a *AccountMutations) Copy() *AccountMutations { + res := &AccountMutations{ + nil, + nil, + nil, + nil, + } + if a.Nonce != nil { + res.Nonce = new(uint64) + *res.Nonce = *a.Nonce + } + if a.Code != nil { + res.Code = bytes.Clone(a.Code) + } + if a.Balance != nil { + res.Balance = new(uint256.Int).Set(a.Balance) + } + if a.StorageWrites != nil { + res.StorageWrites = maps.Clone(a.StorageWrites) + } + return res +} + +// Eq returns whether the calling instance is equal to the provided one. +func (a *AccountMutations) Eq(other *AccountMutations) bool { + if a.Balance != nil || other.Balance != nil { + if a.Balance == nil || other.Balance == nil { + return false + } + + if !a.Balance.Eq(other.Balance) { + return false + } + } + + if (len(a.Code) != 0 || len(other.Code) != 0) && !bytes.Equal(a.Code, other.Code) { + return false + } + + if a.Nonce != nil || other.Nonce != nil { + if a.Nonce == nil || other.Nonce == nil { + return false + } + + if *a.Nonce != *other.Nonce { + return false + } + } + + if a.StorageWrites != nil || other.StorageWrites != nil { + if !maps.Equal(a.StorageWrites, other.StorageWrites) { + return false + } + } + return true +} + +type BALExecutionMode int + +const ( + BALExecutionOptimized BALExecutionMode = iota + BALExecutionNoBatchIO + BALExecutionSequential +) + +// WrittenCounts groups per-block aggregate write counts derived from the BAL. +type WrittenCounts struct { + Accounts int + StorageSlots int + Codes int + CodeBytes int +} + +// WrittenCounts walks the BAL once and returns the aggregate write counts. +func (e BlockAccessList) WrittenCounts() WrittenCounts { + var w WrittenCounts + for i := range e { + a := &e[i] + if len(a.StorageChanges) > 0 || len(a.BalanceChanges) > 0 || + len(a.NonceChanges) > 0 || len(a.CodeChanges) > 0 { + w.Accounts++ + } + w.StorageSlots += len(a.StorageChanges) + if n := len(a.CodeChanges); n > 0 { + w.Codes++ + w.CodeBytes += len(a.CodeChanges[n-1].NewCode) + } + } + return w +} + +type StateMutations map[common.Address]AccountMutations + +type StorageKeySet map[common.Hash]struct{} +type StateAccesses map[common.Address]StorageKeySet + +func (s StateAccesses) Eq(other StateAccesses) bool { + if len(s) != len(other) { + return false + } + + for addr, set := range s { + otherSet, ok := other[addr] + if !ok { + return false + } + if !maps.Equal(set, otherSet) { + return false + } + } + return true +} diff --git a/core/types/bal/bal_encoding.go b/core/types/bal/bal_encoding.go index 612d2f8777..26d5aaf7d7 100644 --- a/core/types/bal/bal_encoding.go +++ b/core/types/bal/bal_encoding.go @@ -395,7 +395,7 @@ func (a *ConstructionAccountAccess) toEncodingObj(addr common.Address) AccountAc obj.SlotChanges = make([]encodingStorageWrite, 0, len(slotWrites)) indices := slices.Collect(maps.Keys(slotWrites)) - slices.SortFunc(indices, cmp.Compare) + slices.Sort(indices) for _, index := range indices { val := slotWrites[index] obj.SlotChanges = append(obj.SlotChanges, encodingStorageWrite{ @@ -415,7 +415,7 @@ func (a *ConstructionAccountAccess) toEncodingObj(addr common.Address) AccountAc // Convert balance changes balanceIndices := slices.Collect(maps.Keys(a.BalanceChanges)) - slices.SortFunc(balanceIndices, cmp.Compare) + slices.Sort(balanceIndices) for _, idx := range balanceIndices { res.BalanceChanges = append(res.BalanceChanges, encodingBalanceChange{ BlockAccessIndex: idx, @@ -425,7 +425,7 @@ func (a *ConstructionAccountAccess) toEncodingObj(addr common.Address) AccountAc // Convert nonce changes nonceIndices := slices.Collect(maps.Keys(a.NonceChanges)) - slices.SortFunc(nonceIndices, cmp.Compare) + slices.Sort(nonceIndices) for _, idx := range nonceIndices { res.NonceChanges = append(res.NonceChanges, encodingAccountNonce{ BlockAccessIndex: idx, @@ -435,7 +435,7 @@ func (a *ConstructionAccountAccess) toEncodingObj(addr common.Address) AccountAc // Convert code change codeIndices := slices.Collect(maps.Keys(a.CodeChange)) - slices.SortFunc(codeIndices, cmp.Compare) + slices.Sort(codeIndices) for _, idx := range codeIndices { res.CodeChanges = append(res.CodeChanges, encodingCodeChange{ BlockAccessIndex: idx, diff --git a/core/types/bal/bal_reader.go b/core/types/bal/bal_reader.go new file mode 100644 index 0000000000..e6cffc922e --- /dev/null +++ b/core/types/bal/bal_reader.go @@ -0,0 +1,114 @@ +package bal + +import ( + "bytes" + "github.com/ethereum/go-ethereum/common" +) + +// AccessListReader exposes utilities to read state mutations and accesses from an access list +type AccessListReader map[common.Address]*AccountAccess + +func NewAccessListReader(bal BlockAccessList) (reader AccessListReader) { + reader = make(AccessListReader) + for _, accountAccess := range bal { + reader[accountAccess.Address] = &accountAccess + } + return +} + +// AccountMutations returns the aggregate mutation for an account up until (and not including) the given block access +// list index. +func (a AccessListReader) AccountMutations(addr common.Address, idx int) (res *AccountMutations) { + diff, exist := a[addr] + if !exist { + return nil + } + + res = &AccountMutations{} + + for i := 0; i < len(diff.BalanceChanges) && diff.BalanceChanges[i].BlockAccessIndex < uint32(idx); i++ { + res.Balance = diff.BalanceChanges[i].PostBalance.Clone() + } + + for i := 0; i < len(diff.CodeChanges) && diff.CodeChanges[i].BlockAccessIndex < uint32(idx); i++ { + res.Code = bytes.Clone(diff.CodeChanges[i].NewCode) + } + + for i := 0; i < len(diff.NonceChanges) && diff.NonceChanges[i].BlockAccessIndex < uint32(idx); i++ { + res.Nonce = new(uint64) + *res.Nonce = diff.NonceChanges[i].PostNonce + } + + if len(diff.StorageChanges) > 0 { + res.StorageWrites = make(map[common.Hash]common.Hash) + for _, slotWrites := range diff.StorageChanges { + for i := 0; i < len(slotWrites.SlotChanges) && slotWrites.SlotChanges[i].BlockAccessIndex < uint32(idx); i++ { + res.StorageWrites[slotWrites.Slot.Bytes32()] = slotWrites.SlotChanges[i].PostValue.Bytes32() + } + } + } + + if res.Code == nil && res.Nonce == nil && len(res.StorageWrites) == 0 && res.Balance == nil { + return nil + } + return res +} + +type StorageKeys map[common.Address][]common.Hash + +// StorageKeys returns the set of accounts and storage keys mutated in the access list. +// If reads is set, the un-mutated accounts/keys are included in the result. +func (a AccessListReader) StorageKeys(reads bool) (keys StorageKeys) { + keys = make(StorageKeys) + for addr, acct := range a { + for _, storageChange := range acct.StorageChanges { + keys[addr] = append(keys[addr], storageChange.Slot.Bytes32()) + } + if !(reads && len(acct.StorageReads) > 0) { + continue + } + for _, storageRead := range acct.StorageReads { + keys[addr] = append(keys[addr], storageRead.Bytes32()) + } + } + return +} + +// Storage returns the value of a storage key at the start of executing an index. +// If the slot has no mutations in the access list, it returns nil. +func (a AccessListReader) Storage(addr common.Address, key common.Hash, idx int) (val *common.Hash) { + storageMuts := a.AccountMutations(addr, idx) + if storageMuts != nil { + res, ok := storageMuts.StorageWrites[key] + if ok { + return &res + } + } + return nil +} + +// Mutations returns the aggregate state mutations from bal indices [0, idx) +func (a AccessListReader) Mutations(idx int) *StateMutations { + res := make(StateMutations) + for addr := range a { + if mut := a.AccountMutations(addr, idx); mut != nil { + res[addr] = *mut + } + } + return &res +} + +// AllDestructions returns all accounts that experienced a destruction, regardless of whether +// they were later resurrected and exist after the block. It excludes ephemeral contracts from +// the result. +func (a AccessListReader) AllDestructions() (res []common.Address) { + for addr, access := range a { + for _, nonce := range access.NonceChanges { + if nonce.PostNonce == 0 { + res = append(res, addr) + break + } + } + } + return res +} diff --git a/eth/backend.go b/eth/backend.go index af8b04bda6..fe80d113cc 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -280,6 +280,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } options.Overrides = &overrides + options.BALExecutionMode = config.BALExecutionMode + options.BlockingPrefetch = config.BlockingPrefetch + options.PrefetchWorkers = int(config.PrefetchWorkers) + eth.blockchain, err = core.NewBlockChain(chainDb, config.Genesis, eth.engine, options) if err != nil { return nil, err diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index b51b78e199..2a7e66de11 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -19,6 +19,7 @@ package ethconfig import ( "errors" + "github.com/ethereum/go-ethereum/core/types/bal" "time" "github.com/ethereum/go-ethereum/common" @@ -216,6 +217,10 @@ type Config struct { // RangeLimit restricts the maximum range (end - start) for range queries. RangeLimit uint64 `toml:",omitempty"` + + BALExecutionMode bal.BALExecutionMode + PrefetchWorkers uint + BlockingPrefetch bool } // CreateConsensusEngine creates a consensus engine for the given chain config. diff --git a/tests/block_test.go b/tests/block_test.go index 31bb4f9d36..d309d81670 100644 --- a/tests/block_test.go +++ b/tests/block_test.go @@ -127,7 +127,7 @@ func execBlockTest(t *testing.T, bt *testMatcher, test *BlockTest) { } for _, snapshot := range snapshotConf { for _, dbscheme := range dbschemeConf { - if err := bt.checkFailure(t, test.Run(snapshot, dbscheme, true, nil, nil)); err != nil { + if err := bt.checkFailure(t, test.Run(snapshot, dbscheme, true, true, nil, nil)); err != nil { t.Errorf("test with config {snapshotter:%v, scheme:%v} failed: %v", snapshot, dbscheme, err) return } diff --git a/tests/block_test_util.go b/tests/block_test_util.go index bece8ae610..ab0c908470 100644 --- a/tests/block_test_util.go +++ b/tests/block_test_util.go @@ -113,27 +113,20 @@ type btHeaderMarshaling struct { SlotNumber *math.HexOrDecimal64 } -func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *tracing.Hooks, postCheck func(error, *core.BlockChain)) (result error) { - config, ok := Forks[t.json.Network] - if !ok { - return UnsupportedForkError{t.json.Network} - } - +func (t *BlockTest) createTestBlockChain(config *params.ChainConfig, snapshotter bool, scheme string, witness, createAndVerifyBAL bool, tracer *tracing.Hooks) (*core.BlockChain, error) { // import pre accounts & construct test genesis block & state root - // Commit genesis state var ( - gspec = t.genesis(config) db = rawdb.NewMemoryDatabase() tconf = &triedb.Config{ Preimages: true, - IsUBT: gspec.Config.UBTTime != nil && *gspec.Config.UBTTime <= gspec.Timestamp, } ) - if scheme == rawdb.PathScheme || tconf.IsUBT { + if scheme == rawdb.PathScheme { tconf.PathDB = pathdb.Defaults } else { tconf.HashDB = hashdb.Defaults } + gspec := t.genesis(config) // if ttd is not specified, set an arbitrary huge value if gspec.Config.TerminalTotalDifficulty == nil { @@ -142,15 +135,15 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t triedb := triedb.NewDatabase(db, tconf) gblock, err := gspec.Commit(db, triedb, nil) if err != nil { - return err + return nil, err } triedb.Close() // close the db to prevent memory leak if gblock.Hash() != t.json.Genesis.Hash { - return fmt.Errorf("genesis block hash doesn't match test: computed=%x, test=%x", gblock.Hash().Bytes()[:6], t.json.Genesis.Hash[:6]) + return nil, fmt.Errorf("genesis block hash doesn't match test: computed=%x, test=%x", gblock.Hash().Bytes()[:6], t.json.Genesis.Hash[:6]) } if gblock.Root() != t.json.Genesis.StateRoot { - return fmt.Errorf("genesis block state root does not match test: computed=%x, test=%x", gblock.Root().Bytes()[:6], t.json.Genesis.StateRoot[:6]) + return nil, fmt.Errorf("genesis block state root does not match test: computed=%x, test=%x", gblock.Root().Bytes()[:6], t.json.Genesis.StateRoot[:6]) } // Wrap the original engine within the beacon-engine engine := beacon.New(ethash.NewFaker()) @@ -164,12 +157,28 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t Tracer: tracer, }, StatelessSelfValidation: witness, + NoPrefetch: true, + BlockingPrefetch: true, + PrefetchWorkers: 100, // note: this is totally unrelated to NoPrefetch, just for BAL execution } if snapshotter { options.SnapshotLimit = 1 options.SnapshotWait = true } chain, err := core.NewBlockChain(db, gspec, engine, options) + if err != nil { + return nil, err + } + return chain, nil +} + +func (t *BlockTest) Run(snapshotter bool, scheme string, witness, createAndVerifyBAL bool, tracer *tracing.Hooks, postCheck func(error, *core.BlockChain)) (result error) { + config, ok := Forks[t.json.Network] + if !ok { + return UnsupportedForkError{t.json.Network} + } + + chain, err := t.createTestBlockChain(config, snapshotter, scheme, witness, createAndVerifyBAL, tracer) if err != nil { return err } @@ -203,7 +212,50 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t } } } - return t.validateImportedHeaders(chain, validBlocks) + err = t.validateImportedHeaders(chain, validBlocks) + if err != nil { + return err + } + + if createAndVerifyBAL { + newChain, _ := t.createTestBlockChain(config, snapshotter, scheme, witness, createAndVerifyBAL, tracer) + defer newChain.Stop() + + var blocksWithBAL types.Blocks + for i := uint64(1); i <= chain.CurrentBlock().Number.Uint64(); i++ { + block := chain.GetBlockByNumber(i) + if chain.Config().IsAmsterdam(block.Number(), block.Time()) && block.AccessList() == nil { + return fmt.Errorf("block %d missing BAL", block.NumberU64()) + } + blocksWithBAL = append(blocksWithBAL, block) + } + + amt, err := newChain.InsertChain(blocksWithBAL) + if err != nil { + return err + } + _ = amt + newDB, err := newChain.State() + if err != nil { + return err + } + if err = t.validatePostState(newDB); err != nil { + return fmt.Errorf("post state validation failed: %v", err) + } + // Cross-check the snapshot-to-hash against the trie hash + if snapshotter { + if newChain.Snapshots() != nil { + if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil { + return err + } + } + } + err = t.validateImportedHeaders(newChain, validBlocks) + if err != nil { + return err + } + } + return nil } // Network returns the network/fork name for this test. diff --git a/trie/bintrie/trie.go b/trie/bintrie/trie.go index e3436e3df1..3d236174d4 100644 --- a/trie/bintrie/trie.go +++ b/trie/bintrie/trie.go @@ -419,3 +419,11 @@ func (t *BinaryTrie) PrefetchStorage(addr common.Address, keys [][]byte) error { func (t *BinaryTrie) Witness() map[string][]byte { return t.tracer.Values() } + +func (t *BinaryTrie) UpdateStorageBatch(_ common.Address, keys [][]byte, values [][]byte) error { + panic("not implemented") +} + +func (t *BinaryTrie) UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, _ []int) error { + panic("not implemented") +} diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 4d03ca45f0..f2176310d0 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -210,6 +210,29 @@ func (t *StateTrie) UpdateStorage(_ common.Address, key, value []byte) error { return nil } +// UpdateStorageBatch attempts to update a list storages in the batch manner. +func (t *StateTrie) UpdateStorageBatch(_ common.Address, keys [][]byte, values [][]byte) error { + var ( + hkeys = make([][]byte, 0, len(keys)) + evals = make([][]byte, 0, len(values)) + ) + for _, key := range keys { + hk := crypto.Keccak256(key) + if t.preimages != nil { + t.secKeyCache[common.Hash(hk)] = key + } + hkeys = append(hkeys, hk) + } + for _, val := range values { + data, err := rlp.EncodeToBytes(val) + if err != nil { + return err + } + evals = append(evals, data) + } + return t.trie.UpdateBatch(hkeys, evals) +} + // UpdateAccount will abstract the write of an account to the secure trie. func (t *StateTrie) UpdateAccount(address common.Address, acc *types.StateAccount, _ int) error { hk := crypto.Keccak256(address.Bytes()) @@ -226,6 +249,29 @@ func (t *StateTrie) UpdateAccount(address common.Address, acc *types.StateAccoun return nil } +// UpdateAccountBatch attempts to update a list accounts in the batch manner. +func (t *StateTrie) UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, _ []int) error { + var ( + hkeys = make([][]byte, 0, len(addresses)) + values = make([][]byte, 0, len(accounts)) + ) + for _, addr := range addresses { + hk := crypto.Keccak256(addr.Bytes()) + if t.preimages != nil { + t.secKeyCache[common.Hash(hk)] = addr.Bytes() + } + hkeys = append(hkeys, hk) + } + for _, acc := range accounts { + data, err := rlp.EncodeToBytes(acc) + if err != nil { + return err + } + values = append(values, data) + } + return t.trie.UpdateBatch(hkeys, values) +} + func (t *StateTrie) UpdateContractCode(_ common.Address, _ common.Hash, _ []byte) error { return nil } diff --git a/trie/tracer.go b/trie/tracer.go index 04122d1384..042fa468bf 100644 --- a/trie/tracer.go +++ b/trie/tracer.go @@ -33,12 +33,10 @@ import ( // while the latter is inserted/deleted in order to follow the rule of trie. // This tool can track all of them no matter the node is embedded in its // parent or not, but valueNode is never tracked. -// -// Note opTracer is not thread-safe, callers should be responsible for handling -// the concurrency issues by themselves. type opTracer struct { inserts map[string]struct{} deletes map[string]struct{} + lock sync.RWMutex } // newOpTracer initializes the tracer for capturing trie changes. @@ -53,6 +51,9 @@ func newOpTracer() *opTracer { // in the deletion set (resurrected node), then just wipe it from // the deletion set as it's "untouched". func (t *opTracer) onInsert(path []byte) { + t.lock.Lock() + defer t.lock.Unlock() + if _, present := t.deletes[string(path)]; present { delete(t.deletes, string(path)) return @@ -64,6 +65,9 @@ func (t *opTracer) onInsert(path []byte) { // in the addition set, then just wipe it from the addition set // as it's untouched. func (t *opTracer) onDelete(path []byte) { + t.lock.Lock() + defer t.lock.Unlock() + if _, present := t.inserts[string(path)]; present { delete(t.inserts, string(path)) return @@ -73,12 +77,18 @@ func (t *opTracer) onDelete(path []byte) { // reset clears the content tracked by tracer. func (t *opTracer) reset() { + t.lock.Lock() + defer t.lock.Unlock() + clear(t.inserts) clear(t.deletes) } // copy returns a deep copied tracer instance. func (t *opTracer) copy() *opTracer { + t.lock.RLock() + defer t.lock.RUnlock() + return &opTracer{ inserts: maps.Clone(t.inserts), deletes: maps.Clone(t.deletes), @@ -87,6 +97,9 @@ func (t *opTracer) copy() *opTracer { // deletedList returns a list of node paths which are deleted from the trie. func (t *opTracer) deletedList() [][]byte { + t.lock.RLock() + defer t.lock.RUnlock() + paths := make([][]byte, 0, len(t.deletes)) for path := range t.deletes { paths = append(paths, []byte(path)) diff --git a/trie/transitiontrie/transition.go b/trie/transitiontrie/transition.go index 3e5511be9e..d939e804e3 100644 --- a/trie/transitiontrie/transition.go +++ b/trie/transitiontrie/transition.go @@ -144,6 +144,19 @@ func (t *TransitionTrie) UpdateStorage(address common.Address, key []byte, value return t.overlay.UpdateStorage(address, key, v) } +// UpdateStorageBatch attempts to update a list storages in the batch manner. +func (t *TransitionTrie) UpdateStorageBatch(address common.Address, keys [][]byte, values [][]byte) error { + if len(keys) != len(values) { + return fmt.Errorf("keys and values length mismatch: %d != %d", len(keys), len(values)) + } + for i, key := range keys { + if err := t.UpdateStorage(address, key, values[i]); err != nil { + return err + } + } + return nil +} + // UpdateAccount abstract an account write to the trie. func (t *TransitionTrie) UpdateAccount(addr common.Address, account *types.StateAccount, codeLen int) error { // NOTE: before the rebase, this was saving the state root, so that OpenStorageTrie @@ -152,6 +165,22 @@ func (t *TransitionTrie) UpdateAccount(addr common.Address, account *types.State return t.overlay.UpdateAccount(addr, account, codeLen) } +// UpdateAccountBatch attempts to update a list accounts in the batch manner. +func (t *TransitionTrie) UpdateAccountBatch(addresses []common.Address, accounts []*types.StateAccount, codeLens []int) error { + if len(addresses) != len(accounts) { + return fmt.Errorf("address and accounts length mismatch: %d != %d", len(addresses), len(accounts)) + } + if len(addresses) != len(codeLens) { + return fmt.Errorf("address and code length mismatch: %d != %d", len(addresses), len(codeLens)) + } + for i, addr := range addresses { + if err := t.UpdateAccount(addr, accounts[i], codeLens[i]); err != nil { + return err + } + } + return nil +} + // DeleteStorage removes any existing value for key from the trie. If a node was not // found in the database, a trie.MissingNodeError is returned. func (t *TransitionTrie) DeleteStorage(addr common.Address, key []byte) error { diff --git a/trie/trie.go b/trie/trie.go index 1ef2c2f1a6..7e69a90823 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -480,6 +480,69 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error } } +// UpdateBatch updates a batch of entries concurrently. +func (t *Trie) UpdateBatch(keys [][]byte, values [][]byte) error { + // Short circuit if the trie is already committed and unusable. + if t.committed { + return ErrCommitted + } + if len(keys) != len(values) { + return fmt.Errorf("keys and values length mismatch: %d != %d", len(keys), len(values)) + } + // Insert the entries sequentially if there are not too many + // trie nodes in the trie. + fn, ok := t.root.(*fullNode) + if !ok || len(keys) < 4 { // TODO(rjl493456442) the parallelism threshold should be twisted + for i, key := range keys { + err := t.Update(key, values[i]) + if err != nil { + return err + } + } + return nil + } + var ( + ikeys = make(map[byte][][]byte) + ivals = make(map[byte][][]byte) + eg errgroup.Group + ) + for i, key := range keys { + hkey := keybytesToHex(key) + ikeys[hkey[0]] = append(ikeys[hkey[0]], hkey) + ivals[hkey[0]] = append(ivals[hkey[0]], values[i]) + } + if len(keys) > 0 { + fn.flags = t.newFlag() + } + for pos, ks := range ikeys { + eg.Go(func() error { + vs := ivals[pos] + for i, k := range ks { + if len(vs[i]) != 0 { + _, n, err := t.insert(fn.Children[pos], []byte{pos}, k[1:], valueNode(vs[i])) + if err != nil { + return err + } + fn.Children[pos] = n + } else { + _, n, err := t.delete(fn.Children[pos], []byte{pos}, k[1:]) + if err != nil { + return err + } + fn.Children[pos] = n + } + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err + } + t.unhashed += len(keys) + t.uncommitted += len(keys) + return nil +} + // MustDelete is a wrapper of Delete and will omit any encountered error but // just print out an error message. func (t *Trie) MustDelete(key []byte) { diff --git a/trie/trie_test.go b/trie/trie_test.go index 3661933e22..949f381f07 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -1580,3 +1580,57 @@ func BenchmarkTrieSeqPrefetch(b *testing.B) { } } } + +func TestUpdateBatch(t *testing.T) { + testUpdateBatch(t, []kv{ + {k: []byte("do"), v: []byte("verb")}, + {k: []byte("ether"), v: []byte("wookiedoo")}, + {k: []byte("horse"), v: []byte("stallion")}, + {k: []byte("shaman"), v: []byte("horse")}, + {k: []byte("doge"), v: []byte("coin")}, + {k: []byte("dog"), v: []byte("puppy")}, + }) + + var entries []kv + for i := 0; i < 256; i++ { + entries = append(entries, kv{k: testrand.Bytes(32), v: testrand.Bytes(32)}) + } + testUpdateBatch(t, entries) +} + +func testUpdateBatch(t *testing.T, entries []kv) { + var ( + base = NewEmpty(nil) + keys [][]byte + vals [][]byte + ) + for _, entry := range entries { + base.Update(entry.k, entry.v) + keys = append(keys, entry.k) + vals = append(vals, entry.v) + } + for i := 0; i < 10; i++ { + k, v := testrand.Bytes(32), testrand.Bytes(32) + base.Update(k, v) + keys = append(keys, k) + vals = append(vals, v) + } + + cmp := NewEmpty(nil) + if err := cmp.UpdateBatch(keys, vals); err != nil { + t.Fatalf("Failed to update batch, %v", err) + } + + // Traverse the original tree, the changes made on the copy one shouldn't + // affect the old one + for _, key := range keys { + v1, _ := base.Get(key) + v2, _ := cmp.Get(key) + if !bytes.Equal(v1, v2) { + t.Errorf("Unexpected data, key: %v, want: %v, got: %v", key, v1, v2) + } + } + if base.Hash() != cmp.Hash() { + t.Errorf("Hash mismatch: want %x, got %x", base.Hash(), cmp.Hash()) + } +}