core/txpool/legacypool: fix pricedList updates (#32906)

This pr addresses a few issues brought by the #32270 

- Add updates to pricedList after dropping transactions.
- Remove redundant deletions in queue.evictList, since
pool.removeTx(hash, true, true) already performs the removal.
- Prevent duplicate addresses during promotion when Reset is not nil.
This commit is contained in:
rjl493456442 2025-10-14 14:40:04 +08:00 committed by GitHub
parent fb8d2298b6
commit e03d97a420
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 40 additions and 37 deletions

View file

@ -367,8 +367,7 @@ func (pool *LegacyPool) loop() {
// Handle inactive account transaction eviction // Handle inactive account transaction eviction
case <-evict.C: case <-evict.C:
pool.mu.Lock() pool.mu.Lock()
evicted := pool.queue.evict(false) for _, hash := range pool.queue.evictList() {
for _, hash := range evicted {
pool.removeTx(hash, true, true) pool.removeTx(hash, true, true)
} }
pool.mu.Unlock() pool.mu.Unlock()
@ -813,7 +812,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
// //
// Note, this method assumes the pool lock is held! // Note, this method assumes the pool lock is held!
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) { func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
replaced, err := pool.queue.add(hash, tx) replaced, err := pool.queue.add(tx)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -1093,7 +1092,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
} }
} }
// Transaction is in the future queue // Transaction is in the future queue
pool.queue.removeTx(addr, tx) pool.queue.remove(addr, tx)
return 0 return 0
} }
@ -1241,7 +1240,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
} }
} }
// Reset needs promote for all addresses // Reset needs promote for all addresses
promoteAddrs = append(promoteAddrs, pool.queue.addresses()...) promoteAddrs = pool.queue.addresses()
} }
// Check for pending transactions for every account that sent new ones // Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs) promoted := pool.promoteExecutables(promoteAddrs)
@ -1397,9 +1396,9 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction { func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
gasLimit := pool.currentHead.Load().GasLimit gasLimit := pool.currentHead.Load().GasLimit
promotable, dropped, removedAddresses := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces) promotable, dropped, removedAddresses := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces)
promoted := make([]*types.Transaction, 0, len(promotable))
// promote all promoteable transactions // promote all promotable transactions
promoted := make([]*types.Transaction, 0, len(promotable))
for _, tx := range promotable { for _, tx := range promotable {
from, _ := pool.signer.Sender(tx) from, _ := pool.signer.Sender(tx)
if pool.promoteTx(from, tx.Hash(), tx) { if pool.promoteTx(from, tx.Hash(), tx) {
@ -1411,16 +1410,15 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
for _, hash := range dropped { for _, hash := range dropped {
pool.all.Remove(hash) pool.all.Remove(hash)
} }
pool.priced.Removed(len(dropped))
// release all accounts that have no more transactions in the pool // release all accounts that have no more transactions in the pool
for _, addr := range removedAddresses { for _, addr := range removedAddresses {
_, hasPending := pool.pending[addr] _, hasPending := pool.pending[addr]
_, hasQueued := pool.queue.get(addr) if !hasPending {
if !hasPending && !hasQueued {
pool.reserver.Release(addr) pool.reserver.Release(addr)
} }
} }
return promoted return promoted
} }
@ -1510,10 +1508,11 @@ func (pool *LegacyPool) truncatePending() {
func (pool *LegacyPool) truncateQueue() { func (pool *LegacyPool) truncateQueue() {
removed, removedAddresses := pool.queue.truncate() removed, removedAddresses := pool.queue.truncate()
// remove all removable transactions // Remove all removable transactions from the lookup and global price list
for _, hash := range removed { for _, hash := range removed {
pool.all.Remove(hash) pool.all.Remove(hash)
} }
pool.priced.Removed(len(removed))
for _, addr := range removedAddresses { for _, addr := range removedAddresses {
_, hasPending := pool.pending[addr] _, hasPending := pool.pending[addr]

View file

@ -27,6 +27,8 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
// queue manages nonce-gapped transactions that have been validated but are
// not yet processable.
type queue struct { type queue struct {
config Config config Config
signer types.Signer signer types.Signer
@ -43,19 +45,17 @@ func newQueue(config Config, signer types.Signer) *queue {
} }
} }
func (q *queue) evict(force bool) []common.Hash { // evictList returns the hashes of transactions that are old enough to be evicted.
removed := make([]common.Hash, 0) func (q *queue) evictList() []common.Hash {
var removed []common.Hash
for addr, list := range q.queued { for addr, list := range q.queued {
// Any transactions old enough should be removed if time.Since(q.beats[addr]) > q.config.Lifetime {
if force || time.Since(q.beats[addr]) > q.config.Lifetime { for _, tx := range list.Flatten() {
list := list.Flatten()
for _, tx := range list {
q.removeTx(addr, tx)
removed = append(removed, tx.Hash()) removed = append(removed, tx.Hash())
} }
queuedEvictionMeter.Mark(int64(len(list)))
} }
} }
queuedEvictionMeter.Mark(int64(len(removed)))
return removed return removed
} }
@ -100,7 +100,7 @@ func (q *queue) addresses() []common.Address {
return addrs return addrs
} }
func (q queue) removeTx(addr common.Address, tx *types.Transaction) { func (q *queue) remove(addr common.Address, tx *types.Transaction) {
if future := q.queued[addr]; future != nil { if future := q.queued[addr]; future != nil {
if txOld := future.txs.Get(tx.Nonce()); txOld != nil && txOld.Hash() != tx.Hash() { if txOld := future.txs.Get(tx.Nonce()); txOld != nil && txOld.Hash() != tx.Hash() {
// Edge case, a different transaction // Edge case, a different transaction
@ -118,7 +118,7 @@ func (q queue) removeTx(addr common.Address, tx *types.Transaction) {
} }
} }
func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, error) { func (q *queue) add(tx *types.Transaction) (*common.Hash, error) {
// Try to insert the transaction into the future queue // Try to insert the transaction into the future queue
from, _ := types.Sender(q.signer, tx) // already validated from, _ := types.Sender(q.signer, tx) // already validated
if q.queued[from] == nil { if q.queued[from] == nil {
@ -149,15 +149,17 @@ func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, erro
// for promotion any that are now executable. It also drops any transactions that are // for promotion any that are now executable. It also drops any transactions that are
// deemed too old (nonce too low) or too costly (insufficient funds or over gas limit). // deemed too old (nonce too low) or too costly (insufficient funds or over gas limit).
// //
// Returns three lists: all transactions that were removed from the queue and selected // Returns three lists:
// for promotion; all other transactions that were removed from the queue and dropped; // - all transactions that were removed from the queue and selected for promotion;
// the list of addresses removed. // - all other transactions that were removed from the queue and dropped;
// - the list of addresses removed.
func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash, []common.Address) { func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash, []common.Address) {
// Track the promotable transactions to broadcast them at once // Track the promotable transactions to broadcast them at once
var promotable []*types.Transaction var (
var dropped []common.Hash promotable []*types.Transaction
var removedAddresses []common.Address dropped []common.Hash
removedAddresses []common.Address
)
// Iterate over all accounts and promote any executable transactions // Iterate over all accounts and promote any executable transactions
for _, addr := range accounts { for _, addr := range accounts {
list := q.queued[addr] list := q.queued[addr]
@ -170,6 +172,7 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c
dropped = append(dropped, tx.Hash()) dropped = append(dropped, tx.Hash())
} }
log.Trace("Removing old queued transactions", "count", len(forwards)) log.Trace("Removing old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas) // Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit) drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit)
for _, tx := range drops { for _, tx := range drops {
@ -205,9 +208,9 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c
} }
// truncate drops the oldest transactions from the queue until the total // truncate drops the oldest transactions from the queue until the total
// number is below the configured limit. // number is below the configured limit. Returns the hashes of all dropped
// Returns the hashes of all dropped transactions, and the addresses of // transactions and the addresses of accounts that became empty due to
// accounts that became empty due to the truncation. // the truncation.
func (q *queue) truncate() ([]common.Hash, []common.Address) { func (q *queue) truncate() ([]common.Hash, []common.Address) {
queued := uint64(0) queued := uint64(0)
for _, list := range q.queued { for _, list := range q.queued {
@ -223,10 +226,12 @@ func (q *queue) truncate() ([]common.Hash, []common.Address) {
addresses = append(addresses, addressByHeartbeat{addr, q.beats[addr]}) addresses = append(addresses, addressByHeartbeat{addr, q.beats[addr]})
} }
sort.Sort(sort.Reverse(addresses)) sort.Sort(sort.Reverse(addresses))
removed := make([]common.Hash, 0)
removedAddresses := make([]common.Address, 0)
// Drop transactions until the total is below the limit // Drop transactions until the total is below the limit
var (
removed = make([]common.Hash, 0)
removedAddresses = make([]common.Address, 0)
)
for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; { for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1] addr := addresses[len(addresses)-1]
list := q.queued[addr.address] list := q.queued[addr.address]
@ -236,7 +241,7 @@ func (q *queue) truncate() ([]common.Hash, []common.Address) {
// Drop all transactions if they are less than the overflow // Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop { if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() { for _, tx := range list.Flatten() {
q.removeTx(addr.address, tx) q.remove(addr.address, tx)
removed = append(removed, tx.Hash()) removed = append(removed, tx.Hash())
} }
drop -= size drop -= size
@ -247,14 +252,13 @@ func (q *queue) truncate() ([]common.Hash, []common.Address) {
// Otherwise drop only last few transactions // Otherwise drop only last few transactions
txs := list.Flatten() txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- { for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
q.removeTx(addr.address, txs[i]) q.remove(addr.address, txs[i])
removed = append(removed, txs[i].Hash()) removed = append(removed, txs[i].Hash())
drop-- drop--
queuedRateLimitMeter.Mark(1) queuedRateLimitMeter.Mark(1)
} }
} }
// No need to clear empty accounts, remove already does that
// no need to clear empty accounts, removeTx already does that
return removed, removedAddresses return removed, removedAddresses
} }