From c9a730d859794bc4c487e05879fe6c6c6bcc2dcc Mon Sep 17 00:00:00 2001 From: Daniel Liu <139250065@qq.com> Date: Tue, 16 Dec 2025 14:43:09 +0800 Subject: [PATCH] core/txpool: use atomic int added in go1.19 #26913 (#1856) --- core/txpool/list.go | 14 +++++----- core/txpool/txpool.go | 2 +- core/txpool/txpool2_test.go | 6 ++--- core/txpool/txpool_test.go | 52 +++++++++++++++++++++---------------- 4 files changed, 41 insertions(+), 33 deletions(-) diff --git a/core/txpool/list.go b/core/txpool/list.go index fa4377f10a..4c3759bbb9 100644 --- a/core/txpool/list.go +++ b/core/txpool/list.go @@ -535,9 +535,11 @@ func (h *priceHeap) Pop() interface{} { // better candidates for inclusion while in other cases (at the top of the baseFee peak) // the floating heap is better. When baseFee is decreasing they behave similarly. type pricedList struct { + // Number of stale price points to (re-heap trigger). + stales atomic.Int64 + all *lookup // Pointer to the map of all transactions urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions - stales int64 // Number of stale price points to (re-heap trigger) reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list } @@ -568,7 +570,7 @@ func (l *pricedList) Put(tx *types.Transaction, local bool) { // the heap if a large enough ratio of transactions go stale. func (l *pricedList) Removed(count int) { // Bump the stale counter, but exit if still too low (< 25%) - stales := atomic.AddInt64(&l.stales, int64(count)) + stales := l.stales.Add(int64(count)) if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 { return } @@ -593,7 +595,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { for len(h.list) > 0 { head := h.list[0] if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated - atomic.AddInt64(&l.stales, -1) + l.stales.Add(-1) heap.Pop(h) continue } @@ -620,7 +622,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { // Discard stale transactions if found during cleanup tx := heap.Pop(&l.urgent).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - atomic.AddInt64(&l.stales, -1) + l.stales.Add(-1) continue } // Non stale transaction found, move to floating heap @@ -633,7 +635,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { // Discard stale transactions if found during cleanup tx := heap.Pop(&l.floating).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - atomic.AddInt64(&l.stales, -1) + l.stales.Add(-1) continue } // Non stale transaction found, discard it @@ -656,7 +658,7 @@ func (l *pricedList) Reheap() { l.reheapMu.Lock() defer l.reheapMu.Unlock() start := time.Now() - atomic.StoreInt64(&l.stales, 0) + l.stales.Store(0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { l.urgent.list = append(l.urgent.list, tx) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index eb031c02be..c4284ef9da 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -401,7 +401,7 @@ func (pool *TxPool) loop() { pool.mu.RLock() pending, queued := pool.stats() pool.mu.RUnlock() - stales := int(atomic.LoadInt64(&pool.priced.stales)) + stales := int(pool.priced.stales.Load()) if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) diff --git a/core/txpool/txpool2_test.go b/core/txpool/txpool2_test.go index 71c1339970..ab4d659e98 100644 --- a/core/txpool/txpool2_test.go +++ b/core/txpool/txpool2_test.go @@ -80,7 +80,7 @@ func TestTransactionFutureAttack(t *testing.T) { // Create the pool to test the limit enforcement with statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalQueue = 100 config.GlobalSlots = 100 @@ -116,7 +116,7 @@ func TestTransactionFuture1559(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain) defer pool.Stop() @@ -148,7 +148,7 @@ func TestTransactionZAttack(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain) defer pool.Stop() // Create a number of test accounts, fund them and make transactions diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index 7d6be9f626..610e84734a 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -62,11 +62,17 @@ func init() { } type testBlockChain struct { + gasLimit atomic.Uint64 statedb *state.StateDB - gasLimit uint64 chainHeadFeed *event.Feed } +func newTestBlockChain(gasLimit uint64, statedb *state.StateDB, chainHeadFeed *event.Feed) *testBlockChain { + bc := testBlockChain{statedb: statedb, chainHeadFeed: new(event.Feed)} + bc.gasLimit.Store(gasLimit) + return &bc +} + func (bc *testBlockChain) Engine() consensus.Engine { return nil } @@ -87,7 +93,7 @@ func (bc *testBlockChain) CurrentBlock() *types.Header { return &types.Header{ Root: types.EmptyRootHash, Number: new(big.Int), - GasLimit: atomic.LoadUint64(&bc.gasLimit), + GasLimit: bc.gasLimit.Load(), } } @@ -142,7 +148,7 @@ func setupPool() (*TxPool, *ecdsa.PrivateKey) { func setupPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateKey) { diskdb := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(diskdb)) - blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)} + blockchain := newTestBlockChain(10000000, statedb, new(event.Feed)) key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, config, blockchain) @@ -259,7 +265,7 @@ func TestStateChangeDuringReset(t *testing.T) { // setup pool with 2 transaction in it statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether), tracing.BalanceChangeUnspecified) - blockchain := &testChain{&testBlockChain{statedb, 1000000000, new(event.Feed)}, address, &trigger} + blockchain := &testChain{newTestBlockChain(1000000000, statedb, new(event.Feed)), address, &trigger} tx0 := transaction(0, 100000, key) tx1 := transaction(1, 100000, key) @@ -523,7 +529,7 @@ func TestChainFork(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) statedb.AddBalance(addr, big.NewInt(100000000000000), tracing.BalanceChangeUnspecified) - pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} + pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed)) <-pool.requestReset(nil, nil) } resetState() @@ -552,7 +558,7 @@ func TestDoubleNonce(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) statedb.AddBalance(addr, big.NewInt(100000000000000), tracing.BalanceChangeUnspecified) - pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} + pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed)) <-pool.requestReset(nil, nil) } resetState() @@ -722,7 +728,7 @@ func TestDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100) + pool.chain.(*testBlockChain).gasLimit.Store(100) <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { @@ -751,7 +757,7 @@ func TestPostponing(t *testing.T) { // Create the pool to test the postponing with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -964,7 +970,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.NoLocals = nolocals @@ -1057,7 +1063,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // Create the pool to test the non-expiration enforcement db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.Lifetime = time.Second @@ -1244,7 +1250,7 @@ func TestPendingGlobalLimiting(t *testing.T) { // Create the pool to test the limit enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 @@ -1347,7 +1353,7 @@ func TestCapClearsFromAll(t *testing.T) { // Create the pool to test the limit enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.AccountSlots = 2 @@ -1382,7 +1388,7 @@ func TestPendingMinimumAllowance(t *testing.T) { // Create the pool to test the limit enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.AccountSlots = 5 @@ -1431,7 +1437,7 @@ func TestRepricing(t *testing.T) { // Create the pool to test the pricing enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -1680,7 +1686,7 @@ func TestRepricingKeepsLocals(t *testing.T) { // Create the pool to test the pricing enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain) defer pool.Stop() @@ -1755,7 +1761,7 @@ func TestPoolUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = 2 @@ -1870,7 +1876,7 @@ func TestPoolStableUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = common.LimitThresholdNonceInQueue @@ -2103,7 +2109,7 @@ func TestDeduplication(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase())) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -2170,7 +2176,7 @@ func TestReplacement(t *testing.T) { // Create the pool to test the pricing enforcement with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -2376,7 +2382,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Create the original pool to inject transaction into the journal db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.NoLocals = nolocals @@ -2418,7 +2424,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain = newTestBlockChain(1000000, statedb, new(event.Feed)) pool = NewTxPool(config, params.TestChainConfig, blockchain) @@ -2445,7 +2451,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain = newTestBlockChain(1000000, statedb, new(event.Feed)) pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() @@ -2475,7 +2481,7 @@ func TestStatusCheck(t *testing.T) { // Create the pool to test the status retrievals with db := rawdb.NewMemoryDatabase() statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db)) - blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop()