mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
attempt to optimize state root calculation for BALs
This commit is contained in:
parent
b24306eacc
commit
2721e8a1a2
10 changed files with 828 additions and 223 deletions
|
|
@ -147,7 +147,7 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error {
|
|||
|
||||
// ValidateState validates the various changes that happen after a state transition,
|
||||
// such as amount of used gas, the receipt roots and the state root itself.
|
||||
func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateDB, res *ProcessResult, validateStateRoot, stateless bool) error {
|
||||
func (v *BlockValidator) ValidateState(block *types.Block, stateTransition state.BlockStateTransition, res *ProcessResult, stateless bool) error {
|
||||
if res == nil {
|
||||
return errors.New("nil ProcessResult value")
|
||||
}
|
||||
|
|
@ -185,12 +185,10 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD
|
|||
return errors.New("block has requests before prague fork")
|
||||
}
|
||||
|
||||
if validateStateRoot {
|
||||
// Validate the state root against the received state root and throw
|
||||
// an error if they don't match.
|
||||
if root := statedb.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
|
||||
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, statedb.Error())
|
||||
}
|
||||
// Validate the state root against the received state root and throw
|
||||
// an error if they don't match.
|
||||
if root := stateTransition.IntermediateRoot(v.config.IsEIP158(header.Number)); header.Root != root {
|
||||
return fmt.Errorf("invalid merkle root (remote: %x local: %x) dberr: %w", header.Root, root, stateTransition.Error())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,10 +99,18 @@ var (
|
|||
blockWriteTimer = metrics.NewRegisteredResettingTimer("chain/write", nil)
|
||||
|
||||
// BAL-specific timers
|
||||
blockPreprocessingTimer = metrics.NewRegisteredResettingTimer("chain/preprocess", nil)
|
||||
blockPrestateLoadTimer = metrics.NewRegisteredResettingTimer("chain/prestateload", nil)
|
||||
txExecutionTimer = metrics.NewRegisteredResettingTimer("chain/txexecution", nil)
|
||||
stateRootCalctimer = metrics.NewRegisteredResettingTimer("chain/rootcalculation", nil)
|
||||
blockPreprocessingTimer = metrics.NewRegisteredResettingTimer("chain/preprocess", nil)
|
||||
txExecutionTimer = metrics.NewRegisteredResettingTimer("chain/txexecution", nil)
|
||||
|
||||
stateTrieHashTimer = metrics.NewRegisteredResettingTimer("chain/statetriehash", nil)
|
||||
accountTriesUpdateTimer = metrics.NewRegisteredResettingTimer("chain/accounttriesupdate", nil)
|
||||
stateTriePrefetchTimer = metrics.NewRegisteredResettingTimer("chain/statetrieprefetch", nil)
|
||||
stateTrieUpdateTimer = metrics.NewRegisteredResettingTimer("chain/statetrieupdate", nil)
|
||||
originStorageLoadTimer = metrics.NewRegisteredResettingTimer("chain/originstorageload", nil)
|
||||
|
||||
stateRootComputeTimer = metrics.NewRegisteredResettingTimer("chain/staterootcompute", nil)
|
||||
stateCommitTimer = metrics.NewRegisteredResettingTimer("chain/statetriecommit", nil)
|
||||
|
||||
blockPostprocessingTimer = metrics.NewRegisteredResettingTimer("chain/postprocess", nil)
|
||||
|
||||
blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
|
||||
|
|
@ -1601,7 +1609,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {
|
|||
|
||||
// writeBlockWithState writes block, metadata and corresponding state data to the
|
||||
// database.
|
||||
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, statedb *state.StateDB) error {
|
||||
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, transition state.BlockStateTransition) error {
|
||||
if !bc.HasHeader(block.ParentHash(), block.NumberU64()-1) {
|
||||
return consensus.ErrUnknownAncestor
|
||||
}
|
||||
|
|
@ -1612,12 +1620,13 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||
blockBatch := bc.db.NewBatch()
|
||||
rawdb.WriteBlock(blockBatch, block)
|
||||
rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts)
|
||||
rawdb.WritePreimages(blockBatch, statedb.Preimages())
|
||||
// TODO: consider moving Preimages out of the interface definition
|
||||
rawdb.WritePreimages(blockBatch, transition.Preimages())
|
||||
if err := blockBatch.Write(); err != nil {
|
||||
log.Crit("Failed to write block into disk", "err", err)
|
||||
}
|
||||
// Commit all cached state changes into underlying memory database.
|
||||
root, stateUpdate, err := statedb.CommitWithUpdate(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time()))
|
||||
root, stateUpdate, err := transition.CommitWithUpdate(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number(), block.Time()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -1687,7 +1696,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||
|
||||
// writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead.
|
||||
// This function expects the chain mutex to be held.
|
||||
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
|
||||
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state state.BlockStateTransition, emitHeadEvent bool) (status WriteStatus, err error) {
|
||||
if err := bc.writeBlockWithState(block, receipts, state); err != nil {
|
||||
return NonStatTy, err
|
||||
}
|
||||
|
|
@ -2000,9 +2009,108 @@ func (bpr *blockProcessingResult) Witness() *stateless.Witness {
|
|||
return bpr.witness
|
||||
}
|
||||
|
||||
func (bc *BlockChain) processBlockWithAccessList(parentRoot common.Hash, block *types.Block, setHead bool) (procRes *blockProcessingResult, blockEndErr error) {
|
||||
var (
|
||||
startTime = time.Now()
|
||||
procTime time.Duration
|
||||
)
|
||||
|
||||
reader, err := bc.statedb.Reader(parentRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
accessList := state.NewBALReader(block, reader)
|
||||
stateTransition, err := state.NewBALStateTransition(accessList, bc.statedb, parentRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
statedb, err := state.New(parentRoot, bc.statedb)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statedb.SetBlockAccessList(accessList)
|
||||
|
||||
if bc.logger != nil && bc.logger.OnBlockStart != nil {
|
||||
bc.logger.OnBlockStart(tracing.BlockEvent{
|
||||
Block: block,
|
||||
Finalized: bc.CurrentFinalBlock(),
|
||||
Safe: bc.CurrentSafeBlock(),
|
||||
})
|
||||
}
|
||||
if bc.logger != nil && bc.logger.OnBlockEnd != nil {
|
||||
defer func() {
|
||||
bc.logger.OnBlockEnd(blockEndErr)
|
||||
}()
|
||||
}
|
||||
|
||||
res, err := bc.parallelProcessor.Process(block, stateTransition, statedb, bc.cfg.VmConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := bc.validator.ValidateState(block, stateTransition, res.ProcessResult, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
procTime = time.Since(startTime)
|
||||
|
||||
// Write the block to the chain and get the status.
|
||||
var (
|
||||
wstart = time.Now()
|
||||
status WriteStatus
|
||||
)
|
||||
if !setHead {
|
||||
// Don't set the head, only insert the block
|
||||
err = bc.writeBlockWithState(block, res.ProcessResult.Receipts, stateTransition)
|
||||
} else {
|
||||
status, err = bc.writeBlockAndSetHead(block, res.ProcessResult.Receipts, res.ProcessResult.Logs, stateTransition, false)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Update the metrics touched during block commit
|
||||
accountCommitTimer.Update(stateTransition.Metrics().AccountCommits) // Account commits are complete, we can mark them
|
||||
storageCommitTimer.Update(stateTransition.Metrics().StorageCommits) // Storage commits are complete, we can mark them
|
||||
snapshotCommitTimer.Update(stateTransition.Metrics().SnapshotCommits) // Snapshot commits are complete, we can mark them
|
||||
triedbCommitTimer.Update(stateTransition.Metrics().TrieDBCommits) // Trie database commits are complete, we can mark them
|
||||
|
||||
blockWriteTimer.Update(time.Since(wstart) - max(stateTransition.Metrics().AccountCommits, stateTransition.Metrics().StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
|
||||
elapsed := time.Since(startTime) + 1 // prevent zero division
|
||||
blockInsertTimer.Update(elapsed)
|
||||
|
||||
// TODO(rjl493456442) generalize the ResettingTimer
|
||||
mgasps := float64(res.ProcessResult.GasUsed) * 1000 / float64(elapsed)
|
||||
chainMgaspsMeter.Update(time.Duration(mgasps))
|
||||
|
||||
blockPreprocessingTimer.Update(res.PreProcessTime)
|
||||
txExecutionTimer.Update(res.ExecTime)
|
||||
|
||||
accountTriesUpdateTimer.Update(res.StateTransitionMetrics.AccountUpdate)
|
||||
stateTrieUpdateTimer.Update(res.StateTransitionMetrics.StateUpdate)
|
||||
stateTrieHashTimer.Update(res.StateTransitionMetrics.StateHash)
|
||||
stateRootComputeTimer.Update(res.StateTransitionMetrics.AccountUpdate + res.StateTransitionMetrics.StateUpdate + res.StateTransitionMetrics.StateHash)
|
||||
|
||||
originStorageLoadTimer.Update(res.StateTransitionMetrics.OriginStorageLoadTime)
|
||||
stateCommitTimer.Update(res.StateTransitionMetrics.TotalCommitTime)
|
||||
blockPostprocessingTimer.Update(res.PostProcessTime)
|
||||
|
||||
return &blockProcessingResult{
|
||||
usedGas: res.ProcessResult.GasUsed,
|
||||
procTime: procTime,
|
||||
status: status,
|
||||
witness: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ProcessBlock executes and validates the given block. If there was no error
|
||||
// it writes the block and associated state to database.
|
||||
func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, setHead bool, makeWitness bool, constructBALForTesting bool, validateBAL bool) (_ *blockProcessingResult, blockEndErr error) {
|
||||
if block.Body().AccessList != nil {
|
||||
return bc.processBlockWithAccessList(parentRoot, block, setHead)
|
||||
}
|
||||
var (
|
||||
err error
|
||||
startTime = time.Now()
|
||||
|
|
@ -2109,85 +2217,53 @@ func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, s
|
|||
}()
|
||||
}
|
||||
|
||||
blockHadBAL := block.Body().AccessList != nil
|
||||
var res *ProcessResult
|
||||
var resWithMetrics *ProcessResultWithMetrics
|
||||
var ptime, vtime time.Duration
|
||||
if block.Body().AccessList != nil {
|
||||
if block.NumberU64() == 0 {
|
||||
return nil, fmt.Errorf("genesis block cannot have a block access list")
|
||||
}
|
||||
// TODO: rename 'validateBAL' to indicate that it's for validating that the BAL
|
||||
// is present and we are after amsterdam fork. validateBAL=false is only used for
|
||||
// testing BALs in pre-Amsterdam blocks.
|
||||
if !validateBAL && !bc.chainConfig.IsAmsterdam(block.Number(), block.Time()) {
|
||||
bc.reportBlock(block, res, fmt.Errorf("received block containing access list before glamsterdam activated"))
|
||||
return nil, err
|
||||
}
|
||||
// Process block using the parent state as reference point
|
||||
pstart := time.Now()
|
||||
resWithMetrics, err = bc.parallelProcessor.Process(block, statedb, bc.cfg.VmConfig)
|
||||
if err != nil {
|
||||
// TODO: okay to pass nil here as execution result?
|
||||
bc.reportBlock(block, nil, err)
|
||||
return nil, err
|
||||
}
|
||||
ptime = time.Since(pstart)
|
||||
|
||||
vstart := time.Now()
|
||||
var err error
|
||||
err = bc.validator.ValidateState(block, statedb, resWithMetrics.ProcessResult, false, false)
|
||||
if err != nil {
|
||||
// TODO: okay to pass nil here as execution result?
|
||||
bc.reportBlock(block, nil, err)
|
||||
return nil, err
|
||||
}
|
||||
res = resWithMetrics.ProcessResult
|
||||
vtime = time.Since(vstart)
|
||||
} else {
|
||||
var balTracer *BlockAccessListTracer
|
||||
// Process block using the parent state as reference point
|
||||
if constructBALForTesting {
|
||||
balTracer, bc.cfg.VmConfig.Tracer = NewBlockAccessListTracer()
|
||||
defer func() {
|
||||
bc.cfg.VmConfig.Tracer = nil
|
||||
}()
|
||||
// BAL Tracer used for creating BALs in ProcessBlock in testing path only
|
||||
var balTracer *BlockAccessListTracer
|
||||
|
||||
}
|
||||
// Process block using the parent state as reference point
|
||||
pstart := time.Now()
|
||||
res, err = bc.processor.Process(block, statedb, bc.cfg.VmConfig)
|
||||
if err != nil {
|
||||
bc.reportBlock(block, res, err)
|
||||
return nil, err
|
||||
}
|
||||
ptime = time.Since(pstart)
|
||||
// Process block using the parent state as reference point
|
||||
if constructBALForTesting {
|
||||
balTracer, bc.cfg.VmConfig.Tracer = NewBlockAccessListTracer()
|
||||
defer func() {
|
||||
bc.cfg.VmConfig.Tracer = nil
|
||||
}()
|
||||
|
||||
// TODO: if I remove this check before executing balTracer.Finalise, the following test fails:
|
||||
// ExecutionSpecBlocktests/shanghai/eip3855_push0/push0/push0_storage_overwrite.json
|
||||
if constructBALForTesting {
|
||||
balTracer.OnBlockFinalization()
|
||||
}
|
||||
}
|
||||
// Process block using the parent state as reference point
|
||||
pstart := time.Now()
|
||||
res, err = bc.processor.Process(block, statedb, bc.cfg.VmConfig)
|
||||
if err != nil {
|
||||
bc.reportBlock(block, res, err)
|
||||
return nil, err
|
||||
}
|
||||
ptime = time.Since(pstart)
|
||||
|
||||
// unset the BAL-creation tracer (dirty)
|
||||
bc.cfg.VmConfig.Tracer = nil
|
||||
// TODO: if I remove this check before executing balTracer.Finalise, the following test fails:
|
||||
// ExecutionSpecBlocktests/shanghai/eip3855_push0/push0/push0_storage_overwrite.json
|
||||
if constructBALForTesting {
|
||||
balTracer.OnBlockFinalization()
|
||||
}
|
||||
|
||||
vstart := time.Now()
|
||||
if err := bc.validator.ValidateState(block, statedb, res, true, false); err != nil {
|
||||
bc.reportBlock(block, res, err)
|
||||
return nil, err
|
||||
}
|
||||
vtime = time.Since(vstart)
|
||||
// unset the BAL-creation tracer (dirty)
|
||||
bc.cfg.VmConfig.Tracer = nil
|
||||
|
||||
if constructBALForTesting {
|
||||
// very ugly... deep-copy the block body before setting the block access
|
||||
// list on it to prevent mutating the block instance passed by the caller.
|
||||
existingBody := block.Body()
|
||||
block = block.WithBody(*existingBody)
|
||||
existingBody = block.Body()
|
||||
existingBody.AccessList = balTracer.AccessList().ToEncodingObj()
|
||||
block = block.WithBody(*existingBody)
|
||||
}
|
||||
vstart := time.Now()
|
||||
if err := bc.validator.ValidateState(block, statedb, res, false); err != nil {
|
||||
bc.reportBlock(block, res, err)
|
||||
return nil, err
|
||||
}
|
||||
vtime = time.Since(vstart)
|
||||
|
||||
if constructBALForTesting {
|
||||
// very ugly... deep-copy the block body before setting the block access
|
||||
// list on it to prevent mutating the block instance passed by the caller.
|
||||
existingBody := block.Body()
|
||||
block = block.WithBody(*existingBody)
|
||||
existingBody = block.Body()
|
||||
existingBody.AccessList = balTracer.AccessList().ToEncodingObj()
|
||||
block = block.WithBody(*existingBody)
|
||||
}
|
||||
|
||||
// If witnesses was generated and stateless self-validation requested, do
|
||||
|
|
@ -2220,36 +2296,26 @@ func (bc *BlockChain) ProcessBlock(parentRoot common.Hash, block *types.Block, s
|
|||
}
|
||||
|
||||
var proctime time.Duration
|
||||
if blockHadBAL {
|
||||
blockPreprocessingTimer.Update(resWithMetrics.PreProcessTime)
|
||||
blockPrestateLoadTimer.Update(resWithMetrics.PrestateLoadTime)
|
||||
txExecutionTimer.Update(resWithMetrics.ExecTime)
|
||||
stateRootCalctimer.Update(resWithMetrics.RootCalcTime)
|
||||
blockPostprocessingTimer.Update(resWithMetrics.PostProcessTime)
|
||||
xvtime := time.Since(xvstart)
|
||||
proctime = time.Since(startTime) // processing + validation + cross validation
|
||||
|
||||
accountHashTimer.Update(statedb.AccountHashes)
|
||||
} else {
|
||||
xvtime := time.Since(xvstart)
|
||||
proctime = time.Since(startTime) // processing + validation + cross validation
|
||||
|
||||
// Update the metrics touched during block processing and validation
|
||||
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
|
||||
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
|
||||
if statedb.AccountLoaded != 0 {
|
||||
accountReadSingleTimer.Update(statedb.AccountReads / time.Duration(statedb.AccountLoaded))
|
||||
}
|
||||
if statedb.StorageLoaded != 0 {
|
||||
storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded))
|
||||
}
|
||||
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
|
||||
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
|
||||
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
|
||||
triehash := statedb.AccountHashes // The time spent on tries hashing
|
||||
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
|
||||
blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing
|
||||
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
|
||||
blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation
|
||||
// Update the metrics touched during block processing and validation
|
||||
accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing)
|
||||
storageReadTimer.Update(statedb.StorageReads) // Storage reads are complete(in processing)
|
||||
if statedb.AccountLoaded != 0 {
|
||||
accountReadSingleTimer.Update(statedb.AccountReads / time.Duration(statedb.AccountLoaded))
|
||||
}
|
||||
if statedb.StorageLoaded != 0 {
|
||||
storageReadSingleTimer.Update(statedb.StorageReads / time.Duration(statedb.StorageLoaded))
|
||||
}
|
||||
accountUpdateTimer.Update(statedb.AccountUpdates) // Account updates are complete(in validation)
|
||||
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
|
||||
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
|
||||
triehash := statedb.AccountHashes // The time spent on tries hashing
|
||||
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
|
||||
blockExecutionTimer.Update(ptime - (statedb.AccountReads + statedb.StorageReads)) // The time spent on EVM processing
|
||||
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
|
||||
blockCrossValidationTimer.Update(xvtime) // The time spent on stateless cross validation
|
||||
|
||||
// Write the block to the chain and get the status.
|
||||
var (
|
||||
|
|
@ -2783,6 +2849,10 @@ func (bc *BlockChain) reportBlock(block *types.Block, res *ProcessResult, err er
|
|||
log.Error(summarizeBadBlock(block, receipts, bc.Config(), err))
|
||||
}
|
||||
|
||||
func (bc *BlockChain) reportBALBlock(block *types.Block, res *ProcessResult, err error) {
|
||||
|
||||
}
|
||||
|
||||
// logForkReadiness will write a log when a future fork is scheduled, but not
|
||||
// active. This is useful so operators know their client is ready for the fork.
|
||||
func (bc *BlockChain) logForkReadiness(block *types.Block) {
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@ import (
|
|||
"cmp"
|
||||
"fmt"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/consensus/misc"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/core/types/bal"
|
||||
|
|
@ -17,18 +16,12 @@ import (
|
|||
// ProcessResultWithMetrics wraps ProcessResult with some metrics that are
|
||||
// emitted when executing blocks containing access lists.
|
||||
type ProcessResultWithMetrics struct {
|
||||
ProcessResult *ProcessResult
|
||||
// the time it took to load modified prestate accounts from disk and instantiate statedbs for execution
|
||||
PreProcessTime time.Duration
|
||||
// the time it took to validate the block post transaction execution and state root calculation
|
||||
PostProcessTime time.Duration
|
||||
// the time it took to hash the state root, including intermediate node reads
|
||||
RootCalcTime time.Duration
|
||||
// the time that it took to load the prestate for accounts that were updated as part of
|
||||
// the state root update
|
||||
PrestateLoadTime time.Duration
|
||||
ProcessResult *ProcessResult
|
||||
PreProcessTime time.Duration
|
||||
StateTransitionMetrics *state.BALStateTransitionMetrics
|
||||
// the time it took to execute all txs in the block
|
||||
ExecTime time.Duration
|
||||
ExecTime time.Duration
|
||||
PostProcessTime time.Duration
|
||||
}
|
||||
|
||||
// ParallelStateProcessor is used to execute and verify blocks containing
|
||||
|
|
@ -214,31 +207,31 @@ func (p *ParallelStateProcessor) resultHandler(block *types.Block, preTxStateRea
|
|||
} else if rootCalcRes.err != nil {
|
||||
resCh <- &ProcessResultWithMetrics{ProcessResult: &ProcessResult{Error: rootCalcRes.err}}
|
||||
} else {
|
||||
execResults.RootCalcTime = rootCalcRes.rootCalcTime
|
||||
execResults.PrestateLoadTime = rootCalcRes.prestateLoadTime
|
||||
// &{20.39677ms 0s 1.149668ms 735.295µs 0s 0s 0s 0s}
|
||||
execResults.StateTransitionMetrics = rootCalcRes.metrics
|
||||
resCh <- execResults
|
||||
}
|
||||
}
|
||||
|
||||
type stateRootCalculationResult struct {
|
||||
err error
|
||||
prestateLoadTime time.Duration
|
||||
rootCalcTime time.Duration
|
||||
root common.Hash
|
||||
err error
|
||||
metrics *state.BALStateTransitionMetrics
|
||||
root common.Hash
|
||||
}
|
||||
|
||||
// calcAndVerifyRoot performs the post-state root hash calculation, verifying
|
||||
// it against what is reported by the block and returning a result on resCh.
|
||||
func (p *ParallelStateProcessor) calcAndVerifyRoot(preState *state.StateDB, block *types.Block, resCh chan stateRootCalculationResult) {
|
||||
func (p *ParallelStateProcessor) calcAndVerifyRoot(preState *state.StateDB, block *types.Block, stateTransition *state.BALStateTransition, resCh chan stateRootCalculationResult) {
|
||||
// calculate and apply the block state modifications
|
||||
root, prestateLoadTime, rootCalcTime := preState.BlockAccessList().StateRoot(preState)
|
||||
//root, prestateLoadTime, rootCalcTime := preState.BlockAccessList().StateRoot(preState)
|
||||
root := stateTransition.IntermediateRoot(false)
|
||||
|
||||
res := stateRootCalculationResult{
|
||||
root: root,
|
||||
prestateLoadTime: prestateLoadTime,
|
||||
rootCalcTime: rootCalcTime,
|
||||
// TODO: I think we can remove the root from this struct
|
||||
metrics: stateTransition.Metrics(),
|
||||
}
|
||||
|
||||
// TODO: validate state root in block validator?
|
||||
if root != block.Root() {
|
||||
res.err = fmt.Errorf("state root mismatch. local: %x. remote: %x", root, block.Root())
|
||||
}
|
||||
|
|
@ -292,7 +285,8 @@ func (p *ParallelStateProcessor) execTx(block *types.Block, tx *types.Transactio
|
|||
|
||||
// Process performs EVM execution and state root computation for a block which is known
|
||||
// to contain an access list.
|
||||
func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) {
|
||||
func (p *ParallelStateProcessor) Process(block *types.Block, stateTransition *state.BALStateTransition, statedb *state.StateDB, cfg vm.Config) (*ProcessResultWithMetrics, error) {
|
||||
//fmt.Println("Parallel Process")
|
||||
var (
|
||||
header = block.Header()
|
||||
resCh = make(chan *ProcessResultWithMetrics)
|
||||
|
|
@ -305,18 +299,9 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
|
|||
tPreprocess time.Duration // time to create a set of prestates for parallel transaction execution
|
||||
tExecStart time.Time
|
||||
rootCalcResultCh = make(chan stateRootCalculationResult)
|
||||
context vm.BlockContext
|
||||
)
|
||||
|
||||
// Mutate the block and state according to any hard-fork specs
|
||||
if p.chainConfig().DAOForkSupport && p.chainConfig().DAOForkBlock != nil && p.chainConfig().DAOForkBlock.Cmp(block.Number()) == 0 {
|
||||
misc.ApplyDAOHardFork(statedb)
|
||||
}
|
||||
var (
|
||||
context vm.BlockContext
|
||||
)
|
||||
alReader := state.NewBALReader(block, statedb)
|
||||
statedb.SetBlockAccessList(alReader)
|
||||
|
||||
balTracer, hooks := NewBlockAccessListTracer()
|
||||
tracingStateDB := state.NewHookedState(statedb, hooks)
|
||||
// TODO: figure out exactly why we need to set the hooks on the TracingStateDB and the vm.Config
|
||||
|
|
@ -364,7 +349,7 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
|
|||
})
|
||||
}
|
||||
|
||||
go p.calcAndVerifyRoot(statedb, block, rootCalcResultCh)
|
||||
go p.calcAndVerifyRoot(statedb, block, stateTransition, rootCalcResultCh)
|
||||
|
||||
res := <-resCh
|
||||
if res.ProcessResult.Error != nil {
|
||||
|
|
|
|||
|
|
@ -143,12 +143,12 @@ type BALReader struct {
|
|||
}
|
||||
|
||||
// NewBALReader constructs a new reader from an access list. db is expected to have been instantiated with a reader.
|
||||
func NewBALReader(block *types.Block, db *StateDB) *BALReader {
|
||||
func NewBALReader(block *types.Block, reader Reader) *BALReader {
|
||||
r := &BALReader{accesses: make(map[common.Address]*bal.AccountAccess), block: block}
|
||||
for _, acctDiff := range *block.Body().AccessList {
|
||||
r.accesses[acctDiff.Address] = &acctDiff
|
||||
}
|
||||
r.prestateReader.resolve(db.Reader(), r.ModifiedAccounts())
|
||||
r.prestateReader.resolve(reader, r.ModifiedAccounts())
|
||||
return r
|
||||
}
|
||||
|
||||
|
|
@ -211,21 +211,8 @@ func (r *BALReader) AccessedState() (res map[common.Address]map[common.Hash]stru
|
|||
// TODO: it feels weird that this modifies the prestate instance. However, it's needed because it will
|
||||
// subsequently be used in Commit.
|
||||
func (r *BALReader) StateRoot(prestate *StateDB) (root common.Hash, prestateLoadTime time.Duration, rootUpdateTime time.Duration) {
|
||||
lastIdx := len(r.block.Transactions()) + 1
|
||||
modifiedAccts := r.ModifiedAccounts()
|
||||
startPrestateLoad := time.Now()
|
||||
for _, addr := range modifiedAccts {
|
||||
diff := r.readAccountDiff(addr, lastIdx)
|
||||
acct := r.prestateReader.account(addr)
|
||||
obj := r.initMutatedObjFromDiff(prestate, addr, acct, diff)
|
||||
if obj != nil {
|
||||
prestate.setStateObject(obj)
|
||||
}
|
||||
}
|
||||
prestateLoadTime = time.Since(startPrestateLoad)
|
||||
rootUpdateStart := time.Now()
|
||||
root = prestate.IntermediateRoot(true)
|
||||
rootUpdateTime = time.Since(rootUpdateStart)
|
||||
// TODO: fix the metrics calculation here
|
||||
return root, prestateLoadTime, rootUpdateTime
|
||||
}
|
||||
|
||||
|
|
|
|||
522
core/state/bal_state_transition.go
Normal file
522
core/state/bal_state_transition.go
Normal file
|
|
@ -0,0 +1,522 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/core/types/bal"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie/trienode"
|
||||
"github.com/holiman/uint256"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"maps"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BALStateTransition struct {
|
||||
accessList *BALReader
|
||||
db Database
|
||||
reader Reader
|
||||
stateTrie Trie
|
||||
parentRoot common.Hash
|
||||
|
||||
rootHash common.Hash
|
||||
diffs map[common.Address]*bal.AccountMutations
|
||||
prestates sync.Map //map[common.Address]*types.StateAccount
|
||||
|
||||
poststates map[common.Address]*types.StateAccount
|
||||
tries sync.Map //map[common.Address]Trie
|
||||
|
||||
deletions map[common.Address]struct{}
|
||||
|
||||
originStorages map[common.Address]map[common.Hash]common.Hash
|
||||
originStoragesWG sync.WaitGroup
|
||||
|
||||
accountDeleted int64
|
||||
accountUpdated int64
|
||||
storageDeleted atomic.Int64
|
||||
storageUpdated atomic.Int64
|
||||
|
||||
// TODO: maybe package these into their own 'CommitMetrics' struct instead of making them public fields
|
||||
|
||||
stateUpdate *stateUpdate
|
||||
|
||||
metrics BALStateTransitionMetrics
|
||||
}
|
||||
|
||||
func (s *BALStateTransition) Metrics() *BALStateTransitionMetrics {
|
||||
return &s.metrics
|
||||
}
|
||||
|
||||
type BALStateTransitionMetrics struct {
|
||||
// trie hashing metrics
|
||||
AccountUpdate time.Duration
|
||||
StatePrefetch time.Duration
|
||||
StateUpdate time.Duration
|
||||
StateHash time.Duration
|
||||
OriginStorageLoadTime time.Duration
|
||||
|
||||
// commit metrics
|
||||
AccountCommits time.Duration
|
||||
StorageCommits time.Duration
|
||||
SnapshotCommits time.Duration
|
||||
TrieDBCommits time.Duration
|
||||
TotalCommitTime time.Duration
|
||||
}
|
||||
|
||||
func NewBALStateTransition(accessList *BALReader, db Database, parentRoot common.Hash) (*BALStateTransition, error) {
|
||||
reader, err := db.Reader(parentRoot)
|
||||
if err != nil {
|
||||
panic("OH FUCK")
|
||||
}
|
||||
stateTrie, err := db.OpenTrie(parentRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &BALStateTransition{
|
||||
accessList: accessList,
|
||||
db: db,
|
||||
reader: reader,
|
||||
stateTrie: stateTrie,
|
||||
parentRoot: parentRoot,
|
||||
rootHash: common.Hash{},
|
||||
diffs: make(map[common.Address]*bal.AccountMutations),
|
||||
prestates: sync.Map{},
|
||||
poststates: make(map[common.Address]*types.StateAccount),
|
||||
tries: sync.Map{},
|
||||
deletions: make(map[common.Address]struct{}),
|
||||
originStorages: make(map[common.Address]map[common.Hash]common.Hash),
|
||||
originStoragesWG: sync.WaitGroup{},
|
||||
stateUpdate: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TODO: make use of this method return the error from IntermediateRoot or Commit
|
||||
func (s *BALStateTransition) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: refresh my knowledge of the storage-clearing EIP and ensure that my assumptions around
|
||||
// an empty account which contains storage are valid here.
|
||||
//
|
||||
// isAccountDeleted checks whether the state account was deleted in this block. Post selfdestruct-removal,
|
||||
// deletions can only occur if an account which has a balance becomes the target of a CREATE2 initcode
|
||||
// which calls SENDALL, clearing the account and marking it for deletion.
|
||||
func isAccountDeleted(prestate *types.StateAccount, mutations *bal.AccountMutations) bool {
|
||||
// TODO: figure out how to simplify this method
|
||||
if mutations.Code != nil && len(mutations.Code) != 0 {
|
||||
return false
|
||||
}
|
||||
if mutations.Nonce != nil && *mutations.Nonce != 0 {
|
||||
return false
|
||||
}
|
||||
if mutations.StorageWrites != nil && len(mutations.StorageWrites) > 0 {
|
||||
return false
|
||||
}
|
||||
if mutations.Balance != nil {
|
||||
if mutations.Balance.IsZero() {
|
||||
if prestate.Nonce != 0 || prestate.Balance.IsZero() || common.BytesToHash(prestate.CodeHash) != types.EmptyCodeHash {
|
||||
return false
|
||||
}
|
||||
// consider an empty account with storage to be deleted, so we don't check root here
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *BALStateTransition) updateAccount(addr common.Address) (*types.StateAccount, []byte) {
|
||||
a, _ := s.prestates.Load(addr)
|
||||
acct := a.(*types.StateAccount)
|
||||
|
||||
acct, diff := acct.Copy(), s.diffs[addr]
|
||||
code := diff.Code
|
||||
|
||||
if diff.Nonce != nil {
|
||||
acct.Nonce = *diff.Nonce
|
||||
}
|
||||
if diff.Balance != nil {
|
||||
acct.Balance = new(uint256.Int).Set(diff.Balance)
|
||||
}
|
||||
if tr, ok := s.tries.Load(addr); ok {
|
||||
acct.Root = tr.(Trie).Hash()
|
||||
}
|
||||
return acct, code
|
||||
}
|
||||
|
||||
func (s *BALStateTransition) commitAccount(addr common.Address) (*accountUpdate, *trienode.NodeSet, error) {
|
||||
var (
|
||||
encode = func(val common.Hash) []byte {
|
||||
if val == (common.Hash{}) {
|
||||
return nil
|
||||
}
|
||||
blob, _ := rlp.EncodeToBytes(common.TrimLeftZeroes(val[:]))
|
||||
return blob
|
||||
}
|
||||
)
|
||||
op := &accountUpdate{
|
||||
address: addr,
|
||||
data: types.SlimAccountRLP(*s.poststates[addr]), // TODO: cache the updated state acocunt somewhere
|
||||
}
|
||||
if prestate, exist := s.prestates.Load(addr); exist {
|
||||
prestate := prestate.(*types.StateAccount)
|
||||
op.origin = types.SlimAccountRLP(*prestate)
|
||||
}
|
||||
|
||||
if s.diffs[addr].Code != nil {
|
||||
op.code = &contractCode{
|
||||
crypto.Keccak256Hash(s.diffs[addr].Code),
|
||||
s.diffs[addr].Code,
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.diffs[addr].StorageWrites) == 0 {
|
||||
return op, nil, nil
|
||||
}
|
||||
|
||||
op.storages = make(map[common.Hash][]byte)
|
||||
op.storagesOriginByHash = make(map[common.Hash][]byte)
|
||||
op.storagesOriginByKey = make(map[common.Hash][]byte)
|
||||
|
||||
for key, value := range s.diffs[addr].StorageWrites {
|
||||
hash := crypto.Keccak256Hash(key.Bytes())
|
||||
op.storages[hash] = encode(value)
|
||||
origin := encode(s.originStorages[addr][key])
|
||||
op.storagesOriginByHash[hash] = origin
|
||||
op.storagesOriginByKey[key] = origin
|
||||
}
|
||||
tr, _ := s.tries.Load(addr)
|
||||
root, nodes := tr.(Trie).Commit(false)
|
||||
s.poststates[addr].Root = root
|
||||
return op, nodes, nil
|
||||
}
|
||||
|
||||
func (s *BALStateTransition) CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *stateUpdate, error) {
|
||||
// 1) create a stateUpdate object
|
||||
// Commit objects to the trie, measuring the elapsed time
|
||||
var (
|
||||
commitStart = time.Now()
|
||||
accountTrieNodesUpdated int
|
||||
accountTrieNodesDeleted int
|
||||
storageTrieNodesUpdated int
|
||||
storageTrieNodesDeleted int
|
||||
|
||||
lock sync.Mutex // protect two maps below
|
||||
nodes = trienode.NewMergedNodeSet() // aggregated trie nodes
|
||||
updates = make(map[common.Hash]*accountUpdate, len(s.diffs)) // aggregated account updates
|
||||
|
||||
// merge aggregates the dirty trie nodes into the global set.
|
||||
//
|
||||
// Given that some accounts may be destroyed and then recreated within
|
||||
// the same block, it's possible that a node set with the same owner
|
||||
// may already exist. In such cases, these two sets are combined, with
|
||||
// the later one overwriting the previous one if any nodes are modified
|
||||
// or deleted in both sets.
|
||||
//
|
||||
// merge run concurrently across all the state objects and account trie.
|
||||
merge = func(set *trienode.NodeSet) error {
|
||||
if set == nil {
|
||||
return nil
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
updates, deletes := set.Size()
|
||||
if set.Owner == (common.Hash{}) {
|
||||
accountTrieNodesUpdated += updates
|
||||
accountTrieNodesDeleted += deletes
|
||||
} else {
|
||||
storageTrieNodesUpdated += updates
|
||||
storageTrieNodesDeleted += deletes
|
||||
}
|
||||
return nodes.Merge(set)
|
||||
}
|
||||
)
|
||||
|
||||
destructedPrestates := make(map[common.Address]*types.StateAccount)
|
||||
s.prestates.Range(func(key, value any) bool {
|
||||
addr := key.(common.Address)
|
||||
acct := value.(*types.StateAccount)
|
||||
destructedPrestates[addr] = acct
|
||||
return true
|
||||
})
|
||||
|
||||
deletes, delNodes, err := handleDestruction(s.db, s.stateTrie, noStorageWiping, maps.Keys(s.deletions), destructedPrestates)
|
||||
if err != nil {
|
||||
return common.Hash{}, nil, err
|
||||
}
|
||||
for _, set := range delNodes {
|
||||
if err := merge(set); err != nil {
|
||||
return common.Hash{}, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Handle all state updates afterwards, concurrently to one another to shave
|
||||
// off some milliseconds from the commit operation. Also accumulate the code
|
||||
// writes to run in parallel with the computations.
|
||||
var (
|
||||
start = time.Now()
|
||||
root common.Hash
|
||||
workers errgroup.Group
|
||||
)
|
||||
// Schedule the account trie first since that will be the biggest, so give
|
||||
// it the most time to crunch.
|
||||
//
|
||||
// TODO(karalabe): This account trie commit is *very* heavy. 5-6ms at chain
|
||||
// heads, which seems excessive given that it doesn't do hashing, it just
|
||||
// shuffles some data. For comparison, the *hashing* at chain head is 2-3ms.
|
||||
// We need to investigate what's happening as it seems something's wonky.
|
||||
// Obviously it's not an end of the world issue, just something the original
|
||||
// code didn't anticipate for.
|
||||
workers.Go(func() error {
|
||||
// Write the account trie changes, measuring the amount of wasted time
|
||||
newroot, set := s.stateTrie.Commit(true)
|
||||
root = newroot
|
||||
|
||||
if err := merge(set); err != nil {
|
||||
return err
|
||||
}
|
||||
s.metrics.AccountCommits = time.Since(start)
|
||||
return nil
|
||||
})
|
||||
|
||||
s.originStoragesWG.Wait()
|
||||
|
||||
// Schedule each of the storage tries that need to be updated, so they can
|
||||
// run concurrently to one another.
|
||||
//
|
||||
// TODO(karalabe): Experimentally, the account commit takes approximately the
|
||||
// same time as all the storage commits combined, so we could maybe only have
|
||||
// 2 threads in total. But that kind of depends on the account commit being
|
||||
// more expensive than it should be, so let's fix that and revisit this todo.
|
||||
for addr, _ := range s.diffs {
|
||||
if _, isDeleted := s.deletions[addr]; isDeleted {
|
||||
continue
|
||||
}
|
||||
|
||||
address := addr
|
||||
// Run the storage updates concurrently to one another
|
||||
workers.Go(func() error {
|
||||
// Write any storage changes in the state object to its storage trie
|
||||
update, set, err := s.commitAccount(address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := merge(set); err != nil {
|
||||
return err
|
||||
}
|
||||
lock.Lock()
|
||||
updates[crypto.Keccak256Hash(address[:])] = update
|
||||
s.metrics.StorageCommits = time.Since(start) // overwrite with the longest storage commit runtime
|
||||
lock.Unlock()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
// Wait for everything to finish and update the metrics
|
||||
if err := workers.Wait(); err != nil {
|
||||
return common.Hash{}, nil, err
|
||||
}
|
||||
|
||||
accountUpdatedMeter.Mark(s.accountUpdated)
|
||||
storageUpdatedMeter.Mark(s.storageUpdated.Load())
|
||||
accountDeletedMeter.Mark(s.accountDeleted)
|
||||
storageDeletedMeter.Mark(s.storageDeleted.Load())
|
||||
accountTrieUpdatedMeter.Mark(int64(accountTrieNodesUpdated))
|
||||
accountTrieDeletedMeter.Mark(int64(accountTrieNodesDeleted))
|
||||
storageTriesUpdatedMeter.Mark(int64(storageTrieNodesUpdated))
|
||||
storageTriesDeletedMeter.Mark(int64(storageTrieNodesDeleted))
|
||||
|
||||
ret := newStateUpdate(noStorageWiping, s.parentRoot, root, block, deletes, updates, nodes)
|
||||
|
||||
snapshotCommits, trieDBCommits, err := flushStateUpdate(s.db, block, ret)
|
||||
if err != nil {
|
||||
return common.Hash{}, nil, err
|
||||
}
|
||||
|
||||
s.metrics.SnapshotCommits, s.metrics.TrieDBCommits = snapshotCommits, trieDBCommits
|
||||
s.metrics.TotalCommitTime = time.Since(commitStart)
|
||||
return root, ret, nil
|
||||
}
|
||||
|
||||
func (s *BALStateTransition) IntermediateRoot(_ bool) common.Hash {
|
||||
if s.rootHash != (common.Hash{}) {
|
||||
return s.rootHash
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
lastIdx := len(s.accessList.block.Transactions()) + 1
|
||||
|
||||
s.originStoragesWG.Add(1)
|
||||
go func() {
|
||||
defer s.originStoragesWG.Done()
|
||||
for _, addr := range s.accessList.ModifiedAccounts() {
|
||||
diff := s.accessList.readAccountDiff(addr, lastIdx)
|
||||
if len(diff.StorageWrites) > 0 {
|
||||
s.originStorages[addr] = make(map[common.Hash]common.Hash)
|
||||
for key := range diff.StorageWrites {
|
||||
val, err := s.reader.Storage(addr, key)
|
||||
if err != nil {
|
||||
panic("TODO: wat do?")
|
||||
}
|
||||
s.originStorages[addr][key] = val
|
||||
}
|
||||
}
|
||||
}
|
||||
s.metrics.OriginStorageLoadTime = time.Since(start)
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// 1. resolve the entire state object for the updated addrs, in parallel prefetch them in the account trie
|
||||
// 1. in parallel:
|
||||
// * load the prestate of mutated state objects from the snapshot, update their tries.
|
||||
// * prefetch all mutated account in the account trie
|
||||
|
||||
for _, addr := range s.accessList.ModifiedAccounts() {
|
||||
diff := s.accessList.readAccountDiff(addr, lastIdx)
|
||||
s.diffs[addr] = diff
|
||||
}
|
||||
|
||||
for _, addr := range s.accessList.ModifiedAccounts() {
|
||||
wg.Add(1)
|
||||
address := addr
|
||||
go func() {
|
||||
acct := s.accessList.prestateReader.account(address)
|
||||
diff := s.diffs[address]
|
||||
if acct == nil {
|
||||
acct = types.NewEmptyStateAccount()
|
||||
}
|
||||
s.prestates.Store(address, acct)
|
||||
|
||||
if len(diff.StorageWrites) > 0 {
|
||||
tr, err := s.db.OpenStorageTrie(s.parentRoot, address, acct.Root, s.stateTrie)
|
||||
if err != nil {
|
||||
panic("FUCK")
|
||||
}
|
||||
s.tries.Store(address, tr)
|
||||
|
||||
var (
|
||||
updateKeys, updateValues [][]byte
|
||||
deleteKeys [][]byte
|
||||
)
|
||||
for key, val := range diff.StorageWrites {
|
||||
if val != (common.Hash{}) {
|
||||
updateKeys = append(updateKeys, key[:])
|
||||
updateValues = append(updateValues, common.TrimLeftZeroes(val[:]))
|
||||
|
||||
s.storageUpdated.Add(1)
|
||||
} else {
|
||||
deleteKeys = append(deleteKeys, key[:])
|
||||
|
||||
s.storageDeleted.Add(1)
|
||||
}
|
||||
}
|
||||
if err := tr.UpdateStorageBatch(address, updateKeys, updateValues); err != nil {
|
||||
panic("FUCK1")
|
||||
}
|
||||
|
||||
for _, key := range deleteKeys {
|
||||
if err := tr.DeleteStorage(address, key); err != nil {
|
||||
panic("SHITTT")
|
||||
}
|
||||
}
|
||||
|
||||
hashStart := time.Now()
|
||||
tr.Hash()
|
||||
s.metrics.StateHash = time.Since(hashStart)
|
||||
/*
|
||||
var objTrieData string
|
||||
it, err := tr.NodeIterator([]byte{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for it.Next(true) {
|
||||
if it.Leaf() {
|
||||
objTrieData += fmt.Sprintf("%x: %x\n", it.Path(), it.LeafBlob())
|
||||
} else {
|
||||
objTrieData += fmt.Sprintf("%x: %x\n", it.Path(), it.Hash())
|
||||
}
|
||||
}
|
||||
fmt.Printf("acct hash %x. hash=%x, updated storage: %v, deleted storage: %v trie:\n%s\n", crypto.Keccak256Hash(address[:]), tr.Hash(), updateKeys, deleteKeys, objTrieData)
|
||||
*/
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
// prefetch all modified accounts in the main account trie.
|
||||
go func() {
|
||||
prefetchStart := time.Now()
|
||||
if err := s.stateTrie.PrefetchAccount(s.accessList.ModifiedAccounts()); err != nil {
|
||||
panic("FUCK2")
|
||||
}
|
||||
s.metrics.StatePrefetch = time.Since(prefetchStart)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
s.metrics.AccountUpdate = time.Since(start)
|
||||
|
||||
// stage 2: update the main account trie
|
||||
|
||||
stateUpdateStart := time.Now()
|
||||
for mutatedAddr, _ := range s.diffs {
|
||||
p, _ := s.prestates.Load(mutatedAddr)
|
||||
prestate := p.(*types.StateAccount)
|
||||
|
||||
isDeleted := isAccountDeleted(prestate, s.diffs[mutatedAddr])
|
||||
if isDeleted {
|
||||
if err := s.stateTrie.DeleteAccount(mutatedAddr); err != nil {
|
||||
panic("FUCK3")
|
||||
}
|
||||
s.deletions[mutatedAddr] = struct{}{}
|
||||
} else {
|
||||
acct, code := s.updateAccount(mutatedAddr)
|
||||
|
||||
if code != nil {
|
||||
codeHash := crypto.Keccak256Hash(code)
|
||||
acct.CodeHash = codeHash.Bytes()
|
||||
if err := s.stateTrie.UpdateContractCode(mutatedAddr, codeHash, code); err != nil {
|
||||
panic("FUCK4")
|
||||
}
|
||||
}
|
||||
if err := s.stateTrie.UpdateAccount(mutatedAddr, acct, len(code)); err != nil {
|
||||
panic("FUCK4")
|
||||
}
|
||||
s.poststates[mutatedAddr] = acct
|
||||
}
|
||||
}
|
||||
|
||||
s.metrics.StateUpdate = time.Since(stateUpdateStart)
|
||||
|
||||
stateTrieHashStart := time.Now()
|
||||
s.rootHash = s.stateTrie.Hash()
|
||||
s.metrics.StateHash = time.Since(stateTrieHashStart)
|
||||
|
||||
/*
|
||||
it, err := s.stateTrie.NodeIterator([]byte{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println("state trie")
|
||||
for it.Next(true) {
|
||||
if it.Leaf() {
|
||||
fmt.Printf("%x: %x\n", it.Path(), it.LeafBlob())
|
||||
} else {
|
||||
fmt.Printf("%x: %x\n", it.Path(), it.Hash())
|
||||
}
|
||||
}
|
||||
*/
|
||||
return s.rootHash
|
||||
}
|
||||
|
||||
func (s *BALStateTransition) Preimages() map[common.Hash][]byte {
|
||||
// TODO: implement this
|
||||
return make(map[common.Hash][]byte)
|
||||
}
|
||||
|
|
@ -221,6 +221,7 @@ func (s *stateObject) SetState(key, value common.Hash) common.Hash {
|
|||
if prev == value {
|
||||
return prev
|
||||
}
|
||||
|
||||
// New value is different, update and journal the change
|
||||
s.db.journal.storageChange(s.address, key, prev, origin)
|
||||
s.setState(key, value, origin)
|
||||
|
|
@ -296,6 +297,7 @@ func (s *stateObject) updateTrie() (Trie, error) {
|
|||
return s.trie, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve a pretecher populated trie, or fall back to the database. This will
|
||||
// block until all prefetch tasks are done, which are needed for witnesses even
|
||||
// for unmodified state objects.
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package state
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
"maps"
|
||||
"slices"
|
||||
"sync"
|
||||
|
|
@ -65,6 +66,13 @@ func (m *mutation) isDelete() bool {
|
|||
return m.typ == deletion
|
||||
}
|
||||
|
||||
type BlockStateTransition interface {
|
||||
CommitWithUpdate(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (common.Hash, *stateUpdate, error)
|
||||
IntermediateRoot(deleteEmpty bool) common.Hash
|
||||
Error() error
|
||||
Preimages() map[common.Hash][]byte
|
||||
}
|
||||
|
||||
// StateDB structs within the ethereum protocol are used to store anything
|
||||
// within the merkle trie. StateDBs take care of caching and storing
|
||||
// nested states. It's the general query interface to retrieve:
|
||||
|
|
@ -921,12 +929,33 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
|
|||
// later time.
|
||||
workers.SetLimit(1)
|
||||
}
|
||||
for addr, op := range s.mutations {
|
||||
if op.applied || op.isDelete() {
|
||||
continue
|
||||
var updatedAddrs []common.Address
|
||||
|
||||
if s.blockAccessList != nil {
|
||||
updatedAddrs = s.blockAccessList.ModifiedAccounts()
|
||||
} else {
|
||||
for addr, op := range s.mutations {
|
||||
if op.applied || op.isDelete() {
|
||||
continue
|
||||
}
|
||||
updatedAddrs = append(updatedAddrs, addr)
|
||||
}
|
||||
obj := s.stateObjects[addr] // closure for the task runner below
|
||||
}
|
||||
|
||||
var m sync.Mutex
|
||||
for _, addr := range updatedAddrs {
|
||||
workers.Go(func() error {
|
||||
var obj *stateObject
|
||||
if s.blockAccessList != nil {
|
||||
lastIdx := len(s.blockAccessList.block.Transactions()) + 1
|
||||
diff := s.blockAccessList.readAccountDiff(addr, lastIdx)
|
||||
acct := s.blockAccessList.prestateReader.account(addr)
|
||||
m.Lock()
|
||||
obj = s.blockAccessList.initMutatedObjFromDiff(s, addr, acct, diff)
|
||||
m.Unlock()
|
||||
} else {
|
||||
obj = s.stateObjects[addr] // closure for the task runner below
|
||||
}
|
||||
if s.db.TrieDB().IsVerkle() {
|
||||
obj.updateTrie()
|
||||
} else {
|
||||
|
|
@ -1103,8 +1132,8 @@ func (s *StateDB) clearJournalAndRefund() {
|
|||
// of a specific account. It leverages the associated state snapshot for fast
|
||||
// storage iteration and constructs trie node deletion markers by creating
|
||||
// stack trie with iterated slots.
|
||||
func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) {
|
||||
iter, err := snaps.StorageIterator(s.originalRoot, addrHash, common.Hash{})
|
||||
func fastDeleteStorage(originalRoot common.Hash, snaps *snapshot.Tree, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) {
|
||||
iter, err := snaps.StorageIterator(originalRoot, addrHash, common.Hash{})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
|
@ -1143,8 +1172,8 @@ func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash,
|
|||
// slowDeleteStorage serves as a less-efficient alternative to "fastDeleteStorage,"
|
||||
// employed when the associated state snapshot is not available. It iterates the
|
||||
// storage slots along with all internal trie nodes via trie directly.
|
||||
func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) {
|
||||
tr, err := s.db.OpenStorageTrie(s.originalRoot, addr, root, s.trie)
|
||||
func slowDeleteStorage(db Database, trie Trie, originalRoot common.Hash, addr common.Address, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) {
|
||||
tr, err := db.OpenStorageTrie(originalRoot, addr, root, trie)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to open storage trie, err: %w", err)
|
||||
}
|
||||
|
|
@ -1179,7 +1208,7 @@ func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, r
|
|||
// The function will make an attempt to utilize an efficient strategy if the
|
||||
// associated state snapshot is reachable; otherwise, it will resort to a less
|
||||
// efficient approach.
|
||||
func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) {
|
||||
func deleteStorage(db Database, trie Trie, addr common.Address, addrHash common.Hash, root, originalRoot common.Hash) (map[common.Hash][]byte, map[common.Hash][]byte, *trienode.NodeSet, error) {
|
||||
var (
|
||||
err error
|
||||
nodes *trienode.NodeSet // the set for trie node mutations (value is nil)
|
||||
|
|
@ -1189,12 +1218,12 @@ func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root
|
|||
// The fast approach can be failed if the snapshot is not fully
|
||||
// generated, or it's internally corrupted. Fallback to the slow
|
||||
// one just in case.
|
||||
snaps := s.db.Snapshot()
|
||||
snaps := db.Snapshot()
|
||||
if snaps != nil {
|
||||
storages, storageOrigins, nodes, err = s.fastDeleteStorage(snaps, addrHash, root)
|
||||
storages, storageOrigins, nodes, err = fastDeleteStorage(originalRoot, snaps, addrHash, root)
|
||||
}
|
||||
if snaps == nil || err != nil {
|
||||
storages, storageOrigins, nodes, err = s.slowDeleteStorage(addr, addrHash, root)
|
||||
storages, storageOrigins, nodes, err = slowDeleteStorage(db, trie, originalRoot, addr, addrHash, root)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
|
|
@ -1220,39 +1249,38 @@ func (s *StateDB) deleteStorage(addr common.Address, addrHash common.Hash, root
|
|||
// with their values be tracked as original value.
|
||||
// In case (d), **original** account along with its storages should be deleted,
|
||||
// with their values be tracked as original value.
|
||||
func (s *StateDB) handleDestruction(noStorageWiping bool) (map[common.Hash]*accountDelete, []*trienode.NodeSet, error) {
|
||||
func handleDestruction(db Database, trie Trie, noStorageWiping bool, destructions iter.Seq[common.Address], prestates map[common.Address]*types.StateAccount) (map[common.Hash]*accountDelete, []*trienode.NodeSet, error) {
|
||||
var (
|
||||
nodes []*trienode.NodeSet
|
||||
deletes = make(map[common.Hash]*accountDelete)
|
||||
)
|
||||
for addr, prevObj := range s.stateObjectsDestruct {
|
||||
prev := prevObj.origin
|
||||
|
||||
for addr := range destructions {
|
||||
prestate := prestates[addr]
|
||||
// The account was non-existent, and it's marked as destructed in the scope
|
||||
// of block. It can be either case (a) or (b) and will be interpreted as
|
||||
// null->null state transition.
|
||||
// - for (a), skip it without doing anything
|
||||
// - for (b), the resurrected account with nil as original will be handled afterwards
|
||||
if prev == nil {
|
||||
if prestate == nil {
|
||||
continue
|
||||
}
|
||||
// The account was existent, it can be either case (c) or (d).
|
||||
addrHash := crypto.Keccak256Hash(addr.Bytes())
|
||||
op := &accountDelete{
|
||||
address: addr,
|
||||
origin: types.SlimAccountRLP(*prev),
|
||||
origin: types.SlimAccountRLP(*prestate),
|
||||
}
|
||||
deletes[addrHash] = op
|
||||
|
||||
// Short circuit if the origin storage was empty.
|
||||
if prev.Root == types.EmptyRootHash || s.db.TrieDB().IsVerkle() {
|
||||
if prestate.Root == types.EmptyRootHash || db.TrieDB().IsVerkle() {
|
||||
continue
|
||||
}
|
||||
if noStorageWiping {
|
||||
return nil, nil, fmt.Errorf("unexpected storage wiping, %x", addr)
|
||||
}
|
||||
// Remove storage slots belonging to the account.
|
||||
storages, storagesOrigin, set, err := s.deleteStorage(addr, addrHash, prev.Root)
|
||||
storages, storagesOrigin, set, err := deleteStorage(db, trie, addr, addrHash, prestate.Root, prestate.Root)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to delete storage, err: %w", err)
|
||||
}
|
||||
|
|
@ -1327,7 +1355,12 @@ func (s *StateDB) commit(deleteEmptyObjects bool, noStorageWiping bool, blockNum
|
|||
// the same block, account deletions must be processed first. This ensures
|
||||
// that the storage trie nodes deleted during destruction and recreated
|
||||
// during subsequent resurrection can be combined correctly.
|
||||
deletes, delNodes, err := s.handleDestruction(noStorageWiping)
|
||||
var stateAccountsDestruct, destructAccountsOrigins = make(map[common.Address]*types.StateAccount), make(map[common.Address]*types.StateAccount)
|
||||
for addr, obj := range s.stateObjectsDestruct {
|
||||
stateAccountsDestruct[addr] = &obj.data
|
||||
destructAccountsOrigins[addr] = obj.origin
|
||||
}
|
||||
deletes, delNodes, err := handleDestruction(s.db, s.trie, noStorageWiping, maps.Keys(stateAccountsDestruct), destructAccountsOrigins)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -1428,6 +1461,44 @@ func (s *StateDB) commit(deleteEmptyObjects bool, noStorageWiping bool, blockNum
|
|||
return newStateUpdate(noStorageWiping, origin, root, blockNumber, deletes, updates, nodes), nil
|
||||
}
|
||||
|
||||
func flushStateUpdate(d Database, block uint64, update *stateUpdate) (snapshotCommits, trieDBCommits time.Duration, err error) {
|
||||
if db := d.TrieDB().Disk(); db != nil && len(update.codes) > 0 {
|
||||
batch := db.NewBatch()
|
||||
for _, code := range update.codes {
|
||||
rawdb.WriteCode(batch, code.hash, code.blob)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
if !update.empty() {
|
||||
// If snapshotting is enabled, update the snapshot tree with this new version
|
||||
if snap := d.Snapshot(); snap != nil && snap.Snapshot(update.originRoot) != nil {
|
||||
start := time.Now()
|
||||
if err := snap.Update(update.root, update.originRoot, update.accounts, update.storages); err != nil {
|
||||
log.Warn("Failed to update snapshot tree", "from", update.originRoot, "to", update.root, "err", err)
|
||||
}
|
||||
// Keep 128 diff layers in the memory, persistent layer is 129th.
|
||||
// - head layer is paired with HEAD state
|
||||
// - head-1 layer is paired with HEAD-1 state
|
||||
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
|
||||
if err := snap.Cap(update.root, TriesInMemory); err != nil {
|
||||
log.Warn("Failed to cap snapshot tree", "root", update.root, "layers", TriesInMemory, "err", err)
|
||||
}
|
||||
snapshotCommits += time.Since(start)
|
||||
}
|
||||
// If trie database is enabled, commit the state update as a new layer
|
||||
if db := d.TrieDB(); db != nil {
|
||||
start := time.Now()
|
||||
if err := db.Update(update.root, update.originRoot, block, update.nodes, update.stateSet()); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
trieDBCommits += time.Since(start)
|
||||
}
|
||||
}
|
||||
return snapshotCommits, trieDBCommits, nil
|
||||
}
|
||||
|
||||
// commitAndFlush is a wrapper of commit which also commits the state mutations
|
||||
// to the configured data stores.
|
||||
func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorageWiping bool) (*stateUpdate, error) {
|
||||
|
|
@ -1435,41 +1506,12 @@ func (s *StateDB) commitAndFlush(block uint64, deleteEmptyObjects bool, noStorag
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Commit dirty contract code if any exists
|
||||
if db := s.db.TrieDB().Disk(); db != nil && len(ret.codes) > 0 {
|
||||
batch := db.NewBatch()
|
||||
for _, code := range ret.codes {
|
||||
rawdb.WriteCode(batch, code.hash, code.blob)
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if !ret.empty() {
|
||||
// If snapshotting is enabled, update the snapshot tree with this new version
|
||||
if snap := s.db.Snapshot(); snap != nil && snap.Snapshot(ret.originRoot) != nil {
|
||||
start := time.Now()
|
||||
if err := snap.Update(ret.root, ret.originRoot, ret.accounts, ret.storages); err != nil {
|
||||
log.Warn("Failed to update snapshot tree", "from", ret.originRoot, "to", ret.root, "err", err)
|
||||
}
|
||||
// Keep 128 diff layers in the memory, persistent layer is 129th.
|
||||
// - head layer is paired with HEAD state
|
||||
// - head-1 layer is paired with HEAD-1 state
|
||||
// - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state
|
||||
if err := snap.Cap(ret.root, TriesInMemory); err != nil {
|
||||
log.Warn("Failed to cap snapshot tree", "root", ret.root, "layers", TriesInMemory, "err", err)
|
||||
}
|
||||
s.SnapshotCommits += time.Since(start)
|
||||
}
|
||||
// If trie database is enabled, commit the state update as a new layer
|
||||
if db := s.db.TrieDB(); db != nil {
|
||||
start := time.Now()
|
||||
if err := db.Update(ret.root, ret.originRoot, block, ret.nodes, ret.stateSet()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.TrieDBCommits += time.Since(start)
|
||||
}
|
||||
snapshotCommits, trieDBCommits, err := flushStateUpdate(s.db, block, ret)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.SnapshotCommits = snapshotCommits
|
||||
s.TrieDBCommits = trieDBCommits
|
||||
s.reader, _ = s.db.Reader(s.originalRoot)
|
||||
return ret, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ func ExecuteStateless(config *params.ChainConfig, vmconfig vm.Config, block *typ
|
|||
if err != nil {
|
||||
return common.Hash{}, common.Hash{}, err
|
||||
}
|
||||
if err = validator.ValidateState(block, db, res, true, true); err != nil {
|
||||
if err = validator.ValidateState(block, db, res, true); err != nil {
|
||||
return common.Hash{}, common.Hash{}, err
|
||||
}
|
||||
// Almost everything validated, but receipt and state root needs to be returned
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ type Validator interface {
|
|||
ValidateBody(block *types.Block) error
|
||||
|
||||
// ValidateState validates the given statedb and optionally the process result.
|
||||
ValidateState(block *types.Block, state *state.StateDB, res *ProcessResult, validateStateRoot, stateless bool) error
|
||||
ValidateState(block *types.Block, state state.BlockStateTransition, res *ProcessResult, stateless bool) error
|
||||
}
|
||||
|
||||
// Prefetcher is an interface for pre-caching transaction signatures and state.
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package triedb
|
|||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
|
|
|
|||
Loading…
Reference in a new issue