diff --git a/core/blockchain.go b/core/blockchain.go index 858eceb630..47e26d61dc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -108,6 +108,7 @@ var ( blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) + blockPrefetchHeavyTxsMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/heavy", nil) errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 1c738c1e38..3273fa05e2 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -18,6 +18,7 @@ package core import ( "bytes" + "math/rand" "runtime" "sync/atomic" @@ -29,6 +30,27 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + // heavyTransactionThreshold defines the threshold for classifying a + // transaction as heavy. As defined, the transaction consumes more than + // 20% of the block's GasUsed is regarded as heavy. + // + // Heavy transactions are prioritized for prefetching to allow additional + // preparation time. + heavyTransactionThreshold = 20 + + // heavyTransactionPriority defines the probability with which the heavy + // transactions will be scheduled first for prefetching. + heavyTransactionPriority = 30 +) + +// isHeavyTransaction returns an indicator whether the transaction is regarded +// as heavy or not. +func isHeavyTransaction(txGasLimit uint64, blockGasUsed uint64) bool { + threshold := blockGasUsed * heavyTransactionThreshold / 100 + return txGasLimit >= threshold +} + // statePrefetcher is a basic Prefetcher that executes transactions from a block // on top of the parent state, aiming to prefetch potentially useful state data // from disk. Transactions are executed in parallel to fully leverage the @@ -59,8 +81,49 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c ) workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching - // Iterate over and process the individual transactions - for i, tx := range block.Transactions() { + var ( + processed = make(map[common.Hash]struct{}, len(block.Transactions())) + heavyTxs = make(chan *types.Transaction, len(block.Transactions())) + normalTxs = make(chan *types.Transaction, len(block.Transactions())) + ) + for _, tx := range block.Transactions() { + // Note, the gasLimit is not equivalent with the gasUsed. Theoretically + // we should measure the transaction heaviness based on the gasUsed. + // Unfortunately this field is still unknown without execution, so use + // gasLimit instead. + if isHeavyTransaction(tx.Gas(), block.GasUsed()) { + heavyTxs <- tx + } + normalTxs <- tx + } + blockPrefetchHeavyTxsMeter.Mark(int64(len(heavyTxs))) + + fetchTx := func() (*types.Transaction, bool) { + // Pick the heavy transaction first based on the pre-defined probability + if rand.Intn(100) < heavyTransactionPriority { + select { + case tx := <-heavyTxs: + return tx, false + default: + } + } + // No more heavy transaction, or no priority for them, pick the transaction + // with normal order. + select { + case tx := <-normalTxs: + return tx, false + default: + return nil, true + } + } + for { + tx, done := fetchTx() + if done { + break + } + if _, exists := processed[tx.Hash()]; exists { + continue + } stateCpy := statedb.Copy() // closure workers.Go(func() error { // If block precaching was interrupted, abort @@ -103,7 +166,9 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c // Disable the nonce check msg.SkipNonceChecks = true - stateCpy.SetTxContext(tx.Hash(), i) + // The transaction index is assigned blindly with zero, it's fine + // for prefetching only. + stateCpy.SetTxContext(tx.Hash(), 0) // We attempt to apply a transaction. The goal is not to execute // the transaction successfully, rather to warm up touched data slots.