From 2721e8a1a2e764b10fc2f07829d081d9302ef64e Mon Sep 17 00:00:00 2001 From: Jared Wasinger Date: Sun, 2 Nov 2025 17:29:06 +0800 Subject: [PATCH] attempt to optimize state root calculation for BALs --- core/block_validator.go | 12 +- core/blockchain.go | 284 ++++++++++------ core/parallel_state_processor.go | 55 ++- core/state/bal_reader.go | 19 +- core/state/bal_state_transition.go | 522 +++++++++++++++++++++++++++++ core/state/state_object.go | 2 + core/state/statedb.go | 152 ++++++--- core/stateless.go | 2 +- core/types.go | 2 +- triedb/database.go | 1 - 10 files changed, 828 insertions(+), 223 deletions(-) create mode 100644 core/state/bal_state_transition.go diff --git a/core/block_validator.go b/core/block_validator.go index 4b6284a002..3bf4f428f7 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -147,7 +147,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { // ValidateState validates the various changes that happen after a state transition, // such as amount of used gas, the receipt roots and the state root itself. -func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, res *ProcessResult, validateStateRoot, stateless bool) error { +func (v *BlockValidator) ValidateState(block *types.Block, stateTransition state.BlockStateTransition, res *ProcessResult, stateless bool) error { if res == nil { return errors.New("nil ProcessResult value") } @@ -185,12 +185,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return errors.New("block has requests before prague fork") } - if validateStateRoot { - // Validate the state root against the received state root and throw - // an error if they don't match. 
- if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { - return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error()) - } + // Validate the state root against the received state root and throw + // an error if they don't match. + if root := stateTransition.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root { + return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, stateTransition.Error()) } return nil } diff --git a/core/blockchain.go b/core/blockchain.go index 7f41119fab..37c901504d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -99,10 +99,18 @@ var ( blockWriteTimer = metrics.NewRegisteredResettingTimer("chain/write", nil) // BAL-specific timers - blockPreprocessingTimer = metrics.NewRegisteredResettingTimer("chain/preprocess", nil) - blockPrestateLoadTimer = metrics.NewRegisteredResettingTimer("chain/prestateload", nil) - txExecutionTimer = metrics.NewRegisteredResettingTimer("chain/txexecution", nil) - stateRootCalctimer = metrics.NewRegisteredResettingTimer("chain/rootcalculation", nil) + blockPreprocessingTimer = metrics.NewRegisteredResettingTimer("chain/preprocess", nil) + txExecutionTimer = metrics.NewRegisteredResettingTimer("chain/txexecution", nil) + + stateTrieHashTimer = metrics.NewRegisteredResettingTimer("chain/statetriehash", nil) + accountTriesUpdateTimer = metrics.NewRegisteredResettingTimer("chain/accounttriesupdate", nil) + stateTriePrefetchTimer = metrics.NewRegisteredResettingTimer("chain/statetrieprefetch", nil) + stateTrieUpdateTimer = metrics.NewRegisteredResettingTimer("chain/statetrieupdate", nil) + originStorageLoadTimer = metrics.NewRegisteredResettingTimer("chain/originstorageload", nil) + + stateRootComputeTimer = metrics.NewRegisteredResettingTimer("chain/staterootcompute", nil) + stateCommitTimer = metrics.NewRegisteredResettingTimer("chain/statetriecommit", nil) + 
blockPostprocessingTimer = metrics.NewRegisteredResettingTimer("chain/postprocess", nil) blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil) @@ -1601,7 +1609,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { // writeBlockWithState writes block, metadata and corresponding state data to the // database. -func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, statedb *state.StateDB) error { +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, transition state.BlockStateTransition) error { if !bc.HasHeader(block.ParentHash(), block.NumberU64()-1) { return consensus.ErrUnknownAncestor } @@ -1612,12 +1620,13 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. blockBatch := bc.db.NewBatch() rawdb.WriteBlock(blockBatch, block) rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) - rawdb.WritePreimages(blockBatch, statedb.Preimages()) + // TODO: consider moving Preimages out of the interface definition + rawdb.WritePreimages(blockBatch, transition.Preimages()) if err := blockBatch.Write(); err != nil { log.Crit("Failed to write block into disk", "err", err) } // Commit all cached state changes into underlying memory database. - root, stateUpdate, err := statedb.CommitWithUpdate(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time())) + root, stateUpdate, err := transition.CommitWithUpdate(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time())) if err != nil { return err } @@ -1687,7 +1696,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead. // This function expects the chain mutex to be held. 
-func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { +func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state state.BlockStateTransition, emitHeadEvent bool) (status WriteStatus, err error) { if err := bc.writeBlockWithState(block, receipts, state); err != nil { return NonStatTy, err } @@ -2000,9 +2009,108 @@ func (bpr *blockProcessingResult) Witness() *stateless.Witness { return bpr.witness } +func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *types.Block, setHead bool) (procRes *blockProcessingResult, blockEndErr error) { + var ( + startTime = time.Now() + procTime time.Duration + ) + + reader, err := bc.statedb.Reader(parentRoot) + if err != nil { + return nil, err + } + + accessList := state.NewBALReader(block, reader) + stateTransition, err := state.NewBALStateTransition(accessList, bc.statedb, parentRoot) + if err != nil { + return nil, err + } + statedb, err := state.New(parentRoot, bc.statedb) + if err != nil { + return nil, err + } + + statedb.SetBlockAccessList(accessList) + + if bc.logger != nil && bc.logger.OnBlockStart != nil { + bc.logger.OnBlockStart(tracing.BlockEvent{ + Block: block, + Finalized: bc.CurrentFinalBlock(), + Safe: bc.CurrentSafeBlock(), + }) + } + if bc.logger != nil && bc.logger.OnBlockEnd != nil { + defer func() { + bc.logger.OnBlockEnd(blockEndErr) + }() + } + + res, err := bc.parallelProcessor.Process(block, stateTransition, statedb, bc.cfg.VmConfig) + if err != nil { + return nil, err + } + + if err := bc.validator.ValidateState(block, stateTransition, res.ProcessResult, false); err != nil { + return nil, err + } + + procTime = time.Since(startTime) + + // Write the block to the chain and get the status. 
+ var ( + wstart = time.Now() + status WriteStatus + ) + if !setHead { + // Don't set the head, only insert the block + err = bc.writeBlockWithState(block, res.ProcessResult.Receipts, stateTransition) + } else { + status, err = bc.writeBlockAndSetHead(block, res.ProcessResult.Receipts, res.ProcessResult.Logs, stateTransition, false) + } + if err != nil { + return nil, err + } + + // Update the metrics touched during block commit + accountCommitTimer.Update(stateTransition.Metrics().AccountCommits) // Account commits are complete, we can mark them + storageCommitTimer.Update(stateTransition.Metrics().StorageCommits) // Storage commits are complete, we can mark them + snapshotCommitTimer.Update(stateTransition.Metrics().SnapshotCommits) // Snapshot commits are complete, we can mark them + triedbCommitTimer.Update(stateTransition.Metrics().TrieDBCommits) // Trie database commits are complete, we can mark them + + blockWriteTimer.Update(time.Since(wstart) - max(stateTransition.Metrics().AccountCommits, stateTransition.Metrics().StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits) + elapsed := time.Since(startTime) + 1 // prevent zero division + blockInsertTimer.Update(elapsed) + + // TODO(rjl493456442) generalize the ResettingTimer + mgasps := float64(res.ProcessResult.GasUsed) * 1000 / float64(elapsed) + chainMgaspsMeter.Update(time.Duration(mgasps)) + + blockPreprocessingTimer.Update(res.PreProcessTime) + txExecutionTimer.Update(res.ExecTime) + + accountTriesUpdateTimer.Update(res.StateTransitionMetrics.AccountUpdate) + stateTrieUpdateTimer.Update(res.StateTransitionMetrics.StateUpdate) + stateTrieHashTimer.Update(res.StateTransitionMetrics.StateHash) + stateRootComputeTimer.Update(res.StateTransitionMetrics.AccountUpdate + res.StateTransitionMetrics.StateUpdate + res.StateTransitionMetrics.StateHash) + + originStorageLoadTimer.Update(res.StateTransitionMetrics.OriginStorageLoadTime) + 
stateCommitTimer.Update(res.StateTransitionMetrics.TotalCommitTime) + blockPostprocessingTimer.Update(res.PostProcessTime) + + return &blockProcessingResult{ + usedGas: res.ProcessResult.GasUsed, + procTime: procTime, + status: status, + witness: nil, + }, nil +} + // ProcessBlock executes and validates the given block. If there was no error // it writes the block and associated state to database. func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, setHead bool, makeWitness bool, constructBALForTesting bool, validateBAL bool) (_ *blockProcessingResult, blockEndErr error) { + if block.Body().AccessList != nil { + return bc.processBlockWithAccessList(parentRoot, block, setHead) + } var ( err error startTime = time.Now() @@ -2109,85 +2217,53 @@ func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, s }() } - blockHadBAL := block.Body().AccessList != nil var res *ProcessResult - var resWithMetrics *ProcessResultWithMetrics var ptime, vtime time.Duration - if block.Body().AccessList != nil { - if block.NumberU64() == 0 { - return nil, fmt.Errorf("genesis block cannot have a block access list") - } - // TODO: rename 'validateBAL' to indicate that it's for validating that the BAL - // is present and we are after amsterdam fork. validateBAL=false is only used for - // testing BALs in pre-Amsterdam blocks. - if !validateBAL && !bc.chainConfig.IsAmsterdam(block.Number(), block.Time()) { - bc.reportBlock(block, res, fmt.Errorf("received block containing access list before glamsterdam activated")) - return nil, err - } - // Process block using the parent state as reference point - pstart := time.Now() - resWithMetrics, err = bc.parallelProcessor.Process(block, statedb, bc.cfg.VmConfig) - if err != nil { - // TODO: okay to pass nil here as execution result? 
- bc.reportBlock(block, nil, err) - return nil, err - } - ptime = time.Since(pstart) - vstart := time.Now() - var err error - err = bc.validator.ValidateState(block, statedb, resWithMetrics.ProcessResult, false, false) - if err != nil { - // TODO: okay to pass nil here as execution result? - bc.reportBlock(block, nil, err) - return nil, err - } - res = resWithMetrics.ProcessResult - vtime = time.Since(vstart) - } else { - var balTracer *BlockAccessListTracer - // Process block using the parent state as reference point - if constructBALForTesting { - balTracer, bc.cfg.VmConfig.Tracer = NewBlockAccessListTracer() - defer func() { - bc.cfg.VmConfig.Tracer = nil - }() + // BAL Tracer used for creating BALs in ProcessBlock in testing path only + var balTracer *BlockAccessListTracer - } - // Process block using the parent state as reference point - pstart := time.Now() - res, err = bc.processor.Process(block, statedb, bc.cfg.VmConfig) - if err != nil { - bc.reportBlock(block, res, err) - return nil, err - } - ptime = time.Since(pstart) + // Process block using the parent state as reference point + if constructBALForTesting { + balTracer, bc.cfg.VmConfig.Tracer = NewBlockAccessListTracer() + defer func() { + bc.cfg.VmConfig.Tracer = nil + }() - // TODO: if I remove this check before executing balTracer.Finalise, the following test fails: - // ExecutionSpecBlocktests/shanghai/eip3855_push0/push0/push0_storage_overwrite.json - if constructBALForTesting { - balTracer.OnBlockFinalization() - } + } + // Process block using the parent state as reference point + pstart := time.Now() + res, err = bc.processor.Process(block, statedb, bc.cfg.VmConfig) + if err != nil { + bc.reportBlock(block, res, err) + return nil, err + } + ptime = time.Since(pstart) - // unset the BAL-creation tracer (dirty) - bc.cfg.VmConfig.Tracer = nil + // TODO: if I remove this check before executing balTracer.Finalise, the following test fails: + // 
ExecutionSpecBlocktests/shanghai/eip3855_push0/push0/push0_storage_overwrite.json + if constructBALForTesting { + balTracer.OnBlockFinalization() + } - vstart := time.Now() - if err := bc.validator.ValidateState(block, statedb, res, true, false); err != nil { - bc.reportBlock(block, res, err) - return nil, err - } - vtime = time.Since(vstart) + // unset the BAL-creation tracer (dirty) + bc.cfg.VmConfig.Tracer = nil - if constructBALForTesting { - // very ugly... deep-copy the block body before setting the block access - // list on it to prevent mutating the block instance passed by the caller. - existingBody := block.Body() - block = block.WithBody(*existingBody) - existingBody = block.Body() - existingBody.AccessList = balTracer.AccessList().ToEncodingObj() - block = block.WithBody(*existingBody) - } + vstart := time.Now() + if err := bc.validator.ValidateState(block, statedb, res, false); err != nil { + bc.reportBlock(block, res, err) + return nil, err + } + vtime = time.Since(vstart) + + if constructBALForTesting { + // very ugly... deep-copy the block body before setting the block access + // list on it to prevent mutating the block instance passed by the caller. 
+ existingBody := block.Body() + block = block.WithBody(*existingBody) + existingBody = block.Body() + existingBody.AccessList = balTracer.AccessList().ToEncodingObj() + block = block.WithBody(*existingBody) } // If witnesses was generated and stateless self-validation requested, do @@ -2220,36 +2296,26 @@ func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, s } var proctime time.Duration - if blockHadBAL { - blockPreprocessingTimer.Update(resWithMetrics.PreProcessTime) - blockPrestateLoadTimer.Update(resWithMetrics.PrestateLoadTime) - txExecutionTimer.Update(resWithMetrics.ExecTime) - stateRootCalctimer.Update(resWithMetrics.RootCalcTime) - blockPostprocessingTimer.Update(resWithMetrics.PostProcessTime) + xvtime := time.Since(xvstart) + proctime = time.Since(startTime) // processing + validation + cross validation - accountHashTimer.Update(statedb.AccountHashes) - } else { - xvtime := time.Since(xvstart) - proctime = time.Since(startTime) // processing + validation + cross validation - - // Update the metrics touched during block processing and validation - accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) - storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) - if statedb.AccountLoaded != 0 { - accountReadSingleTimer.Update(statedb.AccountReads / time.Duration(statedb.AccountLoaded)) - } - if statedb.StorageLoaded != 0 { - storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded)) - } - accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) - storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) - accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) - triehash := statedb.AccountHashes // The time spent on tries hashing - trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on 
tries update - blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing - blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation - blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation + // Update the metrics touched during block processing and validation + accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) + storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing) + if statedb.AccountLoaded != 0 { + accountReadSingleTimer.Update(statedb.AccountReads / time.Duration(statedb.AccountLoaded)) } + if statedb.StorageLoaded != 0 { + storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded)) + } + accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation) + storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation) + accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation) + triehash := statedb.AccountHashes // The time spent on tries hashing + trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update + blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing + blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation + blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation // Write the block to the chain and get the status. 
var ( @@ -2783,6 +2849,10 @@ func (bc *BlockChain) reportBlock(block *types.Block, res *ProcessResult, err er log.Error(summarizeBadBlock(block, receipts, bc.Config(), err)) } +func (bc *BlockChain) reportBALBlock(block *types.Block, res *ProcessResult, err error) { + +} + // logForkReadiness will write a log when a future fork is scheduled, but not // active. This is useful so operators know their client is ready for the fork. func (bc *BlockChain) logForkReadiness(block *types.Block) { diff --git a/core/parallel_state_processor.go b/core/parallel_state_processor.go index 0bdbf7ca46..3f18b6feba 100644 --- a/core/parallel_state_processor.go +++ b/core/parallel_state_processor.go @@ -4,7 +4,6 @@ import ( "cmp" "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types/bal" @@ -17,18 +16,12 @@ import ( // ProcessResultWithMetrics wraps ProcessResult with some metrics that are // emitted when executing blocks containing access lists. 
type ProcessResultWithMetrics struct { - ProcessResult *ProcessResult - // the time it took to load modified prestate accounts from disk and instantiate statedbs for execution - PreProcessTime time.Duration - // the time it took to validate the block post transaction execution and state root calculation - PostProcessTime time.Duration - // the time it took to hash the state root, including intermediate node reads - RootCalcTime time.Duration - // the time that it took to load the prestate for accounts that were updated as part of - // the state root update - PrestateLoadTime time.Duration + ProcessResult *ProcessResult + PreProcessTime time.Duration + StateTransitionMetrics *state.BALStateTransitionMetrics // the time it took to execute all txs in the block - ExecTime time.Duration + ExecTime time.Duration + PostProcessTime time.Duration } // ParallelStateProcessor is used to execute and verify blocks containing @@ -214,31 +207,31 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateRea } else if rootCalcRes.err != nil { resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: rootCalcRes.err}} } else { - execResults.RootCalcTime = rootCalcRes.rootCalcTime - execResults.PrestateLoadTime = rootCalcRes.prestateLoadTime + // &{20.39677ms 0s 1.149668ms 735.295µs 0s 0s 0s 0s} + execResults.StateTransitionMetrics = rootCalcRes.metrics resCh <- execResults } } type stateRootCalculationResult struct { - err error - prestateLoadTime time.Duration - rootCalcTime time.Duration - root common.Hash + err error + metrics *state.BALStateTransitionMetrics + root common.Hash } // calcAndVerifyRoot performs the post-state root hash calculation, verifying // it against what is reported by the block and returning a result on resCh. 
-func (p *ParallelStateProcessor) calcAndVerifyRoot(preState *state.StateDB, block *types.Block, resCh chan stateRootCalculationResult) { +func (p *ParallelStateProcessor) calcAndVerifyRoot(preState *state.StateDB, block *types.Block, stateTransition *state.BALStateTransition, resCh chan stateRootCalculationResult) { // calculate and apply the block state modifications - root, prestateLoadTime, rootCalcTime := preState.BlockAccessList().StateRoot(preState) + //root, prestateLoadTime, rootCalcTime := preState.BlockAccessList().StateRoot(preState) + root := stateTransition.IntermediateRoot(false) res := stateRootCalculationResult{ - root: root, - prestateLoadTime: prestateLoadTime, - rootCalcTime: rootCalcTime, + // TODO: I think we can remove the root from this struct + metrics: stateTransition.Metrics(), } + // TODO: validate state root in block validator? if root != block.Root() { res.err = fmt.Errorf("state root mismatch. local: %x. remote: %x", root, block.Root()) } @@ -292,7 +285,8 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio // Process performs EVM execution and state root computation for a block which is known // to contain an access list. 
-func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) { +func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *state.BALStateTransition, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) { + //fmt.Println("Parallel Process") var ( header = block.Header() resCh = make(chan *ProcessResultWithMetrics) @@ -305,18 +299,9 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat tPreprocess time.Duration // time to create a set of prestates for parallel transaction execution tExecStart time.Time rootCalcResultCh = make(chan stateRootCalculationResult) + context vm.BlockContext ) - // Mutate the block and state according to any hard-fork specs - if p.chainConfig().DAOForkSupport && p.chainConfig().DAOForkBlock != nil && p.chainConfig().DAOForkBlock.Cmp(block.Number()) == 0 { - misc.ApplyDAOHardFork(statedb) - } - var ( - context vm.BlockContext - ) - alReader := state.NewBALReader(block, statedb) - statedb.SetBlockAccessList(alReader) - balTracer, hooks := NewBlockAccessListTracer() tracingStateDB := state.NewHookedState(statedb, hooks) // TODO: figure out exactly why we need to set the hooks on the TracingStateDB and the vm.Config @@ -364,7 +349,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat }) } - go p.calcAndVerifyRoot(statedb, block, rootCalcResultCh) + go p.calcAndVerifyRoot(statedb, block, stateTransition, rootCalcResultCh) res := <-resCh if res.ProcessResult.Error != nil { diff --git a/core/state/bal_reader.go b/core/state/bal_reader.go index 3bd343abf7..7a8278088b 100644 --- a/core/state/bal_reader.go +++ b/core/state/bal_reader.go @@ -143,12 +143,12 @@ type BALReader struct { } // NewBALReader constructs a new reader from an access list. db is expected to have been instantiated with a reader. 
-func NewBALReader(block *types.Block, db *StateDB) *BALReader { +func NewBALReader(block *types.Block, reader Reader) *BALReader { r := &BALReader{accesses: make(map[common.Address]*bal.AccountAccess), block: block} for _, acctDiff := range *block.Body().AccessList { r.accesses[acctDiff.Address] = &acctDiff } - r.prestateReader.resolve(db.Reader(), r.ModifiedAccounts()) + r.prestateReader.resolve(reader, r.ModifiedAccounts()) return r } @@ -211,21 +211,8 @@ func (r *BALReader) AccessedState() (res map[common.Address]map[common.Hash]stru // TODO: it feels weird that this modifies the prestate instance. However, it's needed because it will // subsequently be used in Commit. func (r *BALReader) StateRoot(prestate *StateDB) (root common.Hash, prestateLoadTime time.Duration, rootUpdateTime time.Duration) { - lastIdx := len(r.block.Transactions()) + 1 - modifiedAccts := r.ModifiedAccounts() - startPrestateLoad := time.Now() - for _, addr := range modifiedAccts { - diff := r.readAccountDiff(addr, lastIdx) - acct := r.prestateReader.account(addr) - obj := r.initMutatedObjFromDiff(prestate, addr, acct, diff) - if obj != nil { - prestate.setStateObject(obj) - } - } - prestateLoadTime = time.Since(startPrestateLoad) - rootUpdateStart := time.Now() root = prestate.IntermediateRoot(true) - rootUpdateTime = time.Since(rootUpdateStart) + // TODO: fix the metrics calculation here return root, prestateLoadTime, rootUpdateTime } diff --git a/core/state/bal_state_transition.go b/core/state/bal_state_transition.go new file mode 100644 index 0000000000..96f744ca51 --- /dev/null +++ b/core/state/bal_state_transition.go @@ -0,0 +1,522 @@ +package state + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie/trienode" + "github.com/holiman/uint256" + 
"golang.org/x/sync/errgroup" + "maps" + "sync" + "sync/atomic" + "time" +) + +type BALStateTransition struct { + accessList *BALReader + db Database + reader Reader + stateTrie Trie + parentRoot common.Hash + + rootHash common.Hash + diffs map[common.Address]*bal.AccountMutations + prestates sync.Map //map[common.Address]*types.StateAccount + + poststates map[common.Address]*types.StateAccount + tries sync.Map //map[common.Address]Trie + + deletions map[common.Address]struct{} + + originStorages map[common.Address]map[common.Hash]common.Hash + originStoragesWG sync.WaitGroup + + accountDeleted int64 + accountUpdated int64 + storageDeleted atomic.Int64 + storageUpdated atomic.Int64 + + // TODO: maybe package these into their own 'CommitMetrics' struct instead of making them public fields + + stateUpdate *stateUpdate + + metrics BALStateTransitionMetrics +} + +func (s *BALStateTransition) Metrics() *BALStateTransitionMetrics { + return &s.metrics +} + +type BALStateTransitionMetrics struct { + // trie hashing metrics + AccountUpdate time.Duration + StatePrefetch time.Duration + StateUpdate time.Duration + StateHash time.Duration + OriginStorageLoadTime time.Duration + + // commit metrics + AccountCommits time.Duration + StorageCommits time.Duration + SnapshotCommits time.Duration + TrieDBCommits time.Duration + TotalCommitTime time.Duration +} + +func NewBALStateTransition(accessList *BALReader, db Database, parentRoot common.Hash) (*BALStateTransition, error) { + reader, err := db.Reader(parentRoot) + if err != nil { + panic("OH FUCK") + } + stateTrie, err := db.OpenTrie(parentRoot) + if err != nil { + return nil, err + } + + return &BALStateTransition{ + accessList: accessList, + db: db, + reader: reader, + stateTrie: stateTrie, + parentRoot: parentRoot, + rootHash: common.Hash{}, + diffs: make(map[common.Address]*bal.AccountMutations), + prestates: sync.Map{}, + poststates: make(map[common.Address]*types.StateAccount), + tries: sync.Map{}, + deletions: 
make(map[common.Address]struct{}), + originStorages: make(map[common.Address]map[common.Hash]common.Hash), + originStoragesWG: sync.WaitGroup{}, + stateUpdate: nil, + }, nil +} + +// TODO: make use of this method return the error from IntermediateRoot or Commit +func (s *BALStateTransition) Error() error { + return nil +} + +// TODO: refresh my knowledge of the storage-clearing EIP and ensure that my assumptions around +// an empty account which contains storage are valid here. +// +// isAccountDeleted checks whether the state account was deleted in this block. Post selfdestruct-removal, +// deletions can only occur if an account which has a balance becomes the target of a CREATE2 initcode +// which calls SENDALL, clearing the account and marking it for deletion. +func isAccountDeleted(prestate *types.StateAccount, mutations *bal.AccountMutations) bool { + // TODO: figure out how to simplify this method + if mutations.Code != nil && len(mutations.Code) != 0 { + return false + } + if mutations.Nonce != nil && *mutations.Nonce != 0 { + return false + } + if mutations.StorageWrites != nil && len(mutations.StorageWrites) > 0 { + return false + } + if mutations.Balance != nil { + if mutations.Balance.IsZero() { + if prestate.Nonce != 0 || prestate.Balance.IsZero() || common.BytesToHash(prestate.CodeHash) != types.EmptyCodeHash { + return false + } + // consider an empty account with storage to be deleted, so we don't check root here + return true + } + } + return false +} + +func (s *BALStateTransition) updateAccount(addr common.Address) (*types.StateAccount, []byte) { + a, _ := s.prestates.Load(addr) + acct := a.(*types.StateAccount) + + acct, diff := acct.Copy(), s.diffs[addr] + code := diff.Code + + if diff.Nonce != nil { + acct.Nonce = *diff.Nonce + } + if diff.Balance != nil { + acct.Balance = new(uint256.Int).Set(diff.Balance) + } + if tr, ok := s.tries.Load(addr); ok { + acct.Root = tr.(Trie).Hash() + } + return acct, code +} + +func (s *BALStateTransition) 
commitAccount(addr common.Address) (*accountUpdate, *trienode.NodeSet, error) { + var ( + encode = func(val common.Hash) []byte { + if val == (common.Hash{}) { + return nil + } + blob, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(val[:])) + return blob + } + ) + op := &accountUpdate{ + address: addr, + data: types.SlimAccountRLP(*s.poststates[addr]), // TODO: cache the updated state acocunt somewhere + } + if prestate, exist := s.prestates.Load(addr); exist { + prestate := prestate.(*types.StateAccount) + op.origin = types.SlimAccountRLP(*prestate) + } + + if s.diffs[addr].Code != nil { + op.code = &contractCode{ + crypto.Keccak256Hash(s.diffs[addr].Code), + s.diffs[addr].Code, + } + } + + if len(s.diffs[addr].StorageWrites) == 0 { + return op, nil, nil + } + + op.storages = make(map[common.Hash][]byte) + op.storagesOriginByHash = make(map[common.Hash][]byte) + op.storagesOriginByKey = make(map[common.Hash][]byte) + + for key, value := range s.diffs[addr].StorageWrites { + hash := crypto.Keccak256Hash(key.Bytes()) + op.storages[hash] = encode(value) + origin := encode(s.originStorages[addr][key]) + op.storagesOriginByHash[hash] = origin + op.storagesOriginByKey[key] = origin + } + tr, _ := s.tries.Load(addr) + root, nodes := tr.(Trie).Commit(false) + s.poststates[addr].Root = root + return op, nodes, nil +} + +func (s *BALStateTransition) CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *stateUpdate, error) { + // 1) create a stateUpdate object + // Commit objects to the trie, measuring the elapsed time + var ( + commitStart = time.Now() + accountTrieNodesUpdated int + accountTrieNodesDeleted int + storageTrieNodesUpdated int + storageTrieNodesDeleted int + + lock sync.Mutex // protect two maps below + nodes = trienode.NewMergedNodeSet() // aggregated trie nodes + updates = make(map[common.Hash]*accountUpdate, len(s.diffs)) // aggregated account updates + + // merge aggregates the dirty trie nodes into the global set. 
+ // + // Given that some accounts may be destroyed and then recreated within + // the same block, it's possible that a node set with the same owner + // may already exist. In such cases, these two sets are combined, with + // the later one overwriting the previous one if any nodes are modified + // or deleted in both sets. + // + // merge run concurrently across all the state objects and account trie. + merge = func(set *trienode.NodeSet) error { + if set == nil { + return nil + } + lock.Lock() + defer lock.Unlock() + + updates, deletes := set.Size() + if set.Owner == (common.Hash{}) { + accountTrieNodesUpdated += updates + accountTrieNodesDeleted += deletes + } else { + storageTrieNodesUpdated += updates + storageTrieNodesDeleted += deletes + } + return nodes.Merge(set) + } + ) + + destructedPrestates := make(map[common.Address]*types.StateAccount) + s.prestates.Range(func(key, value any) bool { + addr := key.(common.Address) + acct := value.(*types.StateAccount) + destructedPrestates[addr] = acct + return true + }) + + deletes, delNodes, err := handleDestruction(s.db, s.stateTrie, noStorageWiping, maps.Keys(s.deletions), destructedPrestates) + if err != nil { + return common.Hash{}, nil, err + } + for _, set := range delNodes { + if err := merge(set); err != nil { + return common.Hash{}, nil, err + } + } + + // Handle all state updates afterwards, concurrently to one another to shave + // off some milliseconds from the commit operation. Also accumulate the code + // writes to run in parallel with the computations. + var ( + start = time.Now() + root common.Hash + workers errgroup.Group + ) + // Schedule the account trie first since that will be the biggest, so give + // it the most time to crunch. + // + // TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain + // heads, which seems excessive given that it doesn't do hashing, it just + // shuffles some data. For comparison, the *hashing* at chain head is 2-3ms. 
+ // We need to investigate what's happening as it seems something's wonky. + // Obviously it's not an end of the world issue, just something the original + // code didn't anticipate for. + workers.Go(func() error { + // Write the account trie changes, measuring the amount of wasted time + newroot, set := s.stateTrie.Commit(true) + root = newroot + + if err := merge(set); err != nil { + return err + } + s.metrics.AccountCommits = time.Since(start) + return nil + }) + + s.originStoragesWG.Wait() + + // Schedule each of the storage tries that need to be updated, so they can + // run concurrently to one another. + // + // TODO(karalabe): Experimentally, the account commit takes approximately the + // same time as all the storage commits combined, so we could maybe only have + // 2 threads in total. But that kind of depends on the account commit being + // more expensive than it should be, so let's fix that and revisit this todo. + for addr, _ := range s.diffs { + if _, isDeleted := s.deletions[addr]; isDeleted { + continue + } + + address := addr + // Run the storage updates concurrently to one another + workers.Go(func() error { + // Write any storage changes in the state object to its storage trie + update, set, err := s.commitAccount(address) + if err != nil { + return err + } + + if err := merge(set); err != nil { + return err + } + lock.Lock() + updates[crypto.Keccak256Hash(address[:])] = update + s.metrics.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime + lock.Unlock() + return nil + }) + } + // Wait for everything to finish and update the metrics + if err := workers.Wait(); err != nil { + return common.Hash{}, nil, err + } + + accountUpdatedMeter.Mark(s.accountUpdated) + storageUpdatedMeter.Mark(s.storageUpdated.Load()) + accountDeletedMeter.Mark(s.accountDeleted) + storageDeletedMeter.Mark(s.storageDeleted.Load()) + accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated)) + 
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
+	storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
+	storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
+
+	ret := newStateUpdate(noStorageWiping, s.parentRoot, root, block, deletes, updates, nodes)
+
+	snapshotCommits, trieDBCommits, err := flushStateUpdate(s.db, block, ret)
+	if err != nil {
+		return common.Hash{}, nil, err
+	}
+
+	s.metrics.SnapshotCommits, s.metrics.TrieDBCommits = snapshotCommits, trieDBCommits
+	s.metrics.TotalCommitTime = time.Since(commitStart)
+	return root, ret, nil
+}
+
+func (s *BALStateTransition) IntermediateRoot(_ bool) common.Hash {
+	if s.rootHash != (common.Hash{}) {
+		return s.rootHash
+	}
+
+	start := time.Now()
+	lastIdx := len(s.accessList.block.Transactions()) + 1
+
+	s.originStoragesWG.Add(1)
+	go func() {
+		defer s.originStoragesWG.Done()
+		for _, addr := range s.accessList.ModifiedAccounts() {
+			diff := s.accessList.readAccountDiff(addr, lastIdx)
+			if len(diff.StorageWrites) > 0 {
+				s.originStorages[addr] = make(map[common.Hash]common.Hash)
+				for key := range diff.StorageWrites {
+					val, err := s.reader.Storage(addr, key)
+					if err != nil {
+						panic("failed to read origin storage slot: " + err.Error())
+					}
+					s.originStorages[addr][key] = val
+				}
+			}
+		}
+		s.metrics.OriginStorageLoadTime = time.Since(start)
+	}()
+
+	var wg sync.WaitGroup
+
+	// 1. resolve the entire state object for the updated addrs, in parallel prefetch them in the account trie
+	// 1. in parallel:
+	//   * load the prestate of mutated state objects from the snapshot, update their tries. 
+	//   * prefetch all mutated accounts in the account trie
+
+	for _, addr := range s.accessList.ModifiedAccounts() {
+		diff := s.accessList.readAccountDiff(addr, lastIdx)
+		s.diffs[addr] = diff
+	}
+
+	for _, addr := range s.accessList.ModifiedAccounts() {
+		wg.Add(1)
+		address := addr
+		go func() {
+			acct := s.accessList.prestateReader.account(address)
+			diff := s.diffs[address]
+			if acct == nil {
+				acct = types.NewEmptyStateAccount()
+			}
+			s.prestates.Store(address, acct)
+
+			if len(diff.StorageWrites) > 0 {
+				tr, err := s.db.OpenStorageTrie(s.parentRoot, address, acct.Root, s.stateTrie)
+				if err != nil {
+					panic("failed to open storage trie: " + err.Error())
+				}
+				s.tries.Store(address, tr)
+
+				var (
+					updateKeys, updateValues [][]byte
+					deleteKeys               [][]byte
+				)
+				for key, val := range diff.StorageWrites {
+					if val != (common.Hash{}) {
+						updateKeys = append(updateKeys, key[:])
+						updateValues = append(updateValues, common.TrimLeftZeroes(val[:]))
+
+						s.storageUpdated.Add(1)
+					} else {
+						deleteKeys = append(deleteKeys, key[:])
+
+						s.storageDeleted.Add(1)
+					}
+				}
+				if err := tr.UpdateStorageBatch(address, updateKeys, updateValues); err != nil {
+					panic("failed to batch-update storage trie: " + err.Error())
+				}
+
+				for _, key := range deleteKeys {
+					if err := tr.DeleteStorage(address, key); err != nil {
+						panic("failed to delete storage slot: " + err.Error())
+					}
+				}
+
+				hashStart := time.Now()
+				tr.Hash()
+				s.metrics.StateHash = time.Since(hashStart)
+				/*
+					var objTrieData string
+					it, err := tr.NodeIterator([]byte{})
+					if err != nil {
+						panic(err)
+					}
+					for it.Next(true) {
+						if it.Leaf() {
+							objTrieData += fmt.Sprintf("%x: %x\n", it.Path(), it.LeafBlob())
+						} else {
+							objTrieData += fmt.Sprintf("%x: %x\n", it.Path(), it.Hash())
+						}
+					}
+					fmt.Printf("acct hash %x. hash=%x, updated storage: %v, deleted storage: %v trie:\n%s\n", crypto.Keccak256Hash(address[:]), tr.Hash(), updateKeys, deleteKeys, objTrieData)
+				*/
+			}
+
+			wg.Done()
+		}()
+	}
+
+	wg.Add(1)
+	// prefetch all modified accounts in the main account trie. 
+	go func() {
+		prefetchStart := time.Now()
+		if err := s.stateTrie.PrefetchAccount(s.accessList.ModifiedAccounts()); err != nil {
+			panic("failed to prefetch accounts in the account trie: " + err.Error())
+		}
+		s.metrics.StatePrefetch = time.Since(prefetchStart)
+		wg.Done()
+	}()
+
+	wg.Wait()
+	s.metrics.AccountUpdate = time.Since(start)
+
+	// stage 2: update the main account trie
+
+	stateUpdateStart := time.Now()
+	for mutatedAddr, _ := range s.diffs {
+		p, _ := s.prestates.Load(mutatedAddr)
+		prestate := p.(*types.StateAccount)
+
+		isDeleted := isAccountDeleted(prestate, s.diffs[mutatedAddr])
+		if isDeleted {
+			if err := s.stateTrie.DeleteAccount(mutatedAddr); err != nil {
+				panic("failed to delete account from the account trie: " + err.Error())
+			}
+			s.deletions[mutatedAddr] = struct{}{}
+		} else {
+			acct, code := s.updateAccount(mutatedAddr)
+
+			if code != nil {
+				codeHash := crypto.Keccak256Hash(code)
+				acct.CodeHash = codeHash.Bytes()
+				if err := s.stateTrie.UpdateContractCode(mutatedAddr, codeHash, code); err != nil {
+					panic("failed to update contract code: " + err.Error())
+				}
+			}
+			if err := s.stateTrie.UpdateAccount(mutatedAddr, acct, len(code)); err != nil {
+				panic("failed to update account in the account trie: " + err.Error())
+			}
+			s.poststates[mutatedAddr] = acct
+		}
+	}
+
+	s.metrics.StateUpdate = time.Since(stateUpdateStart)
+
+	stateTrieHashStart := time.Now()
+	s.rootHash = s.stateTrie.Hash()
+	s.metrics.StateHash = time.Since(stateTrieHashStart)
+
+	/*
+		it, err := s.stateTrie.NodeIterator([]byte{})
+		if err != nil {
+			panic(err)
+		}
+		fmt.Println("state trie")
+		for it.Next(true) {
+			if it.Leaf() {
+				fmt.Printf("%x: %x\n", it.Path(), it.LeafBlob())
+			} else {
+				fmt.Printf("%x: %x\n", it.Path(), it.Hash())
+			}
+		}
+	*/
+	return s.rootHash
+}
+
+func (s *BALStateTransition) Preimages() map[common.Hash][]byte {
+	// TODO: implement this
+	return make(map[common.Hash][]byte)
+}
diff --git a/core/state/state_object.go b/core/state/state_object.go
index ed1ea72964..44e34fff8c 100644
--- a/core/state/state_object.go
+++ b/core/state/state_object.go
@@ -221,6 +221,7 @@ func (s *stateObject) SetState(key, value common.Hash) common.Hash {
 	if prev == value 
{ return prev } + // New value is different, update and journal the change s.db.journal.storageChange(s.address, key, prev, origin) s.setState(key, value, origin) @@ -296,6 +297,7 @@ func (s *stateObject) updateTrie() (Trie, error) { return s.trie, nil } } + // Retrieve a pretecher populated trie, or fall back to the database. This will // block until all prefetch tasks are done, which are needed for witnesses even // for unmodified state objects. diff --git a/core/state/statedb.go b/core/state/statedb.go index ebbd3f0d22..d02cbafb20 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -20,6 +20,7 @@ package state import ( "errors" "fmt" + "iter" "maps" "slices" "sync" @@ -65,6 +66,13 @@ func (m *mutation) isDelete() bool { return m.typ == deletion } +type BlockStateTransition interface { + CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *stateUpdate, error) + IntermediateRoot(deleteEmpty bool) common.Hash + Error() error + Preimages() map[common.Hash][]byte +} + // StateDB structs within the ethereum protocol are used to store anything // within the merkle trie. StateDBs take care of caching and storing // nested states. It's the general query interface to retrieve: @@ -921,12 +929,33 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // later time. 
workers.SetLimit(1) } - for addr, op := range s.mutations { - if op.applied || op.isDelete() { - continue + var updatedAddrs []common.Address + + if s.blockAccessList != nil { + updatedAddrs = s.blockAccessList.ModifiedAccounts() + } else { + for addr, op := range s.mutations { + if op.applied || op.isDelete() { + continue + } + updatedAddrs = append(updatedAddrs, addr) } - obj := s.stateObjects[addr] // closure for the task runner below + } + + var m sync.Mutex + for _, addr := range updatedAddrs { workers.Go(func() error { + var obj *stateObject + if s.blockAccessList != nil { + lastIdx := len(s.blockAccessList.block.Transactions()) + 1 + diff := s.blockAccessList.readAccountDiff(addr, lastIdx) + acct := s.blockAccessList.prestateReader.account(addr) + m.Lock() + obj = s.blockAccessList.initMutatedObjFromDiff(s, addr, acct, diff) + m.Unlock() + } else { + obj = s.stateObjects[addr] // closure for the task runner below + } if s.db.TrieDB().IsVerkle() { obj.updateTrie() } else { @@ -1103,8 +1132,8 @@ func (s *StateDB) clearJournalAndRefund() { // of a specific account. It leverages the associated state snapshot for fast // storage iteration and constructs trie node deletion markers by creating // stack trie with iterated slots. 
-func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) { - iter, err := snaps.StorageIterator(s.originalRoot, addrHash, common.Hash{}) +func fastDeleteStorage(originalRoot common.Hash, snaps *snapshot.Tree, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) { + iter, err := snaps.StorageIterator(originalRoot, addrHash, common.Hash{}) if err != nil { return nil, nil, nil, err } @@ -1143,8 +1172,8 @@ func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash, // slowDeleteStorage serves as a less-efficient alternative to "fastDeleteStorage," // employed when the associated state snapshot is not available. It iterates the // storage slots along with all internal trie nodes via trie directly. -func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) { - tr, err := s.db.OpenStorageTrie(s.originalRoot, addr, root, s.trie) +func slowDeleteStorage(db Database, trie Trie, originalRoot common.Hash, addr common.Address, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) { + tr, err := db.OpenStorageTrie(originalRoot, addr, root, trie) if err != nil { return nil, nil, nil, fmt.Errorf("failed to open storage trie, err: %w", err) } @@ -1179,7 +1208,7 @@ func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, r // The function will make an attempt to utilize an efficient strategy if the // associated state snapshot is reachable; otherwise, it will resort to a less // efficient approach. 
-func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) { +func deleteStorage(db Database, trie Trie, addr common.Address, addrHash common.Hash, root, originalRoot common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) { var ( err error nodes *trienode.NodeSet // the set for trie node mutations (value is nil) @@ -1189,12 +1218,12 @@ func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root // The fast approach can be failed if the snapshot is not fully // generated, or it's internally corrupted. Fallback to the slow // one just in case. - snaps := s.db.Snapshot() + snaps := db.Snapshot() if snaps != nil { - storages, storageOrigins, nodes, err = s.fastDeleteStorage(snaps, addrHash, root) + storages, storageOrigins, nodes, err = fastDeleteStorage(originalRoot, snaps, addrHash, root) } if snaps == nil || err != nil { - storages, storageOrigins, nodes, err = s.slowDeleteStorage(addr, addrHash, root) + storages, storageOrigins, nodes, err = slowDeleteStorage(db, trie, originalRoot, addr, addrHash, root) } if err != nil { return nil, nil, nil, err @@ -1220,39 +1249,38 @@ func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root // with their values be tracked as original value. // In case (d), **original** account along with its storages should be deleted, // with their values be tracked as original value. 
-func (s *StateDB) handleDestruction(noStorageWiping bool) (map[common.Hash]*accountDelete, []*trienode.NodeSet, error) { +func handleDestruction(db Database, trie Trie, noStorageWiping bool, destructions iter.Seq[common.Address], prestates map[common.Address]*types.StateAccount) (map[common.Hash]*accountDelete, []*trienode.NodeSet, error) { var ( nodes []*trienode.NodeSet deletes = make(map[common.Hash]*accountDelete) ) - for addr, prevObj := range s.stateObjectsDestruct { - prev := prevObj.origin - + for addr := range destructions { + prestate := prestates[addr] // The account was non-existent, and it's marked as destructed in the scope // of block. It can be either case (a) or (b) and will be interpreted as // null->null state transition. // - for (a), skip it without doing anything // - for (b), the resurrected account with nil as original will be handled afterwards - if prev == nil { + if prestate == nil { continue } // The account was existent, it can be either case (c) or (d). addrHash := crypto.Keccak256Hash(addr.Bytes()) op := &accountDelete{ address: addr, - origin: types.SlimAccountRLP(*prev), + origin: types.SlimAccountRLP(*prestate), } deletes[addrHash] = op // Short circuit if the origin storage was empty. - if prev.Root == types.EmptyRootHash || s.db.TrieDB().IsVerkle() { + if prestate.Root == types.EmptyRootHash || db.TrieDB().IsVerkle() { continue } if noStorageWiping { return nil, nil, fmt.Errorf("unexpected storage wiping, %x", addr) } // Remove storage slots belonging to the account. - storages, storagesOrigin, set, err := s.deleteStorage(addr, addrHash, prev.Root) + storages, storagesOrigin, set, err := deleteStorage(db, trie, addr, addrHash, prestate.Root, prestate.Root) if err != nil { return nil, nil, fmt.Errorf("failed to delete storage, err: %w", err) } @@ -1327,7 +1355,12 @@ func (s *StateDB) commit(deleteEmptyObjects bool, noStorageWiping bool, blockNum // the same block, account deletions must be processed first. 
This ensures // that the storage trie nodes deleted during destruction and recreated // during subsequent resurrection can be combined correctly. - deletes, delNodes, err := s.handleDestruction(noStorageWiping) + var stateAccountsDestruct, destructAccountsOrigins = make(map[common.Address]*types.StateAccount), make(map[common.Address]*types.StateAccount) + for addr, obj := range s.stateObjectsDestruct { + stateAccountsDestruct[addr] = &obj.data + destructAccountsOrigins[addr] = obj.origin + } + deletes, delNodes, err := handleDestruction(s.db, s.trie, noStorageWiping, maps.Keys(stateAccountsDestruct), destructAccountsOrigins) if err != nil { return nil, err } @@ -1428,6 +1461,44 @@ func (s *StateDB) commit(deleteEmptyObjects bool, noStorageWiping bool, blockNum return newStateUpdate(noStorageWiping, origin, root, blockNumber, deletes, updates, nodes), nil } +func flushStateUpdate(d Database, block uint64, update *stateUpdate) (snapshotCommits, trieDBCommits time.Duration, err error) { + if db := d.TrieDB().Disk(); db != nil && len(update.codes) > 0 { + batch := db.NewBatch() + for _, code := range update.codes { + rawdb.WriteCode(batch, code.hash, code.blob) + } + if err := batch.Write(); err != nil { + return 0, 0, err + } + } + if !update.empty() { + // If snapshotting is enabled, update the snapshot tree with this new version + if snap := d.Snapshot(); snap != nil && snap.Snapshot(update.originRoot) != nil { + start := time.Now() + if err := snap.Update(update.root, update.originRoot, update.accounts, update.storages); err != nil { + log.Warn("Failed to update snapshot tree", "from", update.originRoot, "to", update.root, "err", err) + } + // Keep 128 diff layers in the memory, persistent layer is 129th. 
+ // - head layer is paired with HEAD state + // - head-1 layer is paired with HEAD-1 state + // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state + if err := snap.Cap(update.root, TriesInMemory); err != nil { + log.Warn("Failed to cap snapshot tree", "root", update.root, "layers", TriesInMemory, "err", err) + } + snapshotCommits += time.Since(start) + } + // If trie database is enabled, commit the state update as a new layer + if db := d.TrieDB(); db != nil { + start := time.Now() + if err := db.Update(update.root, update.originRoot, block, update.nodes, update.stateSet()); err != nil { + return 0, 0, err + } + trieDBCommits += time.Since(start) + } + } + return snapshotCommits, trieDBCommits, nil +} + // commitAndFlush is a wrapper of commit which also commits the state mutations // to the configured data stores. func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (*stateUpdate, error) { @@ -1435,41 +1506,12 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag if err != nil { return nil, err } - // Commit dirty contract code if any exists - if db := s.db.TrieDB().Disk(); db != nil && len(ret.codes) > 0 { - batch := db.NewBatch() - for _, code := range ret.codes { - rawdb.WriteCode(batch, code.hash, code.blob) - } - if err := batch.Write(); err != nil { - return nil, err - } - } - if !ret.empty() { - // If snapshotting is enabled, update the snapshot tree with this new version - if snap := s.db.Snapshot(); snap != nil && snap.Snapshot(ret.originRoot) != nil { - start := time.Now() - if err := snap.Update(ret.root, ret.originRoot, ret.accounts, ret.storages); err != nil { - log.Warn("Failed to update snapshot tree", "from", ret.originRoot, "to", ret.root, "err", err) - } - // Keep 128 diff layers in the memory, persistent layer is 129th. 
- // - head layer is paired with HEAD state - // - head-1 layer is paired with HEAD-1 state - // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state - if err := snap.Cap(ret.root, TriesInMemory); err != nil { - log.Warn("Failed to cap snapshot tree", "root", ret.root, "layers", TriesInMemory, "err", err) - } - s.SnapshotCommits += time.Since(start) - } - // If trie database is enabled, commit the state update as a new layer - if db := s.db.TrieDB(); db != nil { - start := time.Now() - if err := db.Update(ret.root, ret.originRoot, block, ret.nodes, ret.stateSet()); err != nil { - return nil, err - } - s.TrieDBCommits += time.Since(start) - } + snapshotCommits, trieDBCommits, err := flushStateUpdate(s.db, block, ret) + if err != nil { + return nil, err } + s.SnapshotCommits = snapshotCommits + s.TrieDBCommits = trieDBCommits s.reader, _ = s.db.Reader(s.originalRoot) return ret, err } diff --git a/core/stateless.go b/core/stateless.go index 6067546387..b20c909da6 100644 --- a/core/stateless.go +++ b/core/stateless.go @@ -70,7 +70,7 @@ func ExecuteStateless(config *params.ChainConfig, vmconfig vm.Config, block *typ if err != nil { return common.Hash{}, common.Hash{}, err } - if err = validator.ValidateState(block, db, res, true, true); err != nil { + if err = validator.ValidateState(block, db, res, true); err != nil { return common.Hash{}, common.Hash{}, err } // Almost everything validated, but receipt and state root needs to be returned diff --git a/core/types.go b/core/types.go index 0f422a14e0..e4c907be13 100644 --- a/core/types.go +++ b/core/types.go @@ -32,7 +32,7 @@ type Validator interface { ValidateBody(block *types.Block) error // ValidateState validates the given statedb and optionally the process result. 
- ValidateState(block *types.Block, state *state.StateDB, res *ProcessResult, validateStateRoot, stateless bool) error + ValidateState(block *types.Block, state state.BlockStateTransition, res *ProcessResult, stateless bool) error } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/triedb/database.go b/triedb/database.go index d2637bd909..a8a66163f6 100644 --- a/triedb/database.go +++ b/triedb/database.go @@ -18,7 +18,6 @@ package triedb import ( "errors" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb"