core: move TxPool reorg and events to background goroutine (#19705)

This commit is contained in:
Daniel Liu 2024-05-10 15:48:14 +08:00
parent ddbf5d2782
commit 74c72363d0
2 changed files with 540 additions and 449 deletions

File diff suppressed because it is too large Load diff

View file

@ -220,7 +220,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
t.Fatalf("Invalid nonce, want 0, got %d", nonce)
}
pool.AddRemotes(types.Transactions{tx0, tx1})
pool.addRemotesSync([]*types.Transaction{tx0, tx1})
nonce = pool.State().GetNonce(address)
if nonce != 2 {
@ -229,8 +229,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
// trigger state change in the background
trigger = true
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
_, err := pool.Pending()
if err != nil {
@ -288,10 +287,10 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction(0, 100, key)
from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1000))
pool.lockedReset(nil, nil)
pool.enqueueTx(tx.Hash(), tx)
<-pool.requestReset(nil, nil)
pool.promoteExecutables([]common.Address{from})
pool.enqueueTx(tx.Hash(), tx)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
}
@ -300,7 +299,8 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx)
pool.currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables([]common.Address{from})
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
@ -308,25 +308,28 @@ func TestTransactionQueue(t *testing.T) {
if len(pool.queue) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue))
}
}
pool, key = setupTxPool()
func TestTransactionQueue2(t *testing.T) {
t.Parallel()
pool, key := setupTxPool()
defer pool.Stop()
tx1 := transaction(0, 100, key)
tx2 := transaction(10, 100, key)
tx3 := transaction(11, 100, key)
from, _ = deriveSender(tx1)
from, _ := deriveSender(tx1)
pool.currentState.AddBalance(from, big.NewInt(1000))
pool.lockedReset(nil, nil)
pool.reset(nil, nil)
pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3)
pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending))
t.Error("expected pending length to be 1, got", len(pool.pending))
}
if pool.queue[from].Len() != 2 {
t.Error("expected len(queue) == 2, got", pool.queue[from].Len())
@ -360,7 +363,7 @@ func TestTransactionChainFork(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
}
resetState()
@ -390,7 +393,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
}
resetState()
@ -406,16 +409,17 @@ func TestTransactionDoubleNonce(t *testing.T) {
if replace, err := pool.add(tx2, false); err != nil || !replace {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
}
pool.promoteExecutables([]common.Address{addr})
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
}
// Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3, false)
pool.promoteExecutables([]common.Address{addr})
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@ -461,7 +465,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
addr := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.SetNonce(addr, n)
pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
tx := transaction(n, 100000, key)
if err := pool.AddRemote(tx); err != nil {
@ -469,7 +473,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
}
// simulate some weird re-order of transactions and missing nonce(s)
pool.currentState.SetNonce(addr, n-1)
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if fn := pool.pendingState.GetNonce(addr); fn != n-1 {
t.Errorf("expected nonce to be %d, got %d", n-1, fn)
}
@ -513,7 +517,7 @@ func TestTransactionDropping(t *testing.T) {
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
@ -525,7 +529,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the balance of the account, and check that invalidated transactions are dropped
pool.currentState.AddBalance(account, big.NewInt(-650))
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@ -550,7 +554,7 @@ func TestTransactionDropping(t *testing.T) {
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@ -607,7 +611,7 @@ func TestTransactionPostponing(t *testing.T) {
txs = append(txs, tx)
}
}
for i, err := range pool.AddRemotes(txs) {
for i, err := range pool.addRemotesSync(txs) {
if err != nil {
t.Fatalf("tx %d: failed to add transactions: %v", i, err)
}
@ -622,7 +626,7 @@ func TestTransactionPostponing(t *testing.T) {
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
}
@ -636,7 +640,7 @@ func TestTransactionPostponing(t *testing.T) {
for _, addr := range accs {
pool.currentState.AddBalance(addr, big.NewInt(-1))
}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
// The first account's first transaction remains valid, check that subsequent
// ones are either filtered out, or queued up for later.
@ -703,12 +707,10 @@ func TestTransactionGapFilling(t *testing.T) {
defer sub.Unsubscribe()
// Create a pending and a queued transaction with a nonce-gap in between
if err := pool.AddRemote(transaction(0, 100000, key)); err != nil {
t.Fatalf("failed to add pending transaction: %v", err)
}
if err := pool.AddRemote(transaction(2, 100000, key)); err != nil {
t.Fatalf("failed to add queued transaction: %v", err)
}
pool.addRemotesSync([]*types.Transaction{
transaction(0, 100000, key),
transaction(2, 100000, key),
})
pending, queued := pool.Stats()
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
@ -723,7 +725,7 @@ func TestTransactionGapFilling(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Fill the nonce gap and ensure all transactions become pending
if err := pool.AddRemote(transaction(1, 100000, key)); err != nil {
if err := pool.addRemoteSync(transaction(1, 100000, key)); err != nil {
t.Fatalf("failed to add gapped transaction: %v", err)
}
pending, queued = pool.Stats()
@ -755,7 +757,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
testTxPoolConfig.AccountQueue = 10
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= testTxPoolConfig.AccountQueue; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if len(pool.pending) != 0 {
@ -824,7 +826,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
nonces[addr]++
}
// Import the batch and verify that limits have been enforced
pool.AddRemotes(txs)
pool.addRemotesSync(txs)
queued := 0
for addr, list := range pool.queue {
@ -961,7 +963,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if pool.pending[account].Len() != int(i)+1 {
@ -982,59 +984,6 @@ func TestTransactionPendingLimiting(t *testing.T) {
}
}
// Tests that the transaction limits are enforced the same way irrelevant whether
// the transactions are added one by one or in batches.
func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 1) }
func TestTransactionPendingLimitingEquivalency(t *testing.T) {
testTransactionLimitingEquivalency(t, 0)
}
func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
t.Parallel()
// Add a batch of transactions to a pool one by one
pool1, key1 := setupTxPool()
defer pool1.Stop()
account1, _ := deriveSender(transaction(0, 0, key1))
pool1.currentState.AddBalance(account1, big.NewInt(1000000))
testTxPoolConfig.AccountQueue = 10
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
if err := pool1.AddRemote(transaction(origin+i, 100000, key1)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
}
// Add a batch of transactions to a pool in one big batch
pool2, key2 := setupTxPool()
defer pool2.Stop()
account2, _ := deriveSender(transaction(0, 0, key2))
pool2.currentState.AddBalance(account2, big.NewInt(1000000))
txs := []*types.Transaction{}
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
txs = append(txs, transaction(origin+i, 100000, key2))
}
pool2.AddRemotes(txs)
// Ensure the batch optimization honors the same pool mechanics
if len(pool1.pending) != len(pool2.pending) {
t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending))
}
if len(pool1.queue) != len(pool2.queue) {
t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue))
}
if pool1.all.Count() != pool2.all.Count() {
t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", pool1.all.Count(), pool2.all.Count())
}
if err := validateTxPoolInternals(pool1); err != nil {
t.Errorf("pool 1 internal state corrupted: %v", err)
}
if err := validateTxPoolInternals(pool2); err != nil {
t.Errorf("pool 2 internal state corrupted: %v", err)
}
}
// Tests that if the transaction count belonging to multiple accounts go above
// some hard threshold, the higher transactions are dropped to prevent DOS
// attacks.
@ -1070,7 +1019,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotes(txs)
pool.addRemotesSync(txs)
pending := 0
for _, list := range pool.pending {
@ -1152,7 +1101,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotes(txs)
pool.addRemotesSync(txs)
for addr, list := range pool.pending {
if list.Len() != int(config.AccountSlots) {
@ -1209,7 +1158,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])
// Import the batch and that both pending and queued transactions match up
pool.AddRemotes(txs)
pool.addRemotesSync(txs)
pool.AddLocal(ltx)
pending, queued := pool.Stats()
@ -1493,7 +1442,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
for i := uint64(0); i < config.GlobalSlots; i++ {
txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0]))
}
pool.AddRemotes(txs)
pool.addRemotesSync(txs)
pending, queued := pool.Stats()
if pending != int(config.GlobalSlots) {
@ -1509,7 +1458,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
t.Fatalf("failed to add well priced transaction: %v", err)
}
pending, queued = pool.Stats()
@ -1553,7 +1502,7 @@ func TestTransactionReplacement(t *testing.T) {
price := int64(100)
threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap pending transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(1), key)); err != ErrReplaceUnderpriced {
@ -1566,7 +1515,7 @@ func TestTransactionReplacement(t *testing.T) {
t.Fatalf("cheap replacement event firing failed: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
t.Fatalf("failed to add original proper pending transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(threshold-1), key)); err != ErrReplaceUnderpriced {
@ -1578,6 +1527,7 @@ func TestTransactionReplacement(t *testing.T) {
if err := validateEvents(events, 2); err != nil {
t.Fatalf("proper replacement event firing failed: %v", err)
}
// Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap queued transaction: %v", err)
@ -1656,7 +1606,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
pending, queued := pool.Stats()
@ -1694,7 +1644,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
time.Sleep(2 * config.Rejournal)
pool.Stop()
@ -1749,7 +1699,7 @@ func TestTransactionStatusCheck(t *testing.T) {
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) // Queued only
// Import the transaction and ensure they are correctly added
pool.AddRemotes(txs)
pool.addRemotesSync(txs)
pending, queued := pool.Stats()
if pending != 2 {
@ -1828,26 +1778,6 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
}
}
// Benchmarks the speed of iterative transaction insertion.
func BenchmarkPoolInsert(b *testing.B) {
// Generate a batch of transactions to enqueue into the pool
pool, key := setupTxPool()
defer pool.Stop()
account, _ := deriveSender(transaction(0, 0, key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
txs := make(types.Transactions, b.N)
for i := 0; i < b.N; i++ {
txs[i] = transaction(uint64(i), 100000, key)
}
// Benchmark importing the transactions into the queue
b.ResetTimer()
for _, tx := range txs {
pool.AddRemote(tx)
}
}
// Benchmarks the speed of batched transaction insertion.
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }