diff --git a/core/blockchain.go b/core/blockchain.go index d41f301243..7b2716902e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -111,6 +111,7 @@ var ( blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) + blockPrefetchHeavyTxsMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/heavy", nil) errInsertionInterrupted = errors.New("insertion is interrupted") errChainStopped = errors.New("blockchain is stopped") diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 1c738c1e38..eb3f1a0ddd 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -18,6 +18,7 @@ package core import ( "bytes" + "math/rand" "runtime" "sync/atomic" @@ -29,6 +30,20 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + // heavyTransactionThreshold defines the threshold for classifying a + // transaction as heavy. As defined, the transaction consumes more than + // 20% of the block's GasUsed is regarded as heavy. + // + // Heavy transactions are prioritized for prefetching to allow additional + // preparation time. + heavyTransactionThreshold = 20 + + // heavyTransactionPriority defines the probability with which the heavy + // transactions will be scheduled first for prefetching. + heavyTransactionPriority = 40 +) + // statePrefetcher is a basic Prefetcher that executes transactions from a block // on top of the parent state, aiming to prefetch potentially useful state data // from disk. Transactions are executed in parallel to fully leverage the @@ -59,9 +74,59 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c ) workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching - // Iterate over and process the individual transactions - for i, tx := range block.Transactions() { - stateCpy := statedb.Copy() // closure + var ( + processed = make(map[common.Hash]struct{}, len(block.Transactions())) + heavyTxs = make(chan *types.Transaction, len(block.Transactions())) + normalTxs = make(chan *types.Transaction, len(block.Transactions())) + threshold = min(block.GasUsed()*heavyTransactionThreshold/100, params.MaxTxGas/2) + ) + for _, tx := range block.Transactions() { + // Note: gasLimit is not equivalent to gasUsed. Ideally, transaction heaviness + // should be measured using gasUsed. However, gasUsed is unknown prior to + // execution, so gasLimit is used as the indicator instead. This allows transaction + // senders to inflate gasLimit to gain higher prefetch priority, but this + // trade-off is unavoidable. + if tx.Gas() > threshold { + heavyTxs <- tx + } + // The heavy transaction will also be emitted with the normal prefetching + // ordering, depends on in which track it will be selected first. + normalTxs <- tx + } + blockPrefetchHeavyTxsMeter.Mark(int64(len(heavyTxs))) + + fetchTx := func() (*types.Transaction, bool) { + // Pick the heavy transaction first based on the pre-defined probability + if rand.Intn(100) < heavyTransactionPriority { + select { + case tx := <-heavyTxs: + return tx, false + default: + } + } + // No more heavy transaction, or no priority for them, pick the transaction + // with normal order. + select { + case tx := <-normalTxs: + return tx, false + default: + return nil, true + } + } + for { + next, done := fetchTx() + if done { + break + } + if _, exists := processed[next.Hash()]; exists { + continue + } + processed[next.Hash()] = struct{}{} + + var ( + stateCpy = statedb.Copy() // closure + tx = next // closure + ) workers.Go(func() error { // If block precaching was interrupted, abort if interrupt != nil && interrupt.Load() { @@ -103,7 +168,9 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c // Disable the nonce check msg.SkipNonceChecks = true - stateCpy.SetTxContext(tx.Hash(), i) + // The transaction index is assigned blindly with zero, it's fine + // for prefetching only. + stateCpy.SetTxContext(tx.Hash(), 0) // We attempt to apply a transaction. The goal is not to execute // the transaction successfully, rather to warm up touched data slots.