mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-03-31 15:15:56 +00:00
core: prioritize the heavy transaction for prefetching
This commit is contained in:
parent
b2843a11d6
commit
93f1369fa3
2 changed files with 69 additions and 3 deletions
|
|
@ -108,6 +108,7 @@ var (
|
|||
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
|
||||
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
|
||||
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
|
||||
blockPrefetchHeavyTxsMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/heavy", nil)
|
||||
|
||||
errInsertionInterrupted = errors.New("insertion is interrupted")
|
||||
errChainStopped = errors.New("blockchain is stopped")
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package core
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
|
||||
|
|
@ -29,6 +30,27 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
// heavyTransactionThreshold defines the threshold for classifying a
|
||||
// transaction as heavy. As defined, the transaction consumes more than
|
||||
// 20% of the block's GasUsed is regarded as heavy.
|
||||
//
|
||||
// Heavy transactions are prioritized for prefetching to allow additional
|
||||
// preparation time.
|
||||
heavyTransactionThreshold = 20
|
||||
|
||||
// heavyTransactionPriority defines the probability with which the heavy
|
||||
// transactions will be scheduled first for prefetching.
|
||||
heavyTransactionPriority = 30
|
||||
)
|
||||
|
||||
// isHeavyTransaction returns an indicator whether the transaction is regarded
|
||||
// as heavy or not.
|
||||
func isHeavyTransaction(txGasLimit uint64, blockGasUsed uint64) bool {
|
||||
threshold := blockGasUsed * heavyTransactionThreshold / 100
|
||||
return txGasLimit >= threshold
|
||||
}
|
||||
|
||||
// statePrefetcher is a basic Prefetcher that executes transactions from a block
|
||||
// on top of the parent state, aiming to prefetch potentially useful state data
|
||||
// from disk. Transactions are executed in parallel to fully leverage the
|
||||
|
|
@ -59,8 +81,49 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
|
|||
)
|
||||
workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching
|
||||
|
||||
// Iterate over and process the individual transactions
|
||||
for i, tx := range block.Transactions() {
|
||||
var (
|
||||
processed = make(map[common.Hash]struct{}, len(block.Transactions()))
|
||||
heavyTxs = make(chan *types.Transaction, len(block.Transactions()))
|
||||
normalTxs = make(chan *types.Transaction, len(block.Transactions()))
|
||||
)
|
||||
for _, tx := range block.Transactions() {
|
||||
// Note, the gasLimit is not equivalent with the gasUsed. Theoretically
|
||||
// we should measure the transaction heaviness based on the gasUsed.
|
||||
// Unfortunately this field is still unknown without execution, so use
|
||||
// gasLimit instead.
|
||||
if isHeavyTransaction(tx.Gas(), block.GasUsed()) {
|
||||
heavyTxs <- tx
|
||||
}
|
||||
normalTxs <- tx
|
||||
}
|
||||
blockPrefetchHeavyTxsMeter.Mark(int64(len(heavyTxs)))
|
||||
|
||||
fetchTx := func() (*types.Transaction, bool) {
|
||||
// Pick the heavy transaction first based on the pre-defined probability
|
||||
if rand.Intn(100) < heavyTransactionPriority {
|
||||
select {
|
||||
case tx := <-heavyTxs:
|
||||
return tx, false
|
||||
default:
|
||||
}
|
||||
}
|
||||
// No more heavy transaction, or no priority for them, pick the transaction
|
||||
// with normal order.
|
||||
select {
|
||||
case tx := <-normalTxs:
|
||||
return tx, false
|
||||
default:
|
||||
return nil, true
|
||||
}
|
||||
}
|
||||
for {
|
||||
tx, done := fetchTx()
|
||||
if done {
|
||||
break
|
||||
}
|
||||
if _, exists := processed[tx.Hash()]; exists {
|
||||
continue
|
||||
}
|
||||
stateCpy := statedb.Copy() // closure
|
||||
workers.Go(func() error {
|
||||
// If block precaching was interrupted, abort
|
||||
|
|
@ -103,7 +166,9 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c
|
|||
// Disable the nonce check
|
||||
msg.SkipNonceChecks = true
|
||||
|
||||
stateCpy.SetTxContext(tx.Hash(), i)
|
||||
// The transaction index is assigned blindly with zero, it's fine
|
||||
// for prefetching only.
|
||||
stateCpy.SetTxContext(tx.Hash(), 0)
|
||||
|
||||
// We attempt to apply a transaction. The goal is not to execute
|
||||
// the transaction successfully, rather to warm up touched data slots.
|
||||
|
|
|
|||
Loading…
Reference in a new issue