core/txpool/legacypool: move queue out of main txpool (#32270)

This PR move the queue out of the main transaction pool.
For now there should be no functional changes.
I see this as a first step to refactor the legacypool and make the queue
a fully separate concept from the main pending pool.

---------

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
Co-authored-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Marius van der Wijden 2025-10-13 19:07:36 +02:00 committed by GitHub
parent b87581f297
commit 7b693ea17c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 370 additions and 209 deletions

View file

@ -23,7 +23,6 @@ import (
"math"
"math/big"
"slices"
"sort"
"sync"
"sync/atomic"
"time"
@ -238,11 +237,10 @@ type LegacyPool struct {
pendingNonces *noncer // Pending state tracking virtual nonces
reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
pending map[common.Address]*list // All currently processable transactions
queue *queue
all *lookup // All transactions to allow lookups
priced *pricedList // All transactions sorted by price
reqResetCh chan *txpoolResetRequest
reqPromoteCh chan *accountSet
@ -266,14 +264,14 @@ func New(config Config, chain BlockChain) *LegacyPool {
config = (&config).sanitize()
// Create the transaction pool with its initial settings
signer := types.LatestSigner(chain.Config())
pool := &LegacyPool{
config: config,
chain: chain,
chainconfig: chain.Config(),
signer: types.LatestSigner(chain.Config()),
signer: signer,
pending: make(map[common.Address]*list),
queue: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
queue: newQueue(config, signer),
all: newLookup(),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
@ -369,15 +367,9 @@ func (pool *LegacyPool) loop() {
// Handle inactive account transaction eviction
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Any old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true, true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
evicted := pool.queue.evict(false)
for _, hash := range evicted {
pool.removeTx(hash, true, true)
}
pool.mu.Unlock()
}
@ -459,11 +451,7 @@ func (pool *LegacyPool) stats() (int, int) {
for _, list := range pool.pending {
pending += list.Len()
}
queued := 0
for _, list := range pool.queue {
queued += list.Len()
}
return pending, queued
return pending, pool.queue.stats()
}
// Content retrieves the data content of the transaction pool, returning all the
@ -476,10 +464,7 @@ func (pool *LegacyPool) Content() (map[common.Address][]*types.Transaction, map[
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
queued := make(map[common.Address][]*types.Transaction, len(pool.queue))
for addr, list := range pool.queue {
queued[addr] = list.Flatten()
}
queued := pool.queue.content()
return pending, queued
}
@ -493,10 +478,7 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
if list, ok := pool.pending[addr]; ok {
pending = list.Flatten()
}
var queued []*types.Transaction
if list, ok := pool.queue[addr]; ok {
queued = list.Flatten()
}
queued := pool.queue.contentFrom(addr)
return pending, queued
}
@ -644,7 +626,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
if pending := pool.pending[auth]; pending != nil {
count += pending.Len()
}
if queue := pool.queue[auth]; queue != nil {
if queue, ok := pool.queue.get(auth); ok {
count += queue.Len()
}
if count > 1 {
@ -691,7 +673,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
// only by this subpool until all transactions are evicted
var (
_, hasPending = pool.pending[from]
_, hasQueued = pool.queue[from]
_, hasQueued = pool.queue.get(from)
)
if !hasPending && !hasQueued {
if err := pool.reserver.Hold(from); err != nil {
@ -790,7 +772,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
// Successful promotion, bump the heartbeat
pool.beats[from] = time.Now()
pool.queue.bump(from)
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
@ -815,7 +797,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
}
// The transaction has a nonce gap with pending list, it's only considered
// as executable if transactions in queue can fill up the nonce gap.
queue, ok := pool.queue[from]
queue, ok := pool.queue.get(from)
if !ok {
return true
}
@ -831,25 +813,12 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
//
// Note, this method assumes the pool lock is held!
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
pool.queue[from] = newList(false)
replaced, err := pool.queue.add(hash, tx)
if err != nil {
return false, err
}
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
return false, txpool.ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
pool.all.Remove(old.Hash())
pool.priced.Removed(1)
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
if replaced != nil {
pool.removeTx(*replaced, true, true)
}
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
@ -860,11 +829,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAl
pool.all.Add(tx)
pool.priced.Put(tx)
}
// If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist {
pool.beats[from] = time.Now()
}
return old != nil, nil
return replaced != nil, nil
}
// promoteTx adds a transaction to the pending (processable) list of transactions
@ -899,7 +864,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
pool.pendingNonces.set(addr, tx.Nonce()+1)
// Successful promotion, bump the heartbeat
pool.beats[addr] = time.Now()
pool.queue.bump(addr)
return true
}
@ -1019,7 +984,7 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus {
if txList := pool.pending[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
return txpool.TxStatusPending
} else if txList := pool.queue[from]; txList != nil && txList.txs.items[tx.Nonce()] != nil {
} else if txList, ok := pool.queue.get(from); ok && txList.txs.items[tx.Nonce()] != nil {
return txpool.TxStatusQueued
}
return txpool.TxStatusUnknown
@ -1096,7 +1061,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
defer func() {
var (
_, hasPending = pool.pending[addr]
_, hasQueued = pool.queue[addr]
_, hasQueued = pool.queue.get(addr)
)
if !hasPending && !hasQueued {
pool.reserver.Release(addr)
@ -1128,16 +1093,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
}
}
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedGauge.Dec(1)
}
if future.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
}
}
pool.queue.removeTx(addr, tx)
return 0
}
@ -1285,10 +1241,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
}
}
// Reset needs promote for all addresses
promoteAddrs = make([]common.Address, 0, len(pool.queue))
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
}
promoteAddrs = append(promoteAddrs, pool.queue.addresses()...)
}
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
@ -1442,62 +1395,32 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.Transaction {
// Track the promoted transactions to broadcast them at once
var promoted []*types.Transaction
// Iterate over all accounts and promote any executable transactions
gasLimit := pool.currentHead.Load().GasLimit
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
pool.all.Remove(tx.Hash())
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
pool.all.Remove(tx.Hash())
}
log.Trace("Removed unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
promotable, dropped, removedAddresses := pool.queue.promoteExecutables(accounts, gasLimit, pool.currentState, pool.pendingNonces)
promoted := make([]*types.Transaction, 0, len(promotable))
// Gather all executable transactions and promote them
readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
hash := tx.Hash()
if pool.promoteTx(addr, hash, tx) {
promoted = append(promoted, tx)
}
}
log.Trace("Promoted queued transactions", "count", len(promoted))
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
if _, ok := pool.pending[addr]; !ok {
pool.reserver.Release(addr)
}
// promote all promoteable transactions
for _, tx := range promotable {
from, _ := pool.signer.Sender(tx)
if pool.promoteTx(from, tx.Hash(), tx) {
promoted = append(promoted, tx)
}
}
// remove all removable transactions
for _, hash := range dropped {
pool.all.Remove(hash)
}
// release all accounts that have no more transactions in the pool
for _, addr := range removedAddresses {
_, hasPending := pool.pending[addr]
_, hasQueued := pool.queue.get(addr)
if !hasPending && !hasQueued {
pool.reserver.Release(addr)
}
}
return promoted
}
@ -1585,43 +1508,17 @@ func (pool *LegacyPool) truncatePending() {
// truncateQueue drops the oldest transactions in the queue if the pool is above the global queue limit.
func (pool *LegacyPool) truncateQueue() {
queued := uint64(0)
for _, list := range pool.queue {
queued += uint64(list.Len())
}
if queued <= pool.config.GlobalQueue {
return
removed, removedAddresses := pool.queue.truncate()
// remove all removable transactions
for _, hash := range removed {
pool.all.Remove(hash)
}
// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
sort.Sort(sort.Reverse(addresses))
// Drop transactions until the total is below the limit
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
addresses = addresses[:len(addresses)-1]
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true, true)
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true, true)
drop--
queuedRateLimitMeter.Mark(1)
for _, addr := range removedAddresses {
_, hasPending := pool.pending[addr]
if !hasPending {
pool.reserver.Release(addr)
}
}
}
@ -1679,25 +1576,13 @@ func (pool *LegacyPool) demoteUnexecutables() {
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
if _, ok := pool.queue[addr]; !ok {
if _, ok := pool.queue.get(addr); !ok {
pool.reserver.Release(addr)
}
}
}
}
// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
heartbeat time.Time
}
type addressesByHeartbeat []addressByHeartbeat
func (a addressesByHeartbeat) Len() int { return len(a) }
func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// accountSet is simply a set of addresses to check for existence, and a signer
// capable of deriving addresses from transactions.
type accountSet struct {
@ -1938,17 +1823,17 @@ func (pool *LegacyPool) Clear() {
// acquire the subpool lock until the transaction addition is completed.
for addr := range pool.pending {
if _, ok := pool.queue[addr]; !ok {
if _, ok := pool.queue.get(addr); !ok {
pool.reserver.Release(addr)
}
}
for addr := range pool.queue {
for _, addr := range pool.queue.addresses() {
pool.reserver.Release(addr)
}
pool.all.Clear()
pool.priced.Reheap()
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)
pool.queue = newQueue(pool.config, pool.signer)
pool.pendingNonces = newNoncer(pool.currentState)
}

View file

@ -466,8 +466,8 @@ func TestQueue(t *testing.T) {
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
if len(pool.queue) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue))
if len(pool.queue.queued) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue.queued))
}
}
@ -492,8 +492,8 @@ func TestQueue2(t *testing.T) {
if len(pool.pending) != 1 {
t.Error("expected pending length to be 1, got", len(pool.pending))
}
if pool.queue[from].Len() != 2 {
t.Error("expected len(queue) == 2, got", pool.queue[from].Len())
if list, _ := pool.queue.get(from); list.Len() != 2 {
t.Error("expected len(queue) == 2, got", list.Len())
}
}
@ -639,8 +639,8 @@ func TestMissingNonce(t *testing.T) {
if len(pool.pending) != 0 {
t.Error("expected 0 pending transactions, got", len(pool.pending))
}
if pool.queue[addr].Len() != 1 {
t.Error("expected 1 queued transaction, got", pool.queue[addr].Len())
if list, _ := pool.queue.get(addr); list.Len() != 1 {
t.Error("expected 1 queued transaction, got", list.Len())
}
if pool.all.Count() != 1 {
t.Error("expected 1 total transactions, got", pool.all.Count())
@ -712,8 +712,8 @@ func TestDropping(t *testing.T) {
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
if list, _ := pool.queue.get(account); list.Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", list.Len(), 3)
}
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
@ -722,8 +722,8 @@ func TestDropping(t *testing.T) {
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
if list, _ := pool.queue.get(account); list.Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", list.Len(), 3)
}
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
@ -741,13 +741,14 @@ func TestDropping(t *testing.T) {
if _, ok := pool.pending[account].txs.items[tx2.Nonce()]; ok {
t.Errorf("out-of-fund pending transaction present: %v", tx1)
}
if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok {
list, _ := pool.queue.get(account)
if _, ok := list.txs.items[tx10.Nonce()]; !ok {
t.Errorf("funded queued transaction missing: %v", tx10)
}
if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; !ok {
if _, ok := list.txs.items[tx11.Nonce()]; !ok {
t.Errorf("funded queued transaction missing: %v", tx10)
}
if _, ok := pool.queue[account].txs.items[tx12.Nonce()]; ok {
if _, ok := list.txs.items[tx12.Nonce()]; ok {
t.Errorf("out-of-fund queued transaction present: %v", tx11)
}
if pool.all.Count() != 4 {
@ -763,10 +764,11 @@ func TestDropping(t *testing.T) {
if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; ok {
t.Errorf("over-gased pending transaction present: %v", tx1)
}
if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok {
list, _ = pool.queue.get(account)
if _, ok := list.txs.items[tx10.Nonce()]; !ok {
t.Errorf("funded queued transaction missing: %v", tx10)
}
if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok {
if _, ok := list.txs.items[tx11.Nonce()]; ok {
t.Errorf("over-gased queued transaction present: %v", tx11)
}
if pool.all.Count() != 2 {
@ -820,8 +822,8 @@ func TestPostponing(t *testing.T) {
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
}
if len(pool.queue) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0)
if len(pool.queue.addresses()) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue.addresses()), 0)
}
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
@ -830,8 +832,8 @@ func TestPostponing(t *testing.T) {
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
}
if len(pool.queue) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0)
if len(pool.queue.addresses()) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue.addresses()), 0)
}
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
@ -847,7 +849,8 @@ func TestPostponing(t *testing.T) {
if _, ok := pool.pending[accs[0]].txs.items[txs[0].Nonce()]; !ok {
t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txs[0])
}
if _, ok := pool.queue[accs[0]].txs.items[txs[0].Nonce()]; ok {
list, _ := pool.queue.get(accs[0])
if _, ok := list.txs.items[txs[0].Nonce()]; ok {
t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txs[0])
}
for i, tx := range txs[1:100] {
@ -855,14 +858,14 @@ func TestPostponing(t *testing.T) {
if _, ok := pool.pending[accs[0]].txs.items[tx.Nonce()]; ok {
t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx)
}
if _, ok := pool.queue[accs[0]].txs.items[tx.Nonce()]; !ok {
if _, ok := list.txs.items[tx.Nonce()]; !ok {
t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx)
}
} else {
if _, ok := pool.pending[accs[0]].txs.items[tx.Nonce()]; ok {
t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx)
}
if _, ok := pool.queue[accs[0]].txs.items[tx.Nonce()]; ok {
if _, ok := list.txs.items[tx.Nonce()]; ok {
t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx)
}
}
@ -872,13 +875,14 @@ func TestPostponing(t *testing.T) {
if pool.pending[accs[1]] != nil {
t.Errorf("invalidated account still has pending transactions")
}
list, _ = pool.queue.get(accs[1])
for i, tx := range txs[100:] {
if i%2 == 1 {
if _, ok := pool.queue[accs[1]].txs.items[tx.Nonce()]; !ok {
if _, ok := list.txs.items[tx.Nonce()]; !ok {
t.Errorf("tx %d: valid but future transaction missing from future queue: %v", 100+i, tx)
}
} else {
if _, ok := pool.queue[accs[1]].txs.items[tx.Nonce()]; ok {
if _, ok := list.txs.items[tx.Nonce()]; ok {
t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", 100+i, tx)
}
}
@ -963,13 +967,14 @@ func TestQueueAccountLimiting(t *testing.T) {
if len(pool.pending) != 0 {
t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0)
}
list, _ := pool.queue.get(account)
if i <= testTxPoolConfig.AccountQueue {
if pool.queue[account].Len() != int(i) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i)
if list.Len() != int(i) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, list.Len(), i)
}
} else {
if pool.queue[account].Len() != int(testTxPoolConfig.AccountQueue) {
t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), testTxPoolConfig.AccountQueue)
if list.Len() != int(testTxPoolConfig.AccountQueue) {
t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, list.Len(), testTxPoolConfig.AccountQueue)
}
}
}
@ -1020,7 +1025,7 @@ func TestQueueGlobalLimiting(t *testing.T) {
pool.addRemotesSync(txs)
queued := 0
for addr, list := range pool.queue {
for addr, list := range pool.queue.queued {
if list.Len() > int(config.AccountQueue) {
t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue)
}
@ -1179,8 +1184,8 @@ func TestPendingLimiting(t *testing.T) {
if pool.pending[account].Len() != int(i)+1 {
t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, pool.pending[account].Len(), i+1)
}
if len(pool.queue) != 0 {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0)
if len(pool.queue.addresses()) != 0 {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue.addresses()), 0)
}
}
if pool.all.Count() != int(testTxPoolConfig.AccountQueue+5) {

View file

@ -0,0 +1,271 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package legacypool
import (
"sort"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)
type queue struct {
config Config
signer types.Signer
queued map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
}
func newQueue(config Config, signer types.Signer) *queue {
return &queue{
signer: signer,
config: config,
queued: make(map[common.Address]*list),
beats: make(map[common.Address]time.Time),
}
}
func (q *queue) evict(force bool) []common.Hash {
removed := make([]common.Hash, 0)
for addr, list := range q.queued {
// Any transactions old enough should be removed
if force || time.Since(q.beats[addr]) > q.config.Lifetime {
list := list.Flatten()
for _, tx := range list {
q.removeTx(addr, tx)
removed = append(removed, tx.Hash())
}
queuedEvictionMeter.Mark(int64(len(list)))
}
}
return removed
}
func (q *queue) stats() int {
queued := 0
for _, list := range q.queued {
queued += list.Len()
}
return queued
}
func (q *queue) content() map[common.Address][]*types.Transaction {
queued := make(map[common.Address][]*types.Transaction, len(q.queued))
for addr, list := range q.queued {
queued[addr] = list.Flatten()
}
return queued
}
func (q *queue) contentFrom(addr common.Address) []*types.Transaction {
var queued []*types.Transaction
if list, ok := q.get(addr); ok {
queued = list.Flatten()
}
return queued
}
func (q *queue) get(addr common.Address) (*list, bool) {
l, ok := q.queued[addr]
return l, ok
}
func (q *queue) bump(addr common.Address) {
q.beats[addr] = time.Now()
}
func (q *queue) addresses() []common.Address {
addrs := make([]common.Address, 0, len(q.queued))
for addr := range q.queued {
addrs = append(addrs, addr)
}
return addrs
}
func (q queue) removeTx(addr common.Address, tx *types.Transaction) {
if future := q.queued[addr]; future != nil {
if txOld := future.txs.Get(tx.Nonce()); txOld != nil && txOld.Hash() != tx.Hash() {
// Edge case, a different transaction
// with the same nonce is in the queued, just ignore
return
}
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedGauge.Dec(1)
}
if future.Empty() {
delete(q.queued, addr)
delete(q.beats, addr)
}
}
}
func (q *queue) add(hash common.Hash, tx *types.Transaction) (*common.Hash, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(q.signer, tx) // already validated
if q.queued[from] == nil {
q.queued[from] = newList(false)
}
inserted, old := q.queued[from].Add(tx, q.config.PriceBump)
if !inserted {
// An older transaction was better, discard this
queuedDiscardMeter.Mark(1)
return nil, txpool.ErrReplaceUnderpriced
}
// If we never record the heartbeat, do it right now.
if _, exist := q.beats[from]; !exist {
q.beats[from] = time.Now()
}
if old == nil {
// Nothing was replaced, bump the queued counter
queuedGauge.Inc(1)
return nil, nil
}
h := old.Hash()
// Transaction was replaced, bump the replacement counter
queuedReplaceMeter.Mark(1)
return &h, nil
}
// promoteExecutables iterates over all accounts with queued transactions, selecting
// for promotion any that are now executable. It also drops any transactions that are
// deemed too old (nonce too low) or too costly (insufficient funds or over gas limit).
//
// Returns three lists: all transactions that were removed from the queue and selected
// for promotion; all other transactions that were removed from the queue and dropped;
// the list of addresses removed.
func (q *queue) promoteExecutables(accounts []common.Address, gasLimit uint64, currentState *state.StateDB, nonces *noncer) ([]*types.Transaction, []common.Hash, []common.Address) {
// Track the promotable transactions to broadcast them at once
var promotable []*types.Transaction
var dropped []common.Hash
var removedAddresses []common.Address
// Iterate over all accounts and promote any executable transactions
for _, addr := range accounts {
list := q.queued[addr]
if list == nil {
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
forwards := list.Forward(currentState.GetNonce(addr))
for _, tx := range forwards {
dropped = append(dropped, tx.Hash())
}
log.Trace("Removing old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
dropped = append(dropped, tx.Hash())
}
log.Trace("Removing unpayable queued transactions", "count", len(drops))
queuedNofundsMeter.Mark(int64(len(drops)))
// Gather all executable transactions and promote them
readies := list.Ready(nonces.get(addr))
promotable = append(promotable, readies...)
log.Trace("Promoting queued transactions", "count", len(promotable))
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps = list.Cap(int(q.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
dropped = append(dropped, hash)
log.Trace("Removing cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(q.queued, addr)
delete(q.beats, addr)
removedAddresses = append(removedAddresses, addr)
}
}
queuedGauge.Dec(int64(len(dropped)))
return promotable, dropped, removedAddresses
}
// truncate drops the oldest transactions from the queue until the total
// number is below the configured limit.
// Returns the hashes of all dropped transactions, and the addresses of
// accounts that became empty due to the truncation.
func (q *queue) truncate() ([]common.Hash, []common.Address) {
queued := uint64(0)
for _, list := range q.queued {
queued += uint64(list.Len())
}
if queued <= q.config.GlobalQueue {
return nil, nil
}
// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(q.queued))
for addr := range q.queued {
addresses = append(addresses, addressByHeartbeat{addr, q.beats[addr]})
}
sort.Sort(sort.Reverse(addresses))
removed := make([]common.Hash, 0)
removedAddresses := make([]common.Address, 0)
// Drop transactions until the total is below the limit
for drop := queued - q.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := q.queued[addr.address]
addresses = addresses[:len(addresses)-1]
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
q.removeTx(addr.address, tx)
removed = append(removed, tx.Hash())
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
removedAddresses = append(removedAddresses, addr.address)
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
q.removeTx(addr.address, txs[i])
removed = append(removed, txs[i].Hash())
drop--
queuedRateLimitMeter.Mark(1)
}
}
// no need to clear empty accounts, removeTx already does that
return removed, removedAddresses
}
// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
heartbeat time.Time
}
type addressesByHeartbeat []addressByHeartbeat
func (a addressesByHeartbeat) Len() int { return len(a) }
func (a addressesByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
func (a addressesByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }