mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-21 06:04:33 +00:00
parent
999ded17da
commit
c9a730d859
4 changed files with 41 additions and 33 deletions
|
|
@ -535,9 +535,11 @@ func (h *priceHeap) Pop() interface{} {
|
|||
// better candidates for inclusion while in other cases (at the top of the baseFee peak)
|
||||
// the floating heap is better. When baseFee is decreasing they behave similarly.
|
||||
type pricedList struct {
|
||||
// Number of stale price points to (re-heap trigger).
|
||||
stales atomic.Int64
|
||||
|
||||
all *lookup // Pointer to the map of all transactions
|
||||
urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions
|
||||
stales int64 // Number of stale price points to (re-heap trigger)
|
||||
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
|
||||
}
|
||||
|
||||
|
|
@ -568,7 +570,7 @@ func (l *pricedList) Put(tx *types.Transaction, local bool) {
|
|||
// the heap if a large enough ratio of transactions go stale.
|
||||
func (l *pricedList) Removed(count int) {
|
||||
// Bump the stale counter, but exit if still too low (< 25%)
|
||||
stales := atomic.AddInt64(&l.stales, int64(count))
|
||||
stales := l.stales.Add(int64(count))
|
||||
if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 {
|
||||
return
|
||||
}
|
||||
|
|
@ -593,7 +595,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
|
|||
for len(h.list) > 0 {
|
||||
head := h.list[0]
|
||||
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
|
||||
atomic.AddInt64(&l.stales, -1)
|
||||
l.stales.Add(-1)
|
||||
heap.Pop(h)
|
||||
continue
|
||||
}
|
||||
|
|
@ -620,7 +622,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
|
|||
// Discard stale transactions if found during cleanup
|
||||
tx := heap.Pop(&l.urgent).(*types.Transaction)
|
||||
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
|
||||
atomic.AddInt64(&l.stales, -1)
|
||||
l.stales.Add(-1)
|
||||
continue
|
||||
}
|
||||
// Non stale transaction found, move to floating heap
|
||||
|
|
@ -633,7 +635,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
|
|||
// Discard stale transactions if found during cleanup
|
||||
tx := heap.Pop(&l.floating).(*types.Transaction)
|
||||
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
|
||||
atomic.AddInt64(&l.stales, -1)
|
||||
l.stales.Add(-1)
|
||||
continue
|
||||
}
|
||||
// Non stale transaction found, discard it
|
||||
|
|
@ -656,7 +658,7 @@ func (l *pricedList) Reheap() {
|
|||
l.reheapMu.Lock()
|
||||
defer l.reheapMu.Unlock()
|
||||
start := time.Now()
|
||||
atomic.StoreInt64(&l.stales, 0)
|
||||
l.stales.Store(0)
|
||||
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
|
||||
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
|
||||
l.urgent.list = append(l.urgent.list, tx)
|
||||
|
|
|
|||
|
|
@ -401,7 +401,7 @@ func (pool *TxPool) loop() {
|
|||
pool.mu.RLock()
|
||||
pending, queued := pool.stats()
|
||||
pool.mu.RUnlock()
|
||||
stales := int(atomic.LoadInt64(&pool.priced.stales))
|
||||
stales := int(pool.priced.stales.Load())
|
||||
|
||||
if pending != prevPending || queued != prevQueued || stales != prevStales {
|
||||
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ func TestTransactionFutureAttack(t *testing.T) {
|
|||
|
||||
// Create the pool to test the limit enforcement with
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
config := testTxPoolConfig
|
||||
config.GlobalQueue = 100
|
||||
config.GlobalSlots = 100
|
||||
|
|
@ -116,7 +116,7 @@ func TestTransactionFuture1559(t *testing.T) {
|
|||
t.Parallel()
|
||||
// Create the pool to test the pricing enforcement with
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
|
||||
defer pool.Stop()
|
||||
|
||||
|
|
@ -148,7 +148,7 @@ func TestTransactionZAttack(t *testing.T) {
|
|||
t.Parallel()
|
||||
// Create the pool to test the pricing enforcement with
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
|
||||
defer pool.Stop()
|
||||
// Create a number of test accounts, fund them and make transactions
|
||||
|
|
|
|||
|
|
@ -62,11 +62,17 @@ func init() {
|
|||
}
|
||||
|
||||
type testBlockChain struct {
|
||||
gasLimit atomic.Uint64
|
||||
statedb *state.StateDB
|
||||
gasLimit uint64
|
||||
chainHeadFeed *event.Feed
|
||||
}
|
||||
|
||||
func newTestBlockChain(gasLimit uint64, statedb *state.StateDB, chainHeadFeed *event.Feed) *testBlockChain {
|
||||
bc := testBlockChain{statedb: statedb, chainHeadFeed: new(event.Feed)}
|
||||
bc.gasLimit.Store(gasLimit)
|
||||
return &bc
|
||||
}
|
||||
|
||||
func (bc *testBlockChain) Engine() consensus.Engine {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -87,7 +93,7 @@ func (bc *testBlockChain) CurrentBlock() *types.Header {
|
|||
return &types.Header{
|
||||
Root: types.EmptyRootHash,
|
||||
Number: new(big.Int),
|
||||
GasLimit: atomic.LoadUint64(&bc.gasLimit),
|
||||
GasLimit: bc.gasLimit.Load(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -142,7 +148,7 @@ func setupPool() (*TxPool, *ecdsa.PrivateKey) {
|
|||
func setupPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateKey) {
|
||||
diskdb := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(diskdb))
|
||||
blockchain := &testBlockChain{statedb, 10000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(10000000, statedb, new(event.Feed))
|
||||
|
||||
key, _ := crypto.GenerateKey()
|
||||
pool := NewTxPool(testTxPoolConfig, config, blockchain)
|
||||
|
|
@ -259,7 +265,7 @@ func TestStateChangeDuringReset(t *testing.T) {
|
|||
|
||||
// setup pool with 2 transaction in it
|
||||
statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether), tracing.BalanceChangeUnspecified)
|
||||
blockchain := &testChain{&testBlockChain{statedb, 1000000000, new(event.Feed)}, address, &trigger}
|
||||
blockchain := &testChain{newTestBlockChain(1000000000, statedb, new(event.Feed)), address, &trigger}
|
||||
|
||||
tx0 := transaction(0, 100000, key)
|
||||
tx1 := transaction(1, 100000, key)
|
||||
|
|
@ -523,7 +529,7 @@ func TestChainFork(t *testing.T) {
|
|||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
statedb.AddBalance(addr, big.NewInt(100000000000000), tracing.BalanceChangeUnspecified)
|
||||
|
||||
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
<-pool.requestReset(nil, nil)
|
||||
}
|
||||
resetState()
|
||||
|
|
@ -552,7 +558,7 @@ func TestDoubleNonce(t *testing.T) {
|
|||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
statedb.AddBalance(addr, big.NewInt(100000000000000), tracing.BalanceChangeUnspecified)
|
||||
|
||||
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
<-pool.requestReset(nil, nil)
|
||||
}
|
||||
resetState()
|
||||
|
|
@ -722,7 +728,7 @@ func TestDropping(t *testing.T) {
|
|||
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
|
||||
}
|
||||
// Reduce the block gas limit, check that invalidated transactions are dropped
|
||||
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
|
||||
pool.chain.(*testBlockChain).gasLimit.Store(100)
|
||||
<-pool.requestReset(nil, nil)
|
||||
|
||||
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
|
||||
|
|
@ -751,7 +757,7 @@ func TestPostponing(t *testing.T) {
|
|||
// Create the pool to test the postponing with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||
defer pool.Stop()
|
||||
|
|
@ -964,7 +970,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
|
|||
// Create the pool to test the limit enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.NoLocals = nolocals
|
||||
|
|
@ -1057,7 +1063,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
// Create the pool to test the non-expiration enforcement
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.Lifetime = time.Second
|
||||
|
|
@ -1244,7 +1250,7 @@ func TestPendingGlobalLimiting(t *testing.T) {
|
|||
// Create the pool to test the limit enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.GlobalSlots = config.AccountSlots * 10
|
||||
|
|
@ -1347,7 +1353,7 @@ func TestCapClearsFromAll(t *testing.T) {
|
|||
// Create the pool to test the limit enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.AccountSlots = 2
|
||||
|
|
@ -1382,7 +1388,7 @@ func TestPendingMinimumAllowance(t *testing.T) {
|
|||
// Create the pool to test the limit enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.AccountSlots = 5
|
||||
|
|
@ -1431,7 +1437,7 @@ func TestRepricing(t *testing.T) {
|
|||
// Create the pool to test the pricing enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||
defer pool.Stop()
|
||||
|
|
@ -1680,7 +1686,7 @@ func TestRepricingKeepsLocals(t *testing.T) {
|
|||
// Create the pool to test the pricing enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain)
|
||||
defer pool.Stop()
|
||||
|
|
@ -1755,7 +1761,7 @@ func TestPoolUnderpricing(t *testing.T) {
|
|||
// Create the pool to test the pricing enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.GlobalSlots = 2
|
||||
|
|
@ -1870,7 +1876,7 @@ func TestPoolStableUnderpricing(t *testing.T) {
|
|||
// Create the pool to test the pricing enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.GlobalSlots = common.LimitThresholdNonceInQueue
|
||||
|
|
@ -2103,7 +2109,7 @@ func TestDeduplication(t *testing.T) {
|
|||
|
||||
// Create the pool to test the pricing enforcement with
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||
defer pool.Stop()
|
||||
|
|
@ -2170,7 +2176,7 @@ func TestReplacement(t *testing.T) {
|
|||
// Create the pool to test the pricing enforcement with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||
defer pool.Stop()
|
||||
|
|
@ -2376,7 +2382,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
|
|||
// Create the original pool to inject transaction into the journal
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.NoLocals = nolocals
|
||||
|
|
@ -2418,7 +2424,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
|
|||
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
|
||||
pool.Stop()
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
|
||||
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain = newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool = NewTxPool(config, params.TestChainConfig, blockchain)
|
||||
|
||||
|
|
@ -2445,7 +2451,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
|
|||
pool.Stop()
|
||||
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
|
||||
blockchain = &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain = newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
pool = NewTxPool(config, params.TestChainConfig, blockchain)
|
||||
|
||||
pending, queued = pool.Stats()
|
||||
|
|
@ -2475,7 +2481,7 @@ func TestStatusCheck(t *testing.T) {
|
|||
// Create the pool to test the status retrievals with
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(db))
|
||||
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
|
||||
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||
defer pool.Stop()
|
||||
|
|
|
|||
Loading…
Reference in a new issue