From e03d97a42052097d817eb13c22a2f1a6459518e1 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 14 Oct 2025 14:40:04 +0800 Subject: [PATCH] core/txpool/legacypool: fix pricedList updates (#32906) This pr addresses a few issues brought by the #32270 - Add updates to pricedList after dropping transactions. - Remove redundant deletions in queue.evictList, since pool.removeTx(hash, true, true) already performs the removal. - Prevent duplicate addresses during promotion when Reset is not nil. --- core/txpool/legacypool/legacypool.go | 21 +++++------ core/txpool/legacypool/queue.go | 56 +++++++++++++++------------- 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index b36d86dd19..ceedc74a53 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -367,8 +367,7 @@ func (pool *LegacyPool) loop() { // Handle inactive account transaction eviction case <-evict.C: pool.mu.Lock() - evicted := pool.queue.evict(false) - for _, hash := range evicted { + for _, hash := range pool.queue.evictList() { pool.removeTx(hash, true, true) } pool.mu.Unlock() @@ -813,7 +812,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo // // Note, this method assumes the pool lock is held! func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) { - replaced, err := pool.queue.add(hash, tx) + replaced, err := pool.queue.add(tx) if err != nil { return false, err } @@ -1093,7 +1092,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo } } // Transaction is in the future queue - pool.queue.removeTx(addr, tx) + pool.queue.remove(addr, tx) return 0 } @@ -1241,7 +1240,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, } } // Reset needs promote for all addresses - promoteAddrs = append(promoteAddrs, pool.queue.addresses()...) + promoteAddrs = pool.queue.addresses() } // Check for pending transactions for every account that sent new ones promoted := pool.promoteExecutables(promoteAddrs) @@ -1397,9 +1396,9 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction { gasLimit := pool.currentHead.Load().GasLimit promotable, dropped, removedAddresses := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces) - promoted := make([]*types.Transaction, 0, len(promotable)) - // promote all promoteable transactions + // promote all promotable transactions + promoted := make([]*types.Transaction, 0, len(promotable)) for _, tx := range promotable { from, _ := pool.signer.Sender(tx) if pool.promoteTx(from, tx.Hash(), tx) { @@ -1411,16 +1410,15 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T for _, hash := range dropped { pool.all.Remove(hash) } + pool.priced.Removed(len(dropped)) // release all accounts that have no more transactions in the pool for _, addr := range removedAddresses { _, hasPending := pool.pending[addr] - _, hasQueued := pool.queue.get(addr) - if !hasPending && !hasQueued { + if !hasPending { pool.reserver.Release(addr) } } - return promoted } @@ -1510,10 +1508,11 @@ func (pool *LegacyPool) truncatePending() { func (pool *LegacyPool) truncateQueue() { removed, removedAddresses := pool.queue.truncate() - // remove all removable transactions + // Remove all removable transactions from the lookup and global price list for _, hash := range removed { pool.all.Remove(hash) } + pool.priced.Removed(len(removed)) for _, addr := range removedAddresses { _, hasPending := pool.pending[addr] diff --git a/core/txpool/legacypool/queue.go b/core/txpool/legacypool/queue.go index b8417064f7..a889debe37 100644 --- a/core/txpool/legacypool/queue.go +++ b/core/txpool/legacypool/queue.go @@ -27,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// queue manages nonce-gapped transactions that have been validated but are +// not yet processable. type queue struct { config Config signer types.Signer @@ -43,19 +45,17 @@ func newQueue(config Config, signer types.Signer) *queue { } } -func (q *queue) evict(force bool) []common.Hash { - removed := make([]common.Hash, 0) +// evictList returns the hashes of transactions that are old enough to be evicted. +func (q *queue) evictList() []common.Hash { + var removed []common.Hash for addr, list := range q.queued { - // Any transactions old enough should be removed - if force || time.Since(q.beats[addr]) > q.config.Lifetime { - list := list.Flatten() - for _, tx := range list { - q.removeTx(addr, tx) + if time.Since(q.beats[addr]) > q.config.Lifetime { + for _, tx := range list.Flatten() { removed = append(removed, tx.Hash()) } - queuedEvictionMeter.Mark(int64(len(list))) } } + queuedEvictionMeter.Mark(int64(len(removed))) return removed } @@ -100,7 +100,7 @@ func (q *queue) addresses() []common.Address { return addrs } -func (q queue) removeTx(addr common.Address, tx *types.Transaction) { +func (q *queue) remove(addr common.Address, tx *types.Transaction) { if future := q.queued[addr]; future != nil { if txOld := future.txs.Get(tx.Nonce()); txOld != nil && txOld.Hash() != tx.Hash() { // Edge case, a different transaction @@ -118,7 +118,7 @@ func (q queue) removeTx(addr common.Address, tx *types.Transaction) { } } -func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, error) { +func (q *queue) add(tx *types.Transaction) (*common.Hash, error) { // Try to insert the transaction into the future queue from, _ := types.Sender(q.signer, tx) // already validated if q.queued[from] == nil { @@ -149,15 +149,17 @@ func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, erro // for promotion any that are now executable. It also drops any transactions that are // deemed too old (nonce too low) or too costly (insufficient funds or over gas limit). // -// Returns three lists: all transactions that were removed from the queue and selected -// for promotion; all other transactions that were removed from the queue and dropped; -// the list of addresses removed. +// Returns three lists: +// - all transactions that were removed from the queue and selected for promotion; +// - all other transactions that were removed from the queue and dropped; +// - the list of addresses removed. func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash, []common.Address) { // Track the promotable transactions to broadcast them at once - var promotable []*types.Transaction - var dropped []common.Hash - var removedAddresses []common.Address - + var ( + promotable []*types.Transaction + dropped []common.Hash + removedAddresses []common.Address + ) // Iterate over all accounts and promote any executable transactions for _, addr := range accounts { list := q.queued[addr] @@ -170,6 +172,7 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c dropped = append(dropped, tx.Hash()) } log.Trace("Removing old queued transactions", "count", len(forwards)) + // Drop all transactions that are too costly (low balance or out of gas) drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit) for _, tx := range drops { @@ -205,9 +208,9 @@ func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, c } // truncate drops the oldest transactions from the queue until the total -// number is below the configured limit. -// Returns the hashes of all dropped transactions, and the addresses of -// accounts that became empty due to the truncation. +// number is below the configured limit. Returns the hashes of all dropped +// transactions and the addresses of accounts that became empty due to +// the truncation. func (q *queue) truncate() ([]common.Hash, []common.Address) { queued := uint64(0) for _, list := range q.queued { @@ -223,10 +226,12 @@ func (q *queue) truncate() ([]common.Hash, []common.Address) { addresses = append(addresses, addressByHeartbeat{addr, q.beats[addr]}) } sort.Sort(sort.Reverse(addresses)) - removed := make([]common.Hash, 0) - removedAddresses := make([]common.Address, 0) // Drop transactions until the total is below the limit + var ( + removed = make([]common.Hash, 0) + removedAddresses = make([]common.Address, 0) + ) for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; { addr := addresses[len(addresses)-1] list := q.queued[addr.address] @@ -236,7 +241,7 @@ func (q *queue) truncate() ([]common.Hash, []common.Address) { // Drop all transactions if they are less than the overflow if size := uint64(list.Len()); size <= drop { for _, tx := range list.Flatten() { - q.removeTx(addr.address, tx) + q.remove(addr.address, tx) removed = append(removed, tx.Hash()) } drop -= size @@ -247,14 +252,13 @@ func (q *queue) truncate() ([]common.Hash, []common.Address) { // Otherwise drop only last few transactions txs := list.Flatten() for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - q.removeTx(addr.address, txs[i]) + q.remove(addr.address, txs[i]) removed = append(removed, txs[i].Hash()) drop-- queuedRateLimitMeter.Mark(1) } } - - // no need to clear empty accounts, removeTx already does that + // No need to clear empty accounts, remove already does that return removed, removedAddresses }