mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-23 07:04:35 +00:00
core: fix race conditions in txpool (#23474)
This commit is contained in:
parent
e46f41d081
commit
8c78a80771
3 changed files with 26 additions and 13 deletions
|
|
@ -21,6 +21,8 @@ import (
|
|||
"math"
|
||||
"math/big"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
|
|
@ -450,9 +452,10 @@ func (h *priceHeap) Pop() interface{} {
|
|||
// in txpool but only interested in the remote part. It means only remote transactions
|
||||
// will be considered for tracking, sorting, eviction, etc.
|
||||
type txPricedList struct {
|
||||
all *txLookup // Pointer to the map of all transactions
|
||||
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
|
||||
stales int // Number of stale price points to (re-heap trigger)
|
||||
all *txLookup // Pointer to the map of all transactions
|
||||
remotes *priceHeap // Heap of prices of all the stored **remote** transactions
|
||||
stales int64 // Number of stale price points to (re-heap trigger)
|
||||
reheapMu sync.Mutex // Mutex asserts that only one routine is reheaping the list
|
||||
}
|
||||
|
||||
// newTxPricedList creates a new price-sorted transaction heap.
|
||||
|
|
@ -476,8 +479,8 @@ func (l *txPricedList) Put(tx *types.Transaction, local bool) {
|
|||
// the heap if a large enough ratio of transactions go stale.
|
||||
func (l *txPricedList) Removed(count int) {
|
||||
// Bump the stale counter, but exit if still too low (< 25%)
|
||||
l.stales += count
|
||||
if l.stales <= len(*l.remotes)/4 {
|
||||
stales := atomic.AddInt64(&l.stales, int64(count))
|
||||
if int(stales) <= len(*l.remotes)/4 {
|
||||
return
|
||||
}
|
||||
// Seems we've reached a critical number of stale transactions, reheap
|
||||
|
|
@ -515,7 +518,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction) bool {
|
|||
for len(*l.remotes) > 0 {
|
||||
head := []*types.Transaction(*l.remotes)[0]
|
||||
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
|
||||
l.stales--
|
||||
atomic.AddInt64(&l.stales, -1)
|
||||
heap.Pop(l.remotes)
|
||||
continue
|
||||
}
|
||||
|
|
@ -541,7 +544,7 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
|
|||
// Discard stale transactions if found during cleanup
|
||||
tx := heap.Pop(l.remotes).(*types.Transaction)
|
||||
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
|
||||
l.stales--
|
||||
atomic.AddInt64(&l.stales, -1)
|
||||
continue
|
||||
}
|
||||
// Non stale transaction found, discard it
|
||||
|
|
@ -560,9 +563,12 @@ func (l *txPricedList) Discard(slots int, force bool) (types.Transactions, bool)
|
|||
|
||||
// Reheap forcibly rebuilds the heap based on the current remote transaction set.
|
||||
func (l *txPricedList) Reheap() {
|
||||
l.reheapMu.Lock()
|
||||
defer l.reheapMu.Unlock()
|
||||
reheap := make(priceHeap, 0, l.all.RemoteCount())
|
||||
|
||||
l.stales, l.remotes = 0, &reheap
|
||||
atomic.StoreInt64(&l.stales, 0)
|
||||
l.remotes = &reheap
|
||||
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
|
||||
*l.remotes = append(*l.remotes, tx)
|
||||
return true
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import (
|
|||
"math/big"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
|
|
@ -282,6 +283,7 @@ type TxPool struct {
|
|||
reorgDoneCh chan chan struct{}
|
||||
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
|
||||
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
|
||||
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)
|
||||
|
||||
eip2718 bool // Fork indicator whether we are using EIP-2718 type transactions.
|
||||
IsSigner func(address common.Address) bool
|
||||
|
|
@ -314,6 +316,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
|
|||
queueTxEventCh: make(chan *types.Transaction),
|
||||
reorgDoneCh: make(chan chan struct{}),
|
||||
reorgShutdownCh: make(chan struct{}),
|
||||
initDoneCh: make(chan struct{}),
|
||||
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
|
||||
trc21FeeCapacity: map[common.Address]*big.Int{},
|
||||
}
|
||||
|
|
@ -368,6 +371,8 @@ func (pool *TxPool) loop() {
|
|||
defer evict.Stop()
|
||||
defer journal.Stop()
|
||||
|
||||
// Notify tests that the init phase is done
|
||||
close(pool.initDoneCh)
|
||||
for {
|
||||
select {
|
||||
// Handle ChainHeadEvent
|
||||
|
|
@ -386,8 +391,8 @@ func (pool *TxPool) loop() {
|
|||
case <-report.C:
|
||||
pool.mu.RLock()
|
||||
pending, queued := pool.stats()
|
||||
stales := pool.priced.stales
|
||||
pool.mu.RUnlock()
|
||||
stales := int(atomic.LoadInt64(&pool.priced.stales))
|
||||
|
||||
if pending != prevPending || queued != prevQueued || stales != prevStales {
|
||||
log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
|
||||
|
|
|
|||
|
|
@ -22,13 +22,13 @@ import (
|
|||
"math/big"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/consensus"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/state"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/crypto"
|
||||
|
|
@ -69,7 +69,7 @@ func (bc *testBlockChain) Config() *params.ChainConfig {
|
|||
|
||||
func (bc *testBlockChain) CurrentBlock() *types.Block {
|
||||
return types.NewBlock(&types.Header{
|
||||
GasLimit: bc.gasLimit,
|
||||
GasLimit: atomic.LoadUint64(&bc.gasLimit),
|
||||
}, nil, nil, nil)
|
||||
}
|
||||
|
||||
|
|
@ -110,6 +110,8 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
|
|||
key, _ := crypto.GenerateKey()
|
||||
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
|
||||
|
||||
// wait for the pool to initialize
|
||||
<-pool.initDoneCh
|
||||
return pool, key
|
||||
}
|
||||
|
||||
|
|
@ -572,7 +574,7 @@ func TestTransactionDropping(t *testing.T) {
|
|||
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
|
||||
}
|
||||
// Reduce the block gas limit, check that invalidated transactions are dropped
|
||||
pool.chain.(*testBlockChain).gasLimit = 100
|
||||
atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100)
|
||||
<-pool.requestReset(nil, nil)
|
||||
|
||||
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
|
||||
|
|
|
|||
Loading…
Reference in a new issue