mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 13:21:37 +00:00
This commit is contained in:
parent
bfe9a3e714
commit
13548d5d9e
16 changed files with 673 additions and 889 deletions
|
|
@ -93,7 +93,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
|
|||
return err
|
||||
}
|
||||
// Add tx signed to local tx pool.
|
||||
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
|
||||
err = pool.AddLocal(txSigned, true)
|
||||
if err != nil {
|
||||
log.Error("Fail to add tx sign to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
|
||||
return err
|
||||
|
|
@ -121,7 +121,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
|
|||
return err
|
||||
}
|
||||
// Add tx signed to local tx pool.
|
||||
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
|
||||
err = pool.AddLocal(txSigned, true)
|
||||
if err != nil {
|
||||
log.Error("Fail to add tx secret to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
|
||||
return err
|
||||
|
|
@ -150,7 +150,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
|
|||
return err
|
||||
}
|
||||
// Add tx to pool.
|
||||
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
|
||||
err = pool.AddLocal(txSigned, true)
|
||||
if err != nil {
|
||||
log.Error("Fail to add tx opening to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
|
||||
return err
|
||||
|
|
|
|||
13
core/txpool/legacypool/journal_shared.go
Normal file
13
core/txpool/legacypool/journal_shared.go
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
package legacypool
|
||||
|
||||
import "errors"
|
||||
|
||||
// errNoActiveJournal is returned if a transaction is attempted to be inserted
|
||||
// into the journal, but no such file is currently open.
|
||||
var errNoActiveJournal = errors.New("no active journal")
|
||||
|
||||
// devNull is a WriteCloser that just discards anything written into it.
|
||||
type devNull struct{}
|
||||
|
||||
func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
|
||||
func (*devNull) Close() error { return nil }
|
||||
|
|
@ -117,7 +117,6 @@ var (
|
|||
|
||||
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
|
||||
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
|
||||
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
|
||||
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
|
||||
|
||||
pendingAddrsGauge = metrics.NewRegisteredGauge("txpool/pending/accounts", nil)
|
||||
|
|
@ -191,10 +190,6 @@ var defaultMaxTip = big.NewInt(1000 * params.GWei)
|
|||
// unreasonable or unworkable.
|
||||
func (config *Config) sanitize() Config {
|
||||
conf := *config
|
||||
if conf.Rejournal < time.Second {
|
||||
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
|
||||
conf.Rejournal = time.Second
|
||||
}
|
||||
if conf.PriceBump < 1 {
|
||||
log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultConfig.PriceBump)
|
||||
conf.PriceBump = DefaultConfig.PriceBump
|
||||
|
|
@ -257,9 +252,6 @@ type LegacyPool struct {
|
|||
pendingNonces *noncer // Pending state tracking virtual nonces
|
||||
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
|
||||
|
||||
locals *accountSet // Set of local transaction to exempt from eviction rules
|
||||
journal *journal // Journal of local transaction to back up to disk
|
||||
|
||||
pending map[common.Address]*list // All currently processable transactions
|
||||
queue map[common.Address]*list // Queued but non-processable transactions
|
||||
beats map[common.Address]time.Time // Last heartbeat from each known account
|
||||
|
|
@ -308,16 +300,8 @@ func New(config Config, chain BlockChain) *LegacyPool {
|
|||
initDoneCh: make(chan struct{}),
|
||||
trc21FeeCapacity: map[common.Address]*big.Int{},
|
||||
}
|
||||
pool.locals = newAccountSet(pool.signer)
|
||||
for _, addr := range config.Locals {
|
||||
log.Info("Setting new local account", "address", addr)
|
||||
pool.locals.add(addr)
|
||||
}
|
||||
pool.priced = newPricedList(pool.all)
|
||||
|
||||
if !config.NoLocals && config.Journal != "" {
|
||||
pool.journal = newTxJournal(config.Journal)
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
|
|
@ -333,8 +317,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
|
|||
}
|
||||
|
||||
// Init sets the gas price needed to keep a transaction in the pool and the chain
|
||||
// head to allow balance / nonce checks. The transaction journal will be loaded
|
||||
// from disk and filtered based on the provided starting settings. The internal
|
||||
// head to allow balance / nonce checks. The internal
|
||||
// goroutines will be spun up and the pool deemed operational afterwards.
|
||||
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
|
||||
// Set the address reserver to request exclusive access to pooled accounts
|
||||
|
|
@ -357,19 +340,7 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool
|
|||
pool.currentState = statedb
|
||||
pool.pendingNonces = newNoncer(statedb)
|
||||
|
||||
// Start the reorg loop early, so it can handle requests generated during
|
||||
// journal loading.
|
||||
pool.wg.Go(pool.scheduleReorgLoop)
|
||||
|
||||
// If local transactions and journaling is enabled, load from disk
|
||||
if pool.journal != nil {
|
||||
if err := pool.journal.load(pool.addLocals); err != nil {
|
||||
log.Warn("Failed to load transaction journal", "err", err)
|
||||
}
|
||||
if err := pool.journal.rotate(pool.local()); err != nil {
|
||||
log.Warn("Failed to rotate transaction journal", "err", err)
|
||||
}
|
||||
}
|
||||
pool.wg.Go(pool.loop)
|
||||
return nil
|
||||
}
|
||||
|
|
@ -382,13 +353,11 @@ func (pool *LegacyPool) loop() {
|
|||
prevPending, prevQueued, prevStales int
|
||||
|
||||
// Start the stats reporting and transaction eviction tickers
|
||||
report = time.NewTicker(statsReportInterval)
|
||||
evict = time.NewTicker(evictionInterval)
|
||||
journal = time.NewTicker(pool.config.Rejournal)
|
||||
report = time.NewTicker(statsReportInterval)
|
||||
evict = time.NewTicker(evictionInterval)
|
||||
)
|
||||
defer report.Stop()
|
||||
defer evict.Stop()
|
||||
defer journal.Stop()
|
||||
|
||||
// Notify tests that the init phase is done
|
||||
close(pool.initDoneCh)
|
||||
|
|
@ -414,11 +383,7 @@ func (pool *LegacyPool) loop() {
|
|||
case <-evict.C:
|
||||
pool.mu.Lock()
|
||||
for addr := range pool.queue {
|
||||
// Skip local transactions from the eviction mechanism
|
||||
if pool.locals.contains(addr) {
|
||||
continue
|
||||
}
|
||||
// Any non-locals old enough should be removed
|
||||
// Any old enough should be removed
|
||||
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
|
||||
list := pool.queue[addr].Flatten()
|
||||
for _, tx := range list {
|
||||
|
|
@ -428,16 +393,6 @@ func (pool *LegacyPool) loop() {
|
|||
}
|
||||
}
|
||||
pool.mu.Unlock()
|
||||
|
||||
// Handle local transaction journal rotation
|
||||
case <-journal.C:
|
||||
if pool.journal != nil {
|
||||
pool.mu.Lock()
|
||||
if err := pool.journal.rotate(pool.local()); err != nil {
|
||||
log.Warn("Failed to rotate local tx journal", "err", err)
|
||||
}
|
||||
pool.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -448,9 +403,6 @@ func (pool *LegacyPool) Close() error {
|
|||
close(pool.reorgShutdownCh)
|
||||
pool.wg.Wait()
|
||||
|
||||
if pool.journal != nil {
|
||||
pool.journal.close()
|
||||
}
|
||||
log.Info("Transaction pool stopped")
|
||||
return nil
|
||||
}
|
||||
|
|
@ -501,7 +453,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) error {
|
|||
// If the min miner fee increased, remove transactions below the new threshold
|
||||
if newTip.Cmp(old) > 0 {
|
||||
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
|
||||
drop := pool.all.RemotesBelowTip(tip)
|
||||
drop := pool.all.TxsBelowTip(tip)
|
||||
for _, tx := range drop {
|
||||
pool.removeTx(tx.Hash(), false, true)
|
||||
}
|
||||
|
|
@ -603,7 +555,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
|
|||
txs := list.Flatten()
|
||||
|
||||
// If the miner requests tip enforcement, cap the lists now
|
||||
if minTipBig != nil && !pool.locals.contains(addr) {
|
||||
if minTipBig != nil {
|
||||
for i, tx := range txs {
|
||||
if !tx.IsSpecialTransaction() && tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
|
||||
txs = txs[:i]
|
||||
|
|
@ -630,35 +582,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
|
|||
return pending
|
||||
}
|
||||
|
||||
// Locals retrieves the accounts currently considered local by the pool.
|
||||
func (pool *LegacyPool) Locals() []common.Address {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
return pool.locals.flatten()
|
||||
}
|
||||
|
||||
// local retrieves all currently known local transactions, grouped by origin
|
||||
// account and sorted by nonce. The returned transaction set is a copy and can be
|
||||
// freely modified by calling code.
|
||||
func (pool *LegacyPool) local() map[common.Address]types.Transactions {
|
||||
txs := make(map[common.Address]types.Transactions)
|
||||
for addr := range pool.locals.accounts {
|
||||
if pending := pool.pending[addr]; pending != nil {
|
||||
txs[addr] = append(txs[addr], pending.Flatten()...)
|
||||
}
|
||||
if queued := pool.queue[addr]; queued != nil {
|
||||
txs[addr] = append(txs[addr], queued.Flatten()...)
|
||||
}
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// validateTxBasics checks whether a transaction is valid according to the consensus
|
||||
// rules, but does not check state-dependent validation such as sufficient balance.
|
||||
// This check is meant as an early check which only needs to be performed once,
|
||||
// and does not require the pool mutex to be held.
|
||||
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error {
|
||||
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error {
|
||||
opts := &txpool.ValidationOptions{
|
||||
Config: pool.chainconfig,
|
||||
Accept: 0 |
|
||||
|
|
@ -672,9 +600,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
|
|||
return pool.isSigner != nil && !pool.isSigner(from)
|
||||
},
|
||||
}
|
||||
if local {
|
||||
opts.MinTip = new(big.Int)
|
||||
}
|
||||
if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -797,11 +722,7 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
|
|||
// add validates a transaction and inserts it into the non-executable queue for later
|
||||
// pending promotion and execution. If the transaction is a replacement for an already
|
||||
// pending or queued one, it overwrites the previous transaction if its price is higher.
|
||||
//
|
||||
// If a newly added transaction is marked as local, its sending account will be
|
||||
// added to the allowlist, preventing any associated transaction from being dropped
|
||||
// out of the pool due to pricing constraints.
|
||||
func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
|
||||
func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
|
||||
// If the transaction is already known, discard it
|
||||
hash := tx.Hash()
|
||||
if pool.all.Get(hash) != nil {
|
||||
|
|
@ -809,9 +730,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||
knownTxMeter.Mark(1)
|
||||
return false, txpool.ErrAlreadyKnown
|
||||
}
|
||||
// Make the local flag. If it's from local source or it's from the network but
|
||||
// the sender is marked as local previously, treat it as the local transaction.
|
||||
isLocal := local || pool.locals.containsTx(tx)
|
||||
|
||||
// If the transaction fails basic validation, discard it
|
||||
if err := pool.validateTx(tx); err != nil {
|
||||
|
|
@ -848,13 +766,13 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||
// Special transactions must also honor the reservation semantics to keep
|
||||
// the coordinator's ownership accounting balanced.
|
||||
if tx.IsSpecialTransaction() && pool.IsSigner(from) && pool.pendingNonces.get(from) == tx.Nonce() {
|
||||
return pool.promoteSpecialTx(from, tx, isLocal)
|
||||
return pool.promoteSpecialTx(from, tx)
|
||||
}
|
||||
|
||||
// If the transaction pool is full, discard underpriced transactions
|
||||
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||
// If the new transaction is underpriced, don't accept it
|
||||
if !isLocal && pool.priced.Underpriced(tx) {
|
||||
if pool.priced.Underpriced(tx) {
|
||||
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
|
||||
underpricedTxMeter.Mark(1)
|
||||
return false, txpool.ErrUnderpriced
|
||||
|
|
@ -869,19 +787,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||
}
|
||||
|
||||
// New transaction is better than our worse ones, make room for it.
|
||||
// If it's a local transaction, forcibly discard all available transactions.
|
||||
// Otherwise if we can't make enough room for new one, abort the operation.
|
||||
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
|
||||
// If we can't make enough room for new one, abort the operation.
|
||||
drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx))
|
||||
|
||||
// Special case, we still can't make the room for the new remote one.
|
||||
if !isLocal && !success {
|
||||
if !success {
|
||||
log.Trace("Discarding overflown transaction", "hash", hash)
|
||||
overflowedTxMeter.Mark(1)
|
||||
return false, ErrTxPoolOverflow
|
||||
}
|
||||
|
||||
// If the new transaction is a future transaction it should never churn pending transactions
|
||||
if !isLocal && pool.isGapped(from, tx) {
|
||||
if pool.isGapped(from, tx) {
|
||||
var replacesPending bool
|
||||
for _, dropTx := range drop {
|
||||
dropSender, _ := types.Sender(pool.signer, dropTx)
|
||||
|
|
@ -893,7 +810,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||
// Add all transactions back to the priced queue
|
||||
if replacesPending {
|
||||
for _, dropTx := range drop {
|
||||
pool.priced.Put(dropTx, false)
|
||||
pool.priced.Put(dropTx)
|
||||
}
|
||||
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
|
||||
return false, ErrFutureReplacePending
|
||||
|
|
@ -926,9 +843,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||
pool.priced.Removed(1)
|
||||
pendingReplaceMeter.Mark(1)
|
||||
}
|
||||
pool.all.Add(tx, isLocal)
|
||||
pool.priced.Put(tx, isLocal)
|
||||
pool.journalTx(from, tx)
|
||||
pool.all.Add(tx)
|
||||
pool.priced.Put(tx)
|
||||
pool.queueTxEvent(tx)
|
||||
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
|
||||
|
||||
|
|
@ -937,20 +853,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
|
|||
return old != nil, nil
|
||||
}
|
||||
// New transaction isn't replacing a pending one, push into queue
|
||||
replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
|
||||
replaced, err = pool.enqueueTx(hash, tx, true)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Mark local addresses and journal local transactions
|
||||
if local && !pool.locals.contains(from) {
|
||||
log.Info("Setting new local account", "address", from)
|
||||
pool.locals.add(from)
|
||||
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
|
||||
}
|
||||
if isLocal {
|
||||
localGauge.Inc(1)
|
||||
}
|
||||
pool.journalTx(from, tx)
|
||||
|
||||
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
|
||||
return replaced, nil
|
||||
|
|
@ -983,7 +889,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
|
|||
// enqueueTx inserts a new transaction into the non-executable transaction queue.
|
||||
//
|
||||
// Note, this method assumes the pool lock is held!
|
||||
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
|
||||
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
|
||||
// Try to insert the transaction into the future queue
|
||||
from, _ := types.Sender(pool.signer, tx) // already validated
|
||||
if pool.queue[from] == nil {
|
||||
|
|
@ -1011,8 +917,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
|
|||
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
|
||||
}
|
||||
if addAll {
|
||||
pool.all.Add(tx, local)
|
||||
pool.priced.Put(tx, local)
|
||||
pool.all.Add(tx)
|
||||
pool.priced.Put(tx)
|
||||
}
|
||||
// If we never record the heartbeat, do it right now.
|
||||
if _, exist := pool.beats[from]; !exist {
|
||||
|
|
@ -1021,18 +927,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
|
|||
return old != nil, nil
|
||||
}
|
||||
|
||||
// journalTx adds the specified transaction to the local disk journal if it is
|
||||
// deemed to have been sent from a local account.
|
||||
func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) {
|
||||
// Only journal if it's enabled and the transaction is local
|
||||
if pool.journal == nil || !pool.locals.contains(from) {
|
||||
return
|
||||
}
|
||||
if err := pool.journal.insert(tx); err != nil {
|
||||
log.Warn("Failed to journal local transaction", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// promoteTx adds a transaction to the pending (processable) list of transactions
|
||||
// and returns whether it was inserted or an older was better.
|
||||
//
|
||||
|
|
@ -1071,7 +965,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
|
|||
return true
|
||||
}
|
||||
|
||||
func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transaction, isLocal bool) (bool, error) {
|
||||
func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) (bool, error) {
|
||||
list := pool.pending[addr]
|
||||
newPending := list == nil
|
||||
if newPending {
|
||||
|
|
@ -1122,7 +1016,7 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact
|
|||
}
|
||||
// Failsafe to work around direct pending inserts (tests)
|
||||
if pool.all.Get(tx.Hash()) == nil {
|
||||
pool.all.Add(tx, isLocal)
|
||||
pool.all.Add(tx)
|
||||
}
|
||||
// Set the potentially new pending nonce and notify any subsystems of the new tx
|
||||
pool.beats[addr] = time.Now()
|
||||
|
|
@ -1131,28 +1025,13 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
|
||||
// senders as local ones, ensuring they go around the local pricing constraints.
|
||||
//
|
||||
// This method is used to add transactions from the RPC API and performs synchronous pool
|
||||
// reorganization and event propagation.
|
||||
func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error {
|
||||
return pool.Add(txs, !pool.config.NoLocals, true)
|
||||
}
|
||||
|
||||
// AddLocal enqueues a single local transaction into the pool if it is valid. This is
|
||||
// a convenience wrapper around AddLocals.
|
||||
func (pool *LegacyPool) addLocal(tx *types.Transaction) error {
|
||||
return pool.addLocals([]*types.Transaction{tx})[0]
|
||||
}
|
||||
|
||||
// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the
|
||||
// senders are not among the locally tracked ones, full pricing constraints will apply.
|
||||
// AddRemotes enqueues a batch of transactions into the pool if they are valid.
|
||||
// Full pricing constraints will apply.
|
||||
//
|
||||
// This method is used to add transactions from the p2p network and does not wait for pool
|
||||
// reorganization and internal event propagation.
|
||||
func (pool *LegacyPool) AddRemotes(txs []*types.Transaction) []error {
|
||||
return pool.Add(txs, false, false)
|
||||
return pool.Add(txs, false)
|
||||
}
|
||||
|
||||
// addRemote enqueues a single transaction into the pool if it is valid. This is a convenience
|
||||
|
|
@ -1163,23 +1042,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error {
|
|||
|
||||
// AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method.
|
||||
func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error {
|
||||
return pool.Add(txs, false, true)
|
||||
return pool.Add(txs, true)
|
||||
}
|
||||
|
||||
// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
|
||||
func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
|
||||
return pool.Add([]*types.Transaction{tx}, false, true)[0]
|
||||
return pool.Add([]*types.Transaction{tx}, true)[0]
|
||||
}
|
||||
|
||||
// Add enqueues a batch of transactions into the pool if they are valid. Depending
|
||||
// on the local flag, full pricing constraints will or will not be applied.
|
||||
// Add enqueues a batch of transactions into the pool if they are valid.
|
||||
//
|
||||
// If sync is set, the method will block until all internal maintenance related
|
||||
// to the add is finished. Only use this during tests for determinism!
|
||||
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
|
||||
// Do not treat as local if local transactions have been disabled
|
||||
local = local && !pool.config.NoLocals
|
||||
|
||||
func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error {
|
||||
// Filter out known ones without obtaining the pool lock or recovering signatures
|
||||
var (
|
||||
errs = make([]error, len(txs))
|
||||
|
|
@ -1195,7 +1070,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
|
|||
// Exclude transactions with basic errors, e.g invalid signatures and
|
||||
// insufficient intrinsic gas as soon as possible and cache senders
|
||||
// in transactions before obtaining lock
|
||||
if err := pool.validateTxBasics(tx, local); err != nil {
|
||||
if err := pool.validateTxBasics(tx); err != nil {
|
||||
errs[i] = err
|
||||
invalidTxMeter.Mark(1)
|
||||
continue
|
||||
|
|
@ -1209,7 +1084,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
|
|||
|
||||
// Process all the new transaction and merge any errors into the original slice
|
||||
pool.mu.Lock()
|
||||
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
|
||||
newErrs, dirtyAddrs := pool.addTxsLocked(news)
|
||||
pool.mu.Unlock()
|
||||
|
||||
var nilSlot = 0
|
||||
|
|
@ -1230,11 +1105,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
|
|||
|
||||
// addTxsLocked attempts to queue a batch of transactions if they are valid.
|
||||
// The transaction pool lock must be held.
|
||||
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
|
||||
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) {
|
||||
dirty := newAccountSet(pool.signer)
|
||||
errs := make([]error, len(txs))
|
||||
for i, tx := range txs {
|
||||
replaced, err := pool.add(tx, local)
|
||||
replaced, err := pool.add(tx)
|
||||
errs[i] = err
|
||||
if err == nil && !replaced {
|
||||
dirty.addTx(tx)
|
||||
|
|
@ -1320,9 +1195,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
|
|||
if outofbound {
|
||||
pool.priced.Removed(1)
|
||||
}
|
||||
if pool.locals.contains(addr) {
|
||||
localGauge.Dec(1)
|
||||
}
|
||||
// Remove the transaction from the pending lists and reset the account nonce
|
||||
if pending := pool.pending[addr]; pending != nil {
|
||||
if removed, invalids := pending.Remove(tx); removed {
|
||||
|
|
@ -1334,7 +1206,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
|
|||
// Postpone any invalidated transactions
|
||||
for _, tx := range invalids {
|
||||
// Internal shuffle shouldn't touch the lookup set.
|
||||
pool.enqueueTx(tx.Hash(), tx, false, false)
|
||||
pool.enqueueTx(tx.Hash(), tx, false)
|
||||
}
|
||||
// Update the account nonce if needed
|
||||
pool.pendingNonces.setIfLower(addr, tx.Nonce())
|
||||
|
|
@ -1398,7 +1270,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
|
|||
launchNextRun bool
|
||||
reset *txpoolResetRequest
|
||||
dirtyAccounts *accountSet
|
||||
queuedEvents = make(map[common.Address]*sortedMap)
|
||||
queuedEvents = make(map[common.Address]*SortedMap)
|
||||
)
|
||||
for {
|
||||
// Launch next background reorg if needed
|
||||
|
|
@ -1411,7 +1283,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
|
|||
launchNextRun = false
|
||||
|
||||
reset, dirtyAccounts = nil, nil
|
||||
queuedEvents = make(map[common.Address]*sortedMap)
|
||||
queuedEvents = make(map[common.Address]*SortedMap)
|
||||
}
|
||||
|
||||
select {
|
||||
|
|
@ -1440,7 +1312,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
|
|||
// request one later if they want the events sent.
|
||||
addr, _ := types.Sender(pool.signer, tx)
|
||||
if _, ok := queuedEvents[addr]; !ok {
|
||||
queuedEvents[addr] = newSortedMap()
|
||||
queuedEvents[addr] = NewSortedMap()
|
||||
}
|
||||
queuedEvents[addr].Put(tx)
|
||||
|
||||
|
|
@ -1459,7 +1331,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
|
|||
}
|
||||
|
||||
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
|
||||
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
|
||||
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) {
|
||||
defer func(t0 time.Time) {
|
||||
reorgDurationTimer.Update(time.Since(t0))
|
||||
}(time.Now())
|
||||
|
|
@ -1526,7 +1398,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
|
|||
for _, tx := range promoted {
|
||||
addr, _ := types.Sender(pool.signer, tx)
|
||||
if _, ok := events[addr]; !ok {
|
||||
events[addr] = newSortedMap()
|
||||
events[addr] = NewSortedMap()
|
||||
}
|
||||
events[addr].Put(tx)
|
||||
}
|
||||
|
|
@ -1636,7 +1508,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
|
|||
// Inject any transactions discarded due to reorgs
|
||||
log.Debug("Reinjecting stale transactions", "count", len(reinject))
|
||||
core.SenderCacher().Recover(pool.signer, reinject)
|
||||
pool.addTxsLocked(reinject, false)
|
||||
pool.addTxsLocked(reinject)
|
||||
}
|
||||
|
||||
// promoteExecutables moves transactions that have become processable from the
|
||||
|
|
@ -1688,22 +1560,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
|
|||
queuedGauge.Dec(int64(len(readies)))
|
||||
|
||||
// Drop all transactions over the allowed limit
|
||||
var caps types.Transactions
|
||||
if !pool.locals.contains(addr) {
|
||||
caps = list.Cap(int(pool.config.AccountQueue))
|
||||
for _, tx := range caps {
|
||||
hash := tx.Hash()
|
||||
pool.all.Remove(hash)
|
||||
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
|
||||
}
|
||||
queuedRateLimitMeter.Mark(int64(len(caps)))
|
||||
var caps = list.Cap(int(pool.config.AccountQueue))
|
||||
for _, tx := range caps {
|
||||
hash := tx.Hash()
|
||||
pool.all.Remove(hash)
|
||||
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
|
||||
}
|
||||
queuedRateLimitMeter.Mark(int64(len(caps)))
|
||||
// Mark all the items dropped as removed
|
||||
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
|
||||
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
|
||||
if pool.locals.contains(addr) {
|
||||
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
|
||||
}
|
||||
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if list.Empty() {
|
||||
delete(pool.queue, addr)
|
||||
|
|
@ -1734,14 +1601,14 @@ func (pool *LegacyPool) truncatePending() {
|
|||
spammers := prque.New[int64, common.Address](nil)
|
||||
for addr, list := range pool.pending {
|
||||
// Only evict transactions from high rollers
|
||||
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
|
||||
if uint64(list.Len()) > pool.config.AccountSlots {
|
||||
spammers.Push(addr, int64(list.Len()))
|
||||
}
|
||||
}
|
||||
// Gradually drop transactions from offenders
|
||||
offenders := []common.Address{}
|
||||
for pending > pool.config.GlobalSlots && !spammers.Empty() {
|
||||
// Retrieve the next offender if not local address
|
||||
// Retrieve the next offender
|
||||
offender, _ := spammers.Pop()
|
||||
offenders = append(offenders, offender)
|
||||
|
||||
|
|
@ -1767,9 +1634,6 @@ func (pool *LegacyPool) truncatePending() {
|
|||
}
|
||||
pool.priced.Removed(len(caps))
|
||||
pendingGauge.Dec(int64(len(caps)))
|
||||
if pool.locals.contains(offenders[i]) {
|
||||
localGauge.Dec(int64(len(caps)))
|
||||
}
|
||||
pending--
|
||||
}
|
||||
}
|
||||
|
|
@ -1794,9 +1658,6 @@ func (pool *LegacyPool) truncatePending() {
|
|||
}
|
||||
pool.priced.Removed(len(caps))
|
||||
pendingGauge.Dec(int64(len(caps)))
|
||||
if pool.locals.contains(addr) {
|
||||
localGauge.Dec(int64(len(caps)))
|
||||
}
|
||||
pending--
|
||||
}
|
||||
}
|
||||
|
|
@ -1817,9 +1678,7 @@ func (pool *LegacyPool) truncateQueue() {
|
|||
// Sort all accounts with queued transactions by heartbeat
|
||||
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
|
||||
for addr := range pool.queue {
|
||||
if !pool.locals.contains(addr) { // don't drop locals
|
||||
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
|
||||
}
|
||||
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
|
||||
}
|
||||
sort.Sort(sort.Reverse(addresses))
|
||||
|
||||
|
|
@ -1887,13 +1746,11 @@ func (pool *LegacyPool) demoteUnexecutables() {
|
|||
log.Trace("Demoting pending transaction", "hash", hash)
|
||||
|
||||
// Internal shuffle shouldn't touch the lookup set.
|
||||
pool.enqueueTx(hash, tx, false, false)
|
||||
pool.enqueueTx(hash, tx, false)
|
||||
}
|
||||
pool.priced.Removed(len(olds) + len(drops))
|
||||
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
|
||||
if pool.locals.contains(addr) {
|
||||
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
|
||||
}
|
||||
|
||||
// If there's a gap in front, alert (should never happen) and postpone all transactions
|
||||
if list.Len() > 0 && list.txs.Get(nonce) == nil {
|
||||
gapped := list.Cap(0)
|
||||
|
|
@ -1902,7 +1759,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
|
|||
log.Warn("Demoting invalidated transaction", "hash", hash)
|
||||
|
||||
// Internal shuffle shouldn't touch the lookup set.
|
||||
pool.enqueueTx(hash, tx, false, false)
|
||||
pool.enqueueTx(hash, tx, false)
|
||||
}
|
||||
pendingGauge.Dec(int64(len(gapped)))
|
||||
}
|
||||
|
|
@ -1960,25 +1817,6 @@ func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
|
|||
return as
|
||||
}
|
||||
|
||||
// contains checks if a given address is contained within the set.
|
||||
func (as *accountSet) contains(addr common.Address) bool {
|
||||
_, exist := as.accounts[addr]
|
||||
return exist
|
||||
}
|
||||
|
||||
func (as *accountSet) empty() bool {
|
||||
return len(as.accounts) == 0
|
||||
}
|
||||
|
||||
// containsTx checks if the sender of a given tx is within the set. If the sender
|
||||
// cannot be derived, this method returns false.
|
||||
func (as *accountSet) containsTx(tx *types.Transaction) bool {
|
||||
if addr, err := types.Sender(as.signer, tx); err == nil {
|
||||
return as.contains(addr)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// add inserts a new address into the set to track.
|
||||
func (as *accountSet) add(addr common.Address) {
|
||||
as.accounts[addr] = struct{}{}
|
||||
|
|
@ -2008,23 +1846,19 @@ func (as *accountSet) merge(other *accountSet) {
|
|||
as.cache = nil
|
||||
}
|
||||
|
||||
// lookup is used internally by TxPool to track transactions while allowing
|
||||
// lookup is used internally by LegacyPool to track transactions while allowing
|
||||
// lookup without mutex contention.
|
||||
//
|
||||
// Note, although this type is properly protected against concurrent access, it
|
||||
// is **not** a type that should ever be mutated or even exposed outside of the
|
||||
// transaction pool, since its internal state is tightly coupled with the pools
|
||||
// internal mechanisms. The sole purpose of the type is to permit out-of-bound
|
||||
// peeking into the pool in TxPool.Get without having to acquire the widely scoped
|
||||
// TxPool.mu mutex.
|
||||
//
|
||||
// This lookup set combines the notion of "local transactions", which is useful
|
||||
// to build upper-level structure.
|
||||
// peeking into the pool in LegacyPool.Get without having to acquire the widely scoped
|
||||
// LegacyPool.mu mutex.
|
||||
type lookup struct {
|
||||
slots int
|
||||
lock sync.RWMutex
|
||||
locals map[common.Hash]*types.Transaction
|
||||
remotes map[common.Hash]*types.Transaction
|
||||
slots int
|
||||
lock sync.RWMutex
|
||||
txs map[common.Hash]*types.Transaction
|
||||
|
||||
auths map[common.Address][]common.Hash // All accounts with a pooled authorization
|
||||
}
|
||||
|
|
@ -2032,31 +1866,21 @@ type lookup struct {
|
|||
// newLookup returns a new lookup structure.
|
||||
func newLookup() *lookup {
|
||||
return &lookup{
|
||||
locals: make(map[common.Hash]*types.Transaction),
|
||||
remotes: make(map[common.Hash]*types.Transaction),
|
||||
auths: make(map[common.Address][]common.Hash),
|
||||
txs: make(map[common.Hash]*types.Transaction),
|
||||
auths: make(map[common.Address][]common.Hash),
|
||||
}
|
||||
}
|
||||
|
||||
// Range calls f on each key and value present in the map. The callback passed
|
||||
// should return the indicator whether the iteration needs to be continued.
|
||||
// Callers need to specify which set (or both) to be iterated.
|
||||
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) {
|
||||
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
if local {
|
||||
for key, value := range t.locals {
|
||||
if !f(key, value, true) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if remote {
|
||||
for key, value := range t.remotes {
|
||||
if !f(key, value, false) {
|
||||
return
|
||||
}
|
||||
for key, value := range t.txs {
|
||||
if !f(key, value) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -2066,26 +1890,7 @@ func (t *lookup) Get(hash common.Hash) *types.Transaction {
|
|||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
if tx := t.locals[hash]; tx != nil {
|
||||
return tx
|
||||
}
|
||||
return t.remotes[hash]
|
||||
}
|
||||
|
||||
// GetLocal returns a transaction if it exists in the lookup, or nil if not found.
|
||||
func (t *lookup) GetLocal(hash common.Hash) *types.Transaction {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return t.locals[hash]
|
||||
}
|
||||
|
||||
// GetRemote returns a transaction if it exists in the lookup, or nil if not found.
|
||||
func (t *lookup) GetRemote(hash common.Hash) *types.Transaction {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return t.remotes[hash]
|
||||
return t.txs[hash]
|
||||
}
|
||||
|
||||
// Count returns the current number of transactions in the lookup.
|
||||
|
|
@ -2093,23 +1898,7 @@ func (t *lookup) Count() int {
|
|||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return len(t.locals) + len(t.remotes)
|
||||
}
|
||||
|
||||
// LocalCount returns the current number of local transactions in the lookup.
|
||||
func (t *lookup) LocalCount() int {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return len(t.locals)
|
||||
}
|
||||
|
||||
// RemoteCount returns the current number of remote transactions in the lookup.
|
||||
func (t *lookup) RemoteCount() int {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
|
||||
return len(t.remotes)
|
||||
return len(t.txs)
|
||||
}
|
||||
|
||||
// Slots returns the current number of slots used in the lookup.
|
||||
|
|
@ -2121,18 +1910,14 @@ func (t *lookup) Slots() int {
|
|||
}
|
||||
|
||||
// Add adds a transaction to the lookup.
|
||||
func (t *lookup) Add(tx *types.Transaction, local bool) {
|
||||
func (t *lookup) Add(tx *types.Transaction) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
t.slots += numSlots(tx)
|
||||
slotsGauge.Update(int64(t.slots))
|
||||
|
||||
if local {
|
||||
t.locals[tx.Hash()] = tx
|
||||
} else {
|
||||
t.remotes[tx.Hash()] = tx
|
||||
}
|
||||
t.txs[tx.Hash()] = tx
|
||||
t.addAuthorities(tx)
|
||||
}
|
||||
|
||||
|
|
@ -2142,10 +1927,7 @@ func (t *lookup) Remove(hash common.Hash) {
|
|||
defer t.lock.Unlock()
|
||||
|
||||
t.removeAuthorities(hash)
|
||||
tx, ok := t.locals[hash]
|
||||
if !ok {
|
||||
tx, ok = t.remotes[hash]
|
||||
}
|
||||
tx, ok := t.txs[hash]
|
||||
if !ok {
|
||||
log.Error("No transaction found to be deleted", "hash", hash)
|
||||
return
|
||||
|
|
@ -2153,36 +1935,18 @@ func (t *lookup) Remove(hash common.Hash) {
|
|||
t.slots -= numSlots(tx)
|
||||
slotsGauge.Update(int64(t.slots))
|
||||
|
||||
delete(t.locals, hash)
|
||||
delete(t.remotes, hash)
|
||||
delete(t.txs, hash)
|
||||
}
|
||||
|
||||
// RemoteToLocals migrates the transactions belongs to the given locals to locals
|
||||
// set. The assumption is held the locals set is thread-safe to be used.
|
||||
func (t *lookup) RemoteToLocals(locals *accountSet) int {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
var migrated int
|
||||
for hash, tx := range t.remotes {
|
||||
if locals.containsTx(tx) {
|
||||
t.locals[hash] = tx
|
||||
delete(t.remotes, hash)
|
||||
migrated += 1
|
||||
}
|
||||
}
|
||||
return migrated
|
||||
}
|
||||
|
||||
// RemotesBelowTip finds all remote transactions below the given tip threshold.
|
||||
func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
|
||||
// TxsBelowTip finds all remote transactions below the given tip threshold.
|
||||
func (t *lookup) TxsBelowTip(threshold *big.Int) types.Transactions {
|
||||
found := make(types.Transactions, 0, 128)
|
||||
t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
|
||||
t.Range(func(hash common.Hash, tx *types.Transaction) bool {
|
||||
if tx.GasTipCapIntCmp(threshold) < 0 {
|
||||
found = append(found, tx)
|
||||
}
|
||||
return true
|
||||
}, false, true) // Only iterate remotes
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
|
|
@ -2257,11 +2021,7 @@ func (pool *LegacyPool) Clear() {
|
|||
// The transaction addition may attempt to reserve the sender addr which
|
||||
// can't happen until Clear releases the reservation lock. Clear cannot
|
||||
// acquire the subpool lock until the transaction addition is completed.
|
||||
for _, tx := range pool.all.locals {
|
||||
senderAddr, _ := types.Sender(pool.signer, tx)
|
||||
pool.reserver.Release(senderAddr)
|
||||
}
|
||||
for _, tx := range pool.all.remotes {
|
||||
for _, tx := range pool.all.txs {
|
||||
senderAddr, _ := types.Sender(pool.signer, tx)
|
||||
pool.reserver.Release(senderAddr)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
"math"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -217,7 +216,7 @@ func validatePoolInternals(pool *LegacyPool) error {
|
|||
return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
|
||||
}
|
||||
pool.priced.Reheap()
|
||||
priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.RemoteCount()
|
||||
priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.Count()
|
||||
if priced != remote {
|
||||
return fmt.Errorf("total priced transaction count %d != %d", priced, remote)
|
||||
}
|
||||
|
|
@ -293,7 +292,7 @@ func TestPromoteSpecialTxUpdatesTotalCost(t *testing.T) {
|
|||
if !inserted {
|
||||
t.Fatal("failed to insert baseline transaction")
|
||||
}
|
||||
if _, err := pool.promoteSpecialTx(addr, special, false); err != nil {
|
||||
if _, err := pool.promoteSpecialTx(addr, special); err != nil {
|
||||
t.Fatalf("promoteSpecialTx failed: %v", err)
|
||||
}
|
||||
want, overflow := uint256.FromBig(special.Cost())
|
||||
|
|
@ -383,7 +382,7 @@ func TestPromoteSpecialTxReplacementAvoidsIntermediateOverflow(t *testing.T) {
|
|||
if !inserted {
|
||||
t.Fatal("failed to insert baseline transaction")
|
||||
}
|
||||
inserted, err = pool.promoteSpecialTx(addr, special, false)
|
||||
inserted, err = pool.promoteSpecialTx(addr, special)
|
||||
if err != nil {
|
||||
t.Fatalf("promoteSpecialTx failed: %v", err)
|
||||
}
|
||||
|
|
@ -421,7 +420,7 @@ func TestPromoteSpecialTxOverflowReturnsErrorWithoutMutation(t *testing.T) {
|
|||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
inserted, err := pool.promoteSpecialTx(addr, special, false)
|
||||
inserted, err := pool.promoteSpecialTx(addr, special)
|
||||
if inserted {
|
||||
t.Fatal("overflowing special tx must not be inserted")
|
||||
}
|
||||
|
|
@ -558,10 +557,6 @@ func TestInvalidTransactions(t *testing.T) {
|
|||
if err, want := pool.addRemote(tx), txpool.ErrUnderpriced; !errors.Is(err, want) {
|
||||
t.Errorf("want %v have %v", want, err)
|
||||
}
|
||||
// Local transactions should be accepted even if underpriced
|
||||
if err := pool.addLocal(tx); err != nil {
|
||||
t.Error("expected", nil, "got", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueue(t *testing.T) {
|
||||
|
|
@ -575,7 +570,7 @@ func TestQueue(t *testing.T) {
|
|||
testAddBalance(pool, from, big.NewInt(1000))
|
||||
<-pool.requestReset(nil, nil)
|
||||
|
||||
pool.enqueueTx(tx.Hash(), tx, false, true)
|
||||
pool.enqueueTx(tx.Hash(), tx, true)
|
||||
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
|
||||
if len(pool.pending) != 1 {
|
||||
t.Error("expected valid txs to be 1 is", len(pool.pending))
|
||||
|
|
@ -584,7 +579,7 @@ func TestQueue(t *testing.T) {
|
|||
tx = transaction(1, 100, key)
|
||||
from, _ = deriveSender(tx)
|
||||
testSetNonce(pool, from, 2)
|
||||
pool.enqueueTx(tx.Hash(), tx, false, true)
|
||||
pool.enqueueTx(tx.Hash(), tx, true)
|
||||
|
||||
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
|
||||
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
|
||||
|
|
@ -609,9 +604,9 @@ func TestQueue2(t *testing.T) {
|
|||
testAddBalance(pool, from, big.NewInt(1000))
|
||||
pool.reset(nil, nil)
|
||||
|
||||
pool.enqueueTx(tx1.Hash(), tx1, false, true)
|
||||
pool.enqueueTx(tx2.Hash(), tx2, false, true)
|
||||
pool.enqueueTx(tx3.Hash(), tx3, false, true)
|
||||
pool.enqueueTx(tx1.Hash(), tx1, true)
|
||||
pool.enqueueTx(tx2.Hash(), tx2, true)
|
||||
pool.enqueueTx(tx3.Hash(), tx3, true)
|
||||
|
||||
pool.promoteExecutables([]common.Address{from})
|
||||
if len(pool.pending) != 1 {
|
||||
|
|
@ -693,7 +688,7 @@ func TestValidateTransactionEIP2681(t *testing.T) {
|
|||
GasPrice: tt.gasPrice,
|
||||
})
|
||||
signedTx, _ := types.SignTx(tx, types.HomesteadSigner{}, key)
|
||||
err := pool.validateTxBasics(signedTx, true)
|
||||
err := pool.validateTxBasics(signedTx)
|
||||
|
||||
if tt.wantErr == nil && err != nil {
|
||||
t.Errorf("expected nil, got %v", err)
|
||||
|
|
@ -754,14 +749,14 @@ func TestChainFork(t *testing.T) {
|
|||
resetState()
|
||||
|
||||
tx := pricedTransaction(0, 100000, big.NewInt(300000000), key)
|
||||
if _, err := pool.add(tx, false); err != nil {
|
||||
if _, err := pool.add(tx); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
pool.removeTx(tx.Hash(), true, true)
|
||||
|
||||
// reset the pool's internal state
|
||||
resetState()
|
||||
if _, err := pool.add(tx, false); err != nil {
|
||||
if _, err := pool.add(tx); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -790,10 +785,10 @@ func TestDoubleNonce(t *testing.T) {
|
|||
tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 1000000, big.NewInt(250000000), nil), signer, key)
|
||||
|
||||
// Add the first two transaction, ensure higher priced stays only
|
||||
if replace, err := pool.add(tx1, false); err != nil || replace {
|
||||
if replace, err := pool.add(tx1); err != nil || replace {
|
||||
t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace)
|
||||
}
|
||||
if replace, err := pool.add(tx2, false); err != nil || !replace {
|
||||
if replace, err := pool.add(tx2); err != nil || !replace {
|
||||
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
|
||||
}
|
||||
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
|
||||
|
|
@ -805,7 +800,7 @@ func TestDoubleNonce(t *testing.T) {
|
|||
}
|
||||
|
||||
// Add the third transaction and ensure it's not saved (smaller price)
|
||||
pool.add(tx3, false)
|
||||
pool.add(tx3)
|
||||
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
|
||||
if pool.pending[addr].Len() != 1 {
|
||||
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
|
||||
|
|
@ -828,7 +823,7 @@ func TestMissingNonce(t *testing.T) {
|
|||
addr := crypto.PubkeyToAddress(key.PublicKey)
|
||||
testAddBalance(pool, addr, big.NewInt(100000000000000))
|
||||
tx := pricedTransaction(1, 100000, big.NewInt(300000000), key)
|
||||
if _, err := pool.add(tx, false); err != nil {
|
||||
if _, err := pool.add(tx); err != nil {
|
||||
t.Error("didn't expect error", err)
|
||||
}
|
||||
if len(pool.pending) != 0 {
|
||||
|
|
@ -887,21 +882,21 @@ func TestDropping(t *testing.T) {
|
|||
tx11 = transaction(11, 200, key)
|
||||
tx12 = transaction(12, 300, key)
|
||||
)
|
||||
pool.all.Add(tx0, false)
|
||||
pool.priced.Put(tx0, false)
|
||||
pool.all.Add(tx0)
|
||||
pool.priced.Put(tx0)
|
||||
pool.promoteTx(account, tx0.Hash(), tx0)
|
||||
|
||||
pool.all.Add(tx1, false)
|
||||
pool.priced.Put(tx1, false)
|
||||
pool.all.Add(tx1)
|
||||
pool.priced.Put(tx1)
|
||||
pool.promoteTx(account, tx1.Hash(), tx1)
|
||||
|
||||
pool.all.Add(tx2, false)
|
||||
pool.priced.Put(tx2, false)
|
||||
pool.all.Add(tx2)
|
||||
pool.priced.Put(tx2)
|
||||
pool.promoteTx(account, tx2.Hash(), tx2)
|
||||
|
||||
pool.enqueueTx(tx10.Hash(), tx10, false, true)
|
||||
pool.enqueueTx(tx11.Hash(), tx11, false, true)
|
||||
pool.enqueueTx(tx12.Hash(), tx12, false, true)
|
||||
pool.enqueueTx(tx10.Hash(), tx10, true)
|
||||
pool.enqueueTx(tx11.Hash(), tx11, true)
|
||||
pool.enqueueTx(tx12.Hash(), tx12, true)
|
||||
|
||||
// Check that pre and post validations leave the pool as is
|
||||
if pool.pending[account].Len() != 3 {
|
||||
|
|
@ -1178,14 +1173,6 @@ func TestQueueAccountLimiting(t *testing.T) {
|
|||
// This logic should not hold for local transactions, unless the local tracking
|
||||
// mechanism is disabled.
|
||||
func TestQueueGlobalLimiting(t *testing.T) {
|
||||
testQueueGlobalLimiting(t, false)
|
||||
}
|
||||
|
||||
func TestQueueGlobalLimitingNoLocals(t *testing.T) {
|
||||
testQueueGlobalLimiting(t, true)
|
||||
}
|
||||
|
||||
func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
|
||||
t.Parallel()
|
||||
|
||||
// Create the pool to test the limit enforcement with
|
||||
|
|
@ -1193,7 +1180,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
|
|||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.NoLocals = nolocals
|
||||
config.NoLocals = true
|
||||
config.AccountQueue = 1
|
||||
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
|
||||
|
||||
|
|
@ -1207,7 +1194,6 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
|
|||
keys[i], _ = crypto.GenerateKey()
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000000000000))
|
||||
}
|
||||
local := keys[len(keys)-1]
|
||||
|
||||
// Generate and queue a batch of transactions
|
||||
nonces := make(map[common.Address]uint64)
|
||||
|
|
@ -1233,35 +1219,6 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
|
|||
if queued > int(config.GlobalQueue) {
|
||||
t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue)
|
||||
}
|
||||
// Generate a batch of transactions from the local account and import them
|
||||
txs = txs[:0]
|
||||
for i := uint64(0); i < 3*config.GlobalQueue; i++ {
|
||||
txs = append(txs, pricedTransaction(i+1, 100000, big.NewInt(300000000), local))
|
||||
}
|
||||
pool.addLocals(txs)
|
||||
|
||||
// If locals are disabled, the previous eviction algorithm should apply here too
|
||||
if nolocals {
|
||||
queued := 0
|
||||
for addr, list := range pool.queue {
|
||||
if list.Len() > int(config.AccountQueue) {
|
||||
t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue)
|
||||
}
|
||||
queued += list.Len()
|
||||
}
|
||||
if queued > int(config.GlobalQueue) {
|
||||
t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue)
|
||||
}
|
||||
} else {
|
||||
// Local exemptions are enabled, make sure the local account owned the queue
|
||||
if len(pool.queue) != 1 {
|
||||
t.Errorf("multiple accounts in queue: have %v, want %v", len(pool.queue), 1)
|
||||
}
|
||||
// Also ensure no local transactions are ever dropped, even if above global limits
|
||||
if queued := pool.queue[crypto.PubkeyToAddress(local.PublicKey)].Len(); uint64(queued) != 3*config.GlobalQueue {
|
||||
t.Fatalf("local account queued transaction count mismatch: have %v, want %v", queued, 3*config.GlobalQueue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that if an account remains idle for a prolonged amount of time, any
|
||||
|
|
@ -1270,12 +1227,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
|
|||
//
|
||||
// This logic should not hold for local transactions, unless the local tracking
|
||||
// mechanism is disabled.
|
||||
func TestQueueTimeLimiting(t *testing.T) { testQueueTimeLimiting(t, false) }
|
||||
func TestQueueTimeLimitingNoLocals(t *testing.T) {
|
||||
testQueueTimeLimiting(t, true)
|
||||
}
|
||||
|
||||
func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
||||
func TestQueueTimeLimiting(t *testing.T) {
|
||||
// Reduce the eviction interval to a testable amount
|
||||
defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
|
||||
evictionInterval = time.Millisecond * 100
|
||||
|
|
@ -1286,23 +1238,17 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
|
||||
config := testTxPoolConfig
|
||||
config.Lifetime = time.Second
|
||||
config.NoLocals = nolocals
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create two test accounts to ensure remotes expire but locals do not
|
||||
local, _ := crypto.GenerateKey()
|
||||
// Create a test account to ensure remotes expire
|
||||
remote, _ := crypto.GenerateKey()
|
||||
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(100000000000000))
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(100000000000000))
|
||||
|
||||
// Add the two transactions and ensure they both are queued up
|
||||
if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(300000000), local)); err != nil {
|
||||
t.Fatalf("failed to add local transaction: %v", err)
|
||||
}
|
||||
// Add the transaction and ensure it is queued up
|
||||
if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(300000000), remote)); err != nil {
|
||||
t.Fatalf("failed to add remote transaction: %v", err)
|
||||
}
|
||||
|
|
@ -1310,8 +1256,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
if pending != 0 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
|
||||
}
|
||||
if queued != 2 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
|
|
@ -1325,8 +1271,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
if pending != 0 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
|
||||
}
|
||||
if queued != 2 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
|
|
@ -1339,14 +1285,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
if pending != 0 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
|
||||
}
|
||||
if nolocals {
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
} else {
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
|
|
@ -1354,7 +1294,6 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
|
||||
// remove current transactions and increase nonce to prepare for a reset and cleanup
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
|
||||
<-pool.requestReset(nil, nil)
|
||||
|
||||
// make sure queue, pending are cleared
|
||||
|
|
@ -1370,18 +1309,12 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
}
|
||||
|
||||
// Queue gapped transactions
|
||||
if err := pool.addLocal(pricedTransaction(4, 100000, big.NewInt(300000000), local)); err != nil {
|
||||
t.Fatalf("failed to add remote transaction: %v", err)
|
||||
}
|
||||
if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(300000000), remote)); err != nil {
|
||||
t.Fatalf("failed to add remote transaction: %v", err)
|
||||
}
|
||||
time.Sleep(5 * evictionInterval) // A half lifetime pass
|
||||
|
||||
// Queue executable transactions, the life cycle should be restarted.
|
||||
if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(300000000), local)); err != nil {
|
||||
t.Fatalf("failed to add remote transaction: %v", err)
|
||||
}
|
||||
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(300000000), remote)); err != nil {
|
||||
t.Fatalf("failed to add remote transaction: %v", err)
|
||||
}
|
||||
|
|
@ -1389,11 +1322,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
|
||||
// All gapped transactions shouldn't be kicked out
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
if pending != 1 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
|
||||
}
|
||||
if queued != 2 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
|
|
@ -1402,17 +1335,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
|
|||
// The whole life time pass after last promotion, kick out stale transactions
|
||||
time.Sleep(2 * config.Lifetime)
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
if pending != 1 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
|
||||
}
|
||||
if nolocals {
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
} else {
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
|
|
@ -1675,7 +1602,7 @@ func TestRepricing(t *testing.T) {
|
|||
defer sub.Unsubscribe()
|
||||
|
||||
// Create a number of test accounts and fund them
|
||||
keys := make([]*ecdsa.PrivateKey, 4)
|
||||
keys := make([]*ecdsa.PrivateKey, 3)
|
||||
for i := 0; i < len(keys); i++ {
|
||||
keys[i], _ = crypto.GenerateKey()
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(200000000000000))
|
||||
|
|
@ -1695,20 +1622,17 @@ func TestRepricing(t *testing.T) {
|
|||
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(250000000), keys[2]))
|
||||
txs = append(txs, pricedTransaction(3, 100000, big.NewInt(500000000), keys[2]))
|
||||
|
||||
ltx := pricedTransaction(0, 100000, big.NewInt(250000000), keys[3])
|
||||
|
||||
// Import the batch and that both pending and queued transactions match up
|
||||
pool.addRemotesSync(txs)
|
||||
pool.addLocal(ltx)
|
||||
|
||||
pending, queued := pool.Stats()
|
||||
if pending != 7 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7)
|
||||
if pending != 6 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6)
|
||||
}
|
||||
if queued != 3 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
|
||||
}
|
||||
if err := validateEvents(events, 7); err != nil {
|
||||
if err := validateEvents(events, 6); err != nil {
|
||||
t.Fatalf("original event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
|
|
@ -1718,8 +1642,8 @@ func TestRepricing(t *testing.T) {
|
|||
pool.SetGasTip(big.NewInt(500000000))
|
||||
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
if pending != 1 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
|
||||
}
|
||||
if queued != 5 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5)
|
||||
|
|
@ -1746,21 +1670,7 @@ func TestRepricing(t *testing.T) {
|
|||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// However we can add local underpriced transactions
|
||||
tx := pricedTransaction(1, 100000, big.NewInt(250000000), keys[3])
|
||||
if err := pool.addLocal(tx); err != nil {
|
||||
t.Fatalf("failed to add underpriced local transaction: %v", err)
|
||||
}
|
||||
if pending, _ = pool.Stats(); pending != 3 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
|
||||
}
|
||||
if err := validateEvents(events, 1); err != nil {
|
||||
t.Fatalf("post-reprice local event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// And we can fill gaps with properly priced transactions
|
||||
// we can fill gaps with properly priced transactions
|
||||
if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(500000000), keys[0])); err != nil {
|
||||
t.Fatalf("failed to add pending transaction: %v", err)
|
||||
}
|
||||
|
|
@ -1802,29 +1712,16 @@ func TestMinGasPriceEnforced(t *testing.T) {
|
|||
tx := pricedTransaction(0, 100000, legacyPrice, key)
|
||||
pool.SetGasTip(new(big.Int).Add(legacyPrice, big.NewInt(1)))
|
||||
|
||||
if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) {
|
||||
t.Fatalf("Min tip not enforced")
|
||||
}
|
||||
|
||||
if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
|
||||
if err := pool.Add([]*types.Transaction{tx}, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
|
||||
t.Fatalf("Min tip not enforced")
|
||||
}
|
||||
|
||||
tx = dynamicFeeTx(0, 100000, dynamicFeeCap, dynamicTip, key)
|
||||
pool.SetGasTip(new(big.Int).Add(dynamicTip, big.NewInt(1)))
|
||||
|
||||
if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) {
|
||||
if err := pool.Add([]*types.Transaction{tx}, true)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
|
||||
t.Fatalf("Min tip not enforced")
|
||||
}
|
||||
|
||||
if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
|
||||
t.Fatalf("Min tip not enforced")
|
||||
}
|
||||
// Make sure the tx is accepted if locals are enabled
|
||||
pool.config.NoLocals = false
|
||||
if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; err != nil {
|
||||
t.Fatalf("Min tip enforced with locals enabled, error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that setting the transaction pool gas price to a higher value correctly
|
||||
|
|
@ -1865,20 +1762,17 @@ func TestRepricingDynamicFee(t *testing.T) {
|
|||
txs = append(txs, dynamicFeeTx(2, 100000, big.NewInt(300000000), big.NewInt(300000000), keys[2]))
|
||||
txs = append(txs, dynamicFeeTx(3, 100000, big.NewInt(350000000), big.NewInt(350000000), keys[2]))
|
||||
|
||||
ltx := dynamicFeeTx(0, 100000, big.NewInt(350000000), big.NewInt(300000000), keys[3])
|
||||
|
||||
// Import the batch and that both pending and queued transactions match up
|
||||
pool.addRemotesSync(txs)
|
||||
pool.addLocal(ltx)
|
||||
|
||||
pending, queued := pool.Stats()
|
||||
if pending != 7 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7)
|
||||
if pending != 6 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6)
|
||||
}
|
||||
if queued != 3 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
|
||||
}
|
||||
if err := validateEvents(events, 7); err != nil {
|
||||
if err := validateEvents(events, 6); err != nil {
|
||||
t.Fatalf("original event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
|
|
@ -1888,8 +1782,8 @@ func TestRepricingDynamicFee(t *testing.T) {
|
|||
pool.SetGasTip(big.NewInt(350000000))
|
||||
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
if pending != 1 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
|
||||
}
|
||||
if queued != 5 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5)
|
||||
|
|
@ -1919,20 +1813,7 @@ func TestRepricingDynamicFee(t *testing.T) {
|
|||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// However we can add local underpriced transactions
|
||||
tx = dynamicFeeTx(1, 100000, big.NewInt(300000000), big.NewInt(300000000), keys[3])
|
||||
if err := pool.addLocal(tx); err != nil {
|
||||
t.Fatalf("failed to add underpriced local transaction: %v", err)
|
||||
}
|
||||
if pending, _ = pool.Stats(); pending != 3 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
|
||||
}
|
||||
if err := validateEvents(events, 1); err != nil {
|
||||
t.Fatalf("post-reprice local event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
|
||||
// And we can fill gaps with properly priced transactions
|
||||
tx = pricedTransaction(1, 100000, big.NewInt(350000000), keys[0])
|
||||
if err := pool.addRemote(tx); err != nil {
|
||||
|
|
@ -1954,78 +1835,6 @@ func TestRepricingDynamicFee(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Tests that setting the transaction pool gas price to a higher value does not
|
||||
// remove local transactions (legacy & dynamic fee).
|
||||
func TestRepricingKeepsLocals(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create the pool to test the pricing enforcement with
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a number of test accounts and fund them
|
||||
keys := make([]*ecdsa.PrivateKey, 3)
|
||||
for i := 0; i < len(keys); i++ {
|
||||
keys[i], _ = crypto.GenerateKey()
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1500000000000000))
|
||||
}
|
||||
// Create transaction (both pending and queued) with a linearly growing gasprice
|
||||
// common.LimitThresholdNonceInQueue = 10
|
||||
for i := uint64(0); i < 5; i++ {
|
||||
// Add pending transaction.
|
||||
pendingTx := pricedTransaction(i, 100000, big.NewInt(int64((i+1)*250000000)), keys[2])
|
||||
if err := pool.addLocal(pendingTx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Add queued transaction.
|
||||
queuedTx := pricedTransaction(i+6, 100000, big.NewInt(int64((i+1)*250000000)), keys[2])
|
||||
if err := pool.addLocal(queuedTx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Add pending dynamic fee transaction.
|
||||
pendingTx = dynamicFeeTx(i, 100000, big.NewInt(int64((i+1)*250000000)), big.NewInt(int64((i+1)*250000000)), keys[1])
|
||||
if err := pool.addLocal(pendingTx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Add queued dynamic fee transaction.
|
||||
queuedTx = dynamicFeeTx(i+6, 100000, big.NewInt(int64((i+1)*250000000)), big.NewInt(int64((i+1)*250000000)), keys[1])
|
||||
if err := pool.addLocal(queuedTx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
pending, queued := pool.Stats()
|
||||
expPending, expQueued := 10, 10
|
||||
validate := func() {
|
||||
pending, queued = pool.Stats()
|
||||
if pending != expPending {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, expPending)
|
||||
}
|
||||
if queued != expQueued {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, expQueued)
|
||||
}
|
||||
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
}
|
||||
validate()
|
||||
|
||||
// Reprice the pool and check that nothing is dropped
|
||||
pool.SetGasTip(big.NewInt(500000000))
|
||||
validate()
|
||||
|
||||
pool.SetGasTip(big.NewInt(500000000))
|
||||
pool.SetGasTip(big.NewInt(1000000000))
|
||||
pool.SetGasTip(big.NewInt(2000000000))
|
||||
pool.SetGasTip(big.NewInt(25000000000))
|
||||
validate()
|
||||
}
|
||||
|
||||
// Tests that when the pool reaches its global transaction limit, underpriced
|
||||
// transactions are gradually shifted out for more expensive ones and any gapped
|
||||
// pending transactions are moved into the queue.
|
||||
|
|
@ -2055,24 +1864,18 @@ func TestUnderpricing(t *testing.T) {
|
|||
keys := make([]*ecdsa.PrivateKey, 5)
|
||||
for i := 0; i < len(keys); i++ {
|
||||
keys[i], _ = crypto.GenerateKey()
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(600000000000000))
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(6000000000000000))
|
||||
}
|
||||
// Generate and queue a batch of transactions, both pending and queued
|
||||
// Gas prices follow an intentional stepped pattern (250M, 500M, 750M, 1000M, 1250M) to test
|
||||
// transaction replacement and eviction logic based on price competition. The 1:5 ratio
|
||||
// (250M to 1250M) ensures clear differentiation when testing underpricing behavior.
|
||||
txs := types.Transactions{}
|
||||
|
||||
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(250000000), keys[0]))
|
||||
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(500000000), keys[0]))
|
||||
|
||||
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(250000000), keys[1]))
|
||||
|
||||
ltx := pricedTransaction(0, 100000, big.NewInt(250000000), keys[2])
|
||||
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(250000000), keys[0])) // pending
|
||||
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(500000000), keys[0])) // pending
|
||||
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(250000000), keys[2])) // pending
|
||||
|
||||
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(250000000), keys[1])) // queued
|
||||
// Import the batch and that both pending and queued transactions match up
|
||||
pool.AddRemotes(txs)
|
||||
pool.addLocal(ltx)
|
||||
pool.addRemotesSync(txs)
|
||||
|
||||
pending, queued := pool.Stats()
|
||||
if pending != 3 {
|
||||
|
|
@ -2102,44 +1905,23 @@ func TestUnderpricing(t *testing.T) {
|
|||
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000000000), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2
|
||||
t.Fatalf("failed to add well priced transaction: %v", err)
|
||||
}
|
||||
if err := pool.addRemote(pricedTransaction(3, 100000, big.NewInt(1250000000), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
|
||||
if err := pool.addRemoteSync(pricedTransaction(3, 100000, big.NewInt(1250000000), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
|
||||
t.Fatalf("failed to add well priced transaction: %v", err)
|
||||
}
|
||||
// Ensure nonce continuity for key[1] before the future-replacement assertion.
|
||||
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1500000000), keys[1])); err != nil {
|
||||
t.Fatalf("failed to restore contiguous pending set: %v", err)
|
||||
}
|
||||
// Ensure that replacing a pending transaction with a future transaction fails
|
||||
if err := pool.addRemote(pricedTransaction(5, 100000, big.NewInt(1500000000), keys[1])); err != ErrFutureReplacePending {
|
||||
if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(1750000000), keys[1])); !errors.Is(err, ErrFutureReplacePending) {
|
||||
t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, ErrFutureReplacePending)
|
||||
}
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
if pending != 4 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
|
||||
}
|
||||
if queued != 2 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
|
||||
}
|
||||
if err := validateEvents(events, 2); err != nil {
|
||||
t.Fatalf("additional event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// Ensure that adding local transactions can push out even higher priced ones
|
||||
ltx = pricedTransaction(1, 100000, big.NewInt(250000000), keys[2])
|
||||
if err := pool.addLocal(ltx); err != nil {
|
||||
t.Fatalf("failed to append underpriced local transaction: %v", err)
|
||||
}
|
||||
ltx = pricedTransaction(0, 100000, big.NewInt(250000000), keys[3])
|
||||
if err := pool.addLocal(ltx); err != nil {
|
||||
t.Fatalf("failed to add new underpriced local transaction: %v", err)
|
||||
}
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 3 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
|
||||
}
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if err := validateEvents(events, 2); err != nil {
|
||||
t.Fatalf("local event firing failed: %v", err)
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
|
|
@ -2218,8 +2000,6 @@ func TestStableUnderpricing(t *testing.T) {
|
|||
// Tests that when the pool reaches its global transaction limit, underpriced
|
||||
// transactions (legacy & dynamic fee) are gradually shifted out for more
|
||||
// expensive ones and any gapped pending transactions are moved into the queue.
|
||||
//
|
||||
// Note, local transactions are never allowed to be dropped.
|
||||
func TestUnderpricingDynamicFee(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
@ -2244,15 +2024,13 @@ func TestUnderpricingDynamicFee(t *testing.T) {
|
|||
// Generate and queue a batch of transactions, both pending and queued
|
||||
txs := types.Transactions{}
|
||||
|
||||
txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(270000000), big.NewInt(260000000), keys[0]))
|
||||
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(260000000), keys[0]))
|
||||
txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[1]))
|
||||
|
||||
ltx := dynamicFeeTx(0, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[2])
|
||||
txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(270000000), big.NewInt(260000000), keys[0])) // pending
|
||||
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(260000000), keys[0])) // pending
|
||||
txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[1])) // queued
|
||||
txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[2])) // pending
|
||||
|
||||
// Import the batch and that both pending and queued transactions match up
|
||||
pool.AddRemotes(txs) // Pend K0:0, K0:1; Que K1:1
|
||||
pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1
|
||||
pool.addRemotesSync(txs) // Pend K0:0, K0:1; Que K1:1
|
||||
|
||||
pending, queued := pool.Stats()
|
||||
if pending != 3 {
|
||||
|
|
@ -2270,13 +2048,13 @@ func TestUnderpricingDynamicFee(t *testing.T) {
|
|||
|
||||
// Ensure that adding an underpriced transaction fails
|
||||
tx := dynamicFeeTx(0, 100000, big.NewInt(260000000), big.NewInt(250000000), keys[1])
|
||||
if err := pool.addRemote(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1
|
||||
if err := pool.addRemoteSync(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1
|
||||
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced)
|
||||
}
|
||||
|
||||
// Ensure that adding high priced transactions drops cheap ones, but not own
|
||||
tx = pricedTransaction(0, 100000, big.NewInt(260000000), keys[1])
|
||||
if err := pool.addRemote(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
|
||||
if err := pool.addRemoteSync(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
|
||||
t.Fatalf("failed to add well priced transaction: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -2289,40 +2067,18 @@ func TestUnderpricingDynamicFee(t *testing.T) {
|
|||
t.Fatalf("failed to add well priced transaction: %v", err)
|
||||
}
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
if pending != 4 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
|
||||
}
|
||||
if queued != 2 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
if err := validateEvents(events, 2); err != nil {
|
||||
if err := validateEvents(events, 3); err != nil {
|
||||
t.Fatalf("additional event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// Ensure that adding local transactions can push out even higher priced ones
|
||||
ltx = dynamicFeeTx(1, 100000, big.NewInt(250000000), big.NewInt(250000000), keys[2])
|
||||
if err := pool.addLocal(ltx); err != nil {
|
||||
t.Fatalf("failed to append underpriced local transaction: %v", err)
|
||||
}
|
||||
ltx = dynamicFeeTx(0, 100000, big.NewInt(250000000), big.NewInt(250000000), keys[3])
|
||||
if err := pool.addLocal(ltx); err != nil {
|
||||
t.Fatalf("failed to add new underpriced local transaction: %v", err)
|
||||
}
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 3 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
|
||||
}
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
if err := validateEvents(events, 2); err != nil {
|
||||
t.Fatalf("local event firing failed: %v", err)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests whether highest fee cap transaction is retained after a batch of high effective
|
||||
|
|
@ -2342,7 +2098,7 @@ func TestDualHeapEviction(t *testing.T) {
|
|||
)
|
||||
|
||||
check := func(tx *types.Transaction, name string) {
|
||||
if pool.all.GetRemote(tx.Hash()) == nil {
|
||||
if pool.all.Get(tx.Hash()) == nil {
|
||||
t.Fatalf("highest %s transaction evicted from the pool", name)
|
||||
}
|
||||
}
|
||||
|
|
@ -2639,123 +2395,6 @@ func TestReplacementDynamicFee(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Tests that local transactions are journaled to disk, but remote transactions
|
||||
// get discarded between restarts.
|
||||
func TestJournaling(t *testing.T) { testJournaling(t, false) }
|
||||
|
||||
func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) }
|
||||
|
||||
func testJournaling(t *testing.T, nolocals bool) {
|
||||
t.Parallel()
|
||||
|
||||
// Create a temporary file for the journal
|
||||
file, err := os.CreateTemp(t.TempDir(), "")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create temporary journal: %v", err)
|
||||
}
|
||||
journal := file.Name()
|
||||
defer os.Remove(journal)
|
||||
|
||||
// Clean up the temporary file, we only need the path for now
|
||||
file.Close()
|
||||
os.Remove(journal)
|
||||
|
||||
// Create the original pool to inject transaction into the journal
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
|
||||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
config := testTxPoolConfig
|
||||
config.NoLocals = nolocals
|
||||
config.Journal = journal
|
||||
config.Rejournal = time.Second
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
|
||||
// Create two test accounts to ensure remotes expire but locals do not
|
||||
local, _ := crypto.GenerateKey()
|
||||
remote, _ := crypto.GenerateKey()
|
||||
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(100000000000000))
|
||||
testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(100000000000000))
|
||||
|
||||
// Add three local and a remote transactions and ensure they are queued up
|
||||
if err := pool.addLocal(pricedTransaction(0, 100000, big.NewInt(300000000), local)); err != nil {
|
||||
t.Fatalf("failed to add local transaction: %v", err)
|
||||
}
|
||||
if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(300000000), local)); err != nil {
|
||||
t.Fatalf("failed to add local transaction: %v", err)
|
||||
}
|
||||
if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(300000000), local)); err != nil {
|
||||
t.Fatalf("failed to add local transaction: %v", err)
|
||||
}
|
||||
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(300000000), remote)); err != nil {
|
||||
t.Fatalf("failed to add remote transaction: %v", err)
|
||||
}
|
||||
pending, queued := pool.Stats()
|
||||
if pending != 4 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
|
||||
}
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
|
||||
pool.Close()
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
|
||||
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool = New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
|
||||
pending, queued = pool.Stats()
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
if nolocals {
|
||||
if pending != 0 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
|
||||
}
|
||||
} else {
|
||||
if pending != 2 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
|
||||
}
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
|
||||
<-pool.requestReset(nil, nil)
|
||||
time.Sleep(2 * config.Rejournal)
|
||||
pool.Close()
|
||||
|
||||
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
|
||||
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
pool = New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
|
||||
pending, queued = pool.Stats()
|
||||
if pending != 0 {
|
||||
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
|
||||
}
|
||||
if nolocals {
|
||||
if queued != 0 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
|
||||
}
|
||||
} else {
|
||||
if queued != 1 {
|
||||
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
|
||||
}
|
||||
}
|
||||
if err := validatePoolInternals(pool); err != nil {
|
||||
t.Fatalf("pool internal state corrupted: %v", err)
|
||||
}
|
||||
pool.Close()
|
||||
}
|
||||
|
||||
// TestTransactionStatusCheck tests that the pool can correctly retrieve the
|
||||
// pending status of individual transactions.
|
||||
func TestStatusCheck(t *testing.T) {
|
||||
|
|
@ -3140,7 +2779,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
|
|||
|
||||
for i := 0; i < size; i++ {
|
||||
tx := transaction(uint64(1+i), 100000, key)
|
||||
pool.enqueueTx(tx.Hash(), tx, false, true)
|
||||
pool.enqueueTx(tx.Hash(), tx, true)
|
||||
}
|
||||
// Benchmark the speed of pool validation
|
||||
b.ResetTimer()
|
||||
|
|
@ -3150,15 +2789,11 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
|
|||
}
|
||||
|
||||
// Benchmarks the speed of batched transaction insertion.
|
||||
func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, false) }
|
||||
func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, false) }
|
||||
func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, false) }
|
||||
func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100) }
|
||||
func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000) }
|
||||
func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000) }
|
||||
|
||||
func BenchmarkBatchLocalInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, true) }
|
||||
func BenchmarkBatchLocalInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, true) }
|
||||
func BenchmarkBatchLocalInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, true) }
|
||||
|
||||
func benchmarkBatchInsert(b *testing.B, size int, local bool) {
|
||||
func benchmarkBatchInsert(b *testing.B, size int) {
|
||||
// Generate a batch of transactions to enqueue into the pool
|
||||
pool, key := setupPool()
|
||||
defer pool.Close()
|
||||
|
|
@ -3176,46 +2811,7 @@ func benchmarkBatchInsert(b *testing.B, size int, local bool) {
|
|||
// Benchmark importing the transactions into the queue
|
||||
b.ResetTimer()
|
||||
for _, batch := range batches {
|
||||
if local {
|
||||
pool.addLocals(batch)
|
||||
} else {
|
||||
pool.AddRemotes(batch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkInsertRemoteWithAllLocals(b *testing.B) {
|
||||
// Allocate keys for testing
|
||||
key, _ := crypto.GenerateKey()
|
||||
account := crypto.PubkeyToAddress(key.PublicKey)
|
||||
|
||||
remoteKey, _ := crypto.GenerateKey()
|
||||
remoteAddr := crypto.PubkeyToAddress(remoteKey.PublicKey)
|
||||
|
||||
locals := make([]*types.Transaction, 4096+1024) // Occupy all slots
|
||||
for i := 0; i < len(locals); i++ {
|
||||
locals[i] = transaction(uint64(i), 100000, key)
|
||||
}
|
||||
remotes := make([]*types.Transaction, 1000)
|
||||
for i := 0; i < len(remotes); i++ {
|
||||
remotes[i] = pricedTransaction(uint64(i), 100000, big.NewInt(2), remoteKey) // Higher gasprice
|
||||
}
|
||||
// Benchmark importing the transactions into the queue
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
pool, _ := setupPool()
|
||||
testAddBalance(pool, account, big.NewInt(100000000))
|
||||
for _, local := range locals {
|
||||
pool.addLocal(local)
|
||||
}
|
||||
b.StartTimer()
|
||||
// Assign a high enough balance for testing
|
||||
testAddBalance(pool, remoteAddr, big.NewInt(100000000))
|
||||
for i := 0; i < len(remotes); i++ {
|
||||
pool.AddRemotes([]*types.Transaction{remotes[i]})
|
||||
}
|
||||
pool.Close()
|
||||
pool.AddRemotes(batch)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -3350,8 +2946,9 @@ func TestPendingKeepsLocalAndSpecialTransactions(t *testing.T) {
|
|||
localKey, _ := crypto.GenerateKey()
|
||||
localAddr := crypto.PubkeyToAddress(localKey.PublicKey)
|
||||
testAddBalance(pool, localAddr, big.NewInt(1_000_000_000_000_000))
|
||||
if err := pool.addLocal(pricedTransaction(0, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(1)), localKey)); err != nil {
|
||||
t.Fatalf("failed to add local tx: %v", err)
|
||||
localTx := pricedTransaction(0, 100000, new(big.Int).Add(new(big.Int).Set(filterTip), big.NewInt(1)), localKey)
|
||||
if err := pool.addRemoteSync(localTx); err != nil {
|
||||
t.Fatalf("failed to add local account tx: %v", err)
|
||||
}
|
||||
|
||||
filtered := pool.Pending(txpool.PendingFilter{MinTip: uint256.MustFromBig(filterTip)})
|
||||
|
|
|
|||
|
|
@ -52,31 +52,31 @@ func (h *nonceHeap) Pop() interface{} {
|
|||
return x
|
||||
}
|
||||
|
||||
// sortedMap is a nonce->transaction hash map with a heap based index to allow
|
||||
// SortedMap is a nonce->transaction hash map with a heap based index to allow
|
||||
// iterating over the contents in a nonce-incrementing way.
|
||||
type sortedMap struct {
|
||||
type SortedMap struct {
|
||||
items map[uint64]*types.Transaction // Hash map storing the transaction data
|
||||
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
|
||||
cache types.Transactions // Cache of the transactions already sorted
|
||||
cacheMu sync.Mutex // Mutex covering the cache
|
||||
}
|
||||
|
||||
// newSortedMap creates a new nonce-sorted transaction map.
|
||||
func newSortedMap() *sortedMap {
|
||||
return &sortedMap{
|
||||
// NewSortedMap creates a new nonce-sorted transaction map.
|
||||
func NewSortedMap() *SortedMap {
|
||||
return &SortedMap{
|
||||
items: make(map[uint64]*types.Transaction),
|
||||
index: new(nonceHeap),
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves the current transactions associated with the given nonce.
|
||||
func (m *sortedMap) Get(nonce uint64) *types.Transaction {
|
||||
func (m *SortedMap) Get(nonce uint64) *types.Transaction {
|
||||
return m.items[nonce]
|
||||
}
|
||||
|
||||
// Put inserts a new transaction into the map, also updating the map's nonce
|
||||
// index. If a transaction already exists with the same nonce, it's overwritten.
|
||||
func (m *sortedMap) Put(tx *types.Transaction) {
|
||||
func (m *SortedMap) Put(tx *types.Transaction) {
|
||||
nonce := tx.Nonce()
|
||||
if m.items[nonce] == nil {
|
||||
heap.Push(m.index, nonce)
|
||||
|
|
@ -89,7 +89,7 @@ func (m *sortedMap) Put(tx *types.Transaction) {
|
|||
// Forward removes all transactions from the map with a nonce lower than the
|
||||
// provided threshold. Every removed transaction is returned for any post-removal
|
||||
// maintenance.
|
||||
func (m *sortedMap) Forward(threshold uint64) types.Transactions {
|
||||
func (m *SortedMap) Forward(threshold uint64) types.Transactions {
|
||||
var removed types.Transactions
|
||||
|
||||
// Pop off heap items until the threshold is reached
|
||||
|
|
@ -112,7 +112,7 @@ func (m *sortedMap) Forward(threshold uint64) types.Transactions {
|
|||
// Filter, as opposed to 'filter', re-initialises the heap after the operation is done.
|
||||
// If you want to do several consecutive filterings, it's therefore better to first
|
||||
// do a .filter(func1) followed by .Filter(func2) or reheap()
|
||||
func (m *sortedMap) Filter(filterFunc func(*types.Transaction) bool) types.Transactions {
|
||||
func (m *SortedMap) Filter(filterFunc func(*types.Transaction) bool) types.Transactions {
|
||||
removed := m.filter(filterFunc)
|
||||
// If transactions were removed, the heap and cache are ruined
|
||||
if len(removed) > 0 {
|
||||
|
|
@ -121,7 +121,7 @@ func (m *sortedMap) Filter(filterFunc func(*types.Transaction) bool) types.Trans
|
|||
return removed
|
||||
}
|
||||
|
||||
func (m *sortedMap) reheap() {
|
||||
func (m *SortedMap) reheap() {
|
||||
*m.index = make([]uint64, 0, len(m.items))
|
||||
for nonce := range m.items {
|
||||
*m.index = append(*m.index, nonce)
|
||||
|
|
@ -134,7 +134,7 @@ func (m *sortedMap) reheap() {
|
|||
|
||||
// filter is identical to Filter, but **does not** regenerate the heap. This method
|
||||
// should only be used if followed immediately by a call to Filter or reheap()
|
||||
func (m *sortedMap) filter(filterFunc func(*types.Transaction) bool) types.Transactions {
|
||||
func (m *SortedMap) filter(filterFunc func(*types.Transaction) bool) types.Transactions {
|
||||
var removed types.Transactions
|
||||
|
||||
// Collect all the transactions to filter out
|
||||
|
|
@ -154,7 +154,7 @@ func (m *sortedMap) filter(filterFunc func(*types.Transaction) bool) types.Trans
|
|||
|
||||
// Cap places a hard limit on the number of items, returning all transactions
|
||||
// exceeding that limit.
|
||||
func (m *sortedMap) Cap(threshold int) types.Transactions {
|
||||
func (m *SortedMap) Cap(threshold int) types.Transactions {
|
||||
// Short circuit if the number of items is under the limit
|
||||
if len(m.items) <= threshold {
|
||||
return nil
|
||||
|
|
@ -182,7 +182,7 @@ func (m *sortedMap) Cap(threshold int) types.Transactions {
|
|||
|
||||
// Remove deletes a transaction from the maintained map, returning whether the
|
||||
// transaction was found.
|
||||
func (m *sortedMap) Remove(nonce uint64) bool {
|
||||
func (m *SortedMap) Remove(nonce uint64) bool {
|
||||
// Short circuit if no transaction is present
|
||||
_, ok := m.items[nonce]
|
||||
if !ok {
|
||||
|
|
@ -210,7 +210,7 @@ func (m *sortedMap) Remove(nonce uint64) bool {
|
|||
// Note, all transactions with nonces lower than start will also be returned to
|
||||
// prevent getting into an invalid state. This is not something that should ever
|
||||
// happen but better to be self correcting than failing!
|
||||
func (m *sortedMap) Ready(start uint64) types.Transactions {
|
||||
func (m *SortedMap) Ready(start uint64) types.Transactions {
|
||||
// Short circuit if no transactions are available
|
||||
if m.index.Len() == 0 || (*m.index)[0] > start {
|
||||
return nil
|
||||
|
|
@ -230,11 +230,11 @@ func (m *sortedMap) Ready(start uint64) types.Transactions {
|
|||
}
|
||||
|
||||
// Len returns the length of the transaction map.
|
||||
func (m *sortedMap) Len() int {
|
||||
func (m *SortedMap) Len() int {
|
||||
return len(m.items)
|
||||
}
|
||||
|
||||
func (m *sortedMap) flatten() types.Transactions {
|
||||
func (m *SortedMap) flatten() types.Transactions {
|
||||
m.cacheMu.Lock()
|
||||
defer m.cacheMu.Unlock()
|
||||
// If the sorting was not cached yet, create and cache it
|
||||
|
|
@ -251,7 +251,7 @@ func (m *sortedMap) flatten() types.Transactions {
|
|||
// Flatten creates a nonce-sorted slice of transactions based on the loosely
|
||||
// sorted internal representation. The result of the sorting is cached in case
|
||||
// it's requested again before any modifications are made to the contents.
|
||||
func (m *sortedMap) Flatten() types.Transactions {
|
||||
func (m *SortedMap) Flatten() types.Transactions {
|
||||
cache := m.flatten()
|
||||
// Copy the cache to prevent accidental modification
|
||||
txs := make(types.Transactions, len(cache))
|
||||
|
|
@ -261,7 +261,7 @@ func (m *sortedMap) Flatten() types.Transactions {
|
|||
|
||||
// LastElement returns the last element of a flattened list, thus, the
|
||||
// transaction with the highest nonce
|
||||
func (m *sortedMap) LastElement() *types.Transaction {
|
||||
func (m *SortedMap) LastElement() *types.Transaction {
|
||||
cache := m.flatten()
|
||||
return cache[len(cache)-1]
|
||||
}
|
||||
|
|
@ -272,7 +272,7 @@ func (m *sortedMap) LastElement() *types.Transaction {
|
|||
// executable/future queue, with minor behavioral changes.
|
||||
type list struct {
|
||||
strict bool // Whether nonces are strictly continuous or not
|
||||
txs *sortedMap // Heap indexed sorted hash map of the transactions
|
||||
txs *SortedMap // Heap indexed sorted hash map of the transactions
|
||||
|
||||
costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance)
|
||||
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
|
||||
|
|
@ -284,7 +284,7 @@ type list struct {
|
|||
func newList(strict bool) *list {
|
||||
return &list{
|
||||
strict: strict,
|
||||
txs: newSortedMap(),
|
||||
txs: NewSortedMap(),
|
||||
costcap: new(big.Int),
|
||||
totalcost: new(uint256.Int),
|
||||
}
|
||||
|
|
@ -573,10 +573,7 @@ func newPricedList(all *lookup) *pricedList {
|
|||
}
|
||||
|
||||
// Put inserts a new transaction into the heap.
|
||||
func (l *pricedList) Put(tx *types.Transaction, local bool) {
|
||||
if local {
|
||||
return
|
||||
}
|
||||
func (l *pricedList) Put(tx *types.Transaction) {
|
||||
// Insert every new transaction to the urgent heap first; Discard will balance the heaps
|
||||
heap.Push(&l.urgent, tx)
|
||||
}
|
||||
|
|
@ -610,7 +607,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
|
|||
// Discard stale price points if found at the heap start
|
||||
for len(h.list) > 0 {
|
||||
head := h.list[0]
|
||||
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
|
||||
if l.all.Get(head.Hash()) == nil { // Removed or migrated
|
||||
l.stales.Add(-1)
|
||||
heap.Pop(h)
|
||||
continue
|
||||
|
|
@ -629,15 +626,13 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
|
|||
// Discard finds a number of most underpriced transactions, removes them from the
|
||||
// priced list and returns them for further removal from the entire pool.
|
||||
// If noPending is set to true, we will only consider the floating list
|
||||
//
|
||||
// Note local transaction won't be considered for eviction.
|
||||
func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
|
||||
func (l *pricedList) Discard(slots int) (types.Transactions, bool) {
|
||||
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
|
||||
for slots > 0 {
|
||||
if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio {
|
||||
// Discard stale transactions if found during cleanup
|
||||
tx := heap.Pop(&l.urgent).(*types.Transaction)
|
||||
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
|
||||
if l.all.Get(tx.Hash()) == nil { // Removed or migrated
|
||||
l.stales.Add(-1)
|
||||
continue
|
||||
}
|
||||
|
|
@ -650,7 +645,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
|
|||
}
|
||||
// Discard stale transactions if found during cleanup
|
||||
tx := heap.Pop(&l.floating).(*types.Transaction)
|
||||
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
|
||||
if l.all.Get(tx.Hash()) == nil { // Removed or migrated
|
||||
l.stales.Add(-1)
|
||||
continue
|
||||
}
|
||||
|
|
@ -660,7 +655,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
|
|||
}
|
||||
}
|
||||
// If we still can't make enough room for the new transaction
|
||||
if slots > 0 && !force {
|
||||
if slots > 0 {
|
||||
for _, tx := range drop {
|
||||
heap.Push(&l.urgent, tx)
|
||||
}
|
||||
|
|
@ -675,11 +670,11 @@ func (l *pricedList) Reheap() {
|
|||
defer l.reheapMu.Unlock()
|
||||
start := time.Now()
|
||||
l.stales.Store(0)
|
||||
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
|
||||
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
|
||||
l.urgent.list = make([]*types.Transaction, 0, l.all.Count())
|
||||
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
|
||||
l.urgent.list = append(l.urgent.list, tx)
|
||||
return true
|
||||
}, false, true) // Only iterate remotes
|
||||
})
|
||||
heap.Init(&l.urgent)
|
||||
|
||||
// balance out the two heaps by moving the worse half of transactions into the
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@
|
|||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package legacypool
|
||||
package locals
|
||||
|
||||
import (
|
||||
"errors"
|
||||
212
core/txpool/locals/tx_tracker.go
Normal file
212
core/txpool/locals/tx_tracker.go
Normal file
|
|
@ -0,0 +1,212 @@
|
|||
// Copyright 2023 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// Package locals implements tracking for "local" transactions
|
||||
package locals
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/txpool"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/XinFinOrg/XDPoSChain/params"
|
||||
)
|
||||
|
||||
var (
|
||||
recheckInterval = time.Minute
|
||||
localGauge = metrics.GetOrRegisterGauge("txpool/local", nil)
|
||||
)
|
||||
|
||||
// TxTracker is a struct used to track priority transactions; it will check from
|
||||
// time to time if the main pool has forgotten about any of the transaction
|
||||
// it is tracking, and if so, submit it again.
|
||||
// This is used to track 'locals'.
|
||||
// This struct does not care about transaction validity, price-bumps or account limits,
|
||||
// but optimistically accepts transactions.
|
||||
type TxTracker struct {
|
||||
all map[common.Hash]*types.Transaction // All tracked transactions
|
||||
byAddr map[common.Address]*legacypool.SortedMap // Transactions by address
|
||||
|
||||
journal *journal // Journal of local transaction to back up to disk
|
||||
rejournal time.Duration // How often to rotate journal
|
||||
pool *txpool.TxPool // The tx pool to interact with
|
||||
signer types.Signer
|
||||
|
||||
shutdownCh chan struct{}
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// New creates a new TxTracker
|
||||
func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker {
|
||||
pool := &TxTracker{
|
||||
all: make(map[common.Hash]*types.Transaction),
|
||||
byAddr: make(map[common.Address]*legacypool.SortedMap),
|
||||
signer: types.LatestSigner(chainConfig),
|
||||
shutdownCh: make(chan struct{}),
|
||||
pool: next,
|
||||
}
|
||||
if journalPath != "" {
|
||||
pool.journal = newTxJournal(journalPath)
|
||||
pool.rejournal = journalTime
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
// Track adds a transaction to the tracked set.
|
||||
func (tracker *TxTracker) Track(tx *types.Transaction) {
|
||||
tracker.TrackAll([]*types.Transaction{tx})
|
||||
}
|
||||
|
||||
// TrackAll adds a list of transactions to the tracked set.
|
||||
func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
|
||||
tracker.mu.Lock()
|
||||
defer tracker.mu.Unlock()
|
||||
|
||||
for _, tx := range txs {
|
||||
// If we're already tracking it, it's a no-op
|
||||
if _, ok := tracker.all[tx.Hash()]; ok {
|
||||
continue
|
||||
}
|
||||
// Theoretically, checking the error here is unnecessary since sender recovery
|
||||
// is already part of basic validation. However, retrieving the sender address
|
||||
// from the transaction cache is effectively a no-op if it was previously verified.
|
||||
// Therefore, the error is still checked just in case.
|
||||
addr, err := types.Sender(tracker.signer, tx)
|
||||
if err != nil { // Ignore this tx
|
||||
continue
|
||||
}
|
||||
tracker.all[tx.Hash()] = tx
|
||||
if tracker.byAddr[addr] == nil {
|
||||
tracker.byAddr[addr] = legacypool.NewSortedMap()
|
||||
}
|
||||
tracker.byAddr[addr].Put(tx)
|
||||
|
||||
if tracker.journal != nil {
|
||||
_ = tracker.journal.insert(tx)
|
||||
}
|
||||
}
|
||||
localGauge.Update(int64(len(tracker.all)))
|
||||
}
|
||||
|
||||
// recheck checks and returns any transactions that needs to be resubmitted.
|
||||
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
|
||||
tracker.mu.Lock()
|
||||
defer tracker.mu.Unlock()
|
||||
|
||||
var (
|
||||
numStales = 0
|
||||
numOk = 0
|
||||
)
|
||||
for sender, txs := range tracker.byAddr {
|
||||
// Wipe the stales
|
||||
stales := txs.Forward(tracker.pool.Nonce(sender))
|
||||
for _, tx := range stales {
|
||||
delete(tracker.all, tx.Hash())
|
||||
}
|
||||
numStales += len(stales)
|
||||
|
||||
// Check the non-stale
|
||||
for _, tx := range txs.Flatten() {
|
||||
if tracker.pool.Has(tx.Hash()) {
|
||||
numOk++
|
||||
continue
|
||||
}
|
||||
resubmits = append(resubmits, tx)
|
||||
}
|
||||
}
|
||||
|
||||
if journalCheck { // rejournal
|
||||
rejournal = make(map[common.Address]types.Transactions)
|
||||
for _, tx := range tracker.all {
|
||||
addr, _ := types.Sender(tracker.signer, tx)
|
||||
rejournal[addr] = append(rejournal[addr], tx)
|
||||
}
|
||||
// Sort them
|
||||
for _, list := range rejournal {
|
||||
// cmp(a, b) should return a negative number when a < b,
|
||||
slices.SortFunc(list, func(a, b *types.Transaction) int {
|
||||
return cmp.Compare(a.Nonce(), b.Nonce())
|
||||
})
|
||||
}
|
||||
}
|
||||
localGauge.Update(int64(len(tracker.all)))
|
||||
log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk)
|
||||
return resubmits, rejournal
|
||||
}
|
||||
|
||||
// Start implements node.Lifecycle interface
|
||||
// Start is called after all services have been constructed and the networking
|
||||
// layer was also initialized to spawn any goroutines required by the service.
|
||||
func (tracker *TxTracker) Start() error {
|
||||
tracker.wg.Add(1)
|
||||
go tracker.loop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop implements node.Lifecycle interface
|
||||
// Stop terminates all goroutines belonging to the service, blocking until they
|
||||
// are all terminated.
|
||||
func (tracker *TxTracker) Stop() error {
|
||||
close(tracker.shutdownCh)
|
||||
tracker.wg.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tracker *TxTracker) loop() {
|
||||
defer tracker.wg.Done()
|
||||
|
||||
if tracker.journal != nil {
|
||||
tracker.journal.load(func(transactions []*types.Transaction) []error {
|
||||
tracker.TrackAll(transactions)
|
||||
return nil
|
||||
})
|
||||
defer tracker.journal.close()
|
||||
}
|
||||
var (
|
||||
lastJournal = time.Now()
|
||||
timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom.
|
||||
)
|
||||
for {
|
||||
select {
|
||||
case <-tracker.shutdownCh:
|
||||
return
|
||||
case <-timer.C:
|
||||
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
|
||||
resubmits, rejournal := tracker.recheck(checkJournal)
|
||||
if len(resubmits) > 0 {
|
||||
tracker.pool.Add(resubmits, false)
|
||||
}
|
||||
if checkJournal {
|
||||
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
|
||||
tracker.mu.Lock()
|
||||
lastJournal = time.Now()
|
||||
if err := tracker.journal.rotate(rejournal); err != nil {
|
||||
log.Warn("Transaction journal rotation failed", "err", err)
|
||||
}
|
||||
tracker.mu.Unlock()
|
||||
}
|
||||
timer.Reset(recheckInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -117,7 +117,7 @@ type SubPool interface {
|
|||
// Add enqueues a batch of transactions into the pool if they are valid. Due
|
||||
// to the large transaction churn, add may postpone fully integrating the tx
|
||||
// to a later point to batch multiple ones together.
|
||||
Add(txs []*types.Transaction, local bool, sync bool) []error
|
||||
Add(txs []*types.Transaction, sync bool) []error
|
||||
|
||||
// Pending retrieves all currently processable transactions, grouped by origin
|
||||
// account and sorted by nonce.
|
||||
|
|
@ -147,9 +147,6 @@ type SubPool interface {
|
|||
// pending as well as queued transactions of this address, grouped by nonce.
|
||||
ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
|
||||
|
||||
// Locals retrieves the accounts currently considered local by the pool.
|
||||
Locals() []common.Address
|
||||
|
||||
// Status returns the known status (unknown/pending/queued) of a transaction
|
||||
// identified by their hashes.
|
||||
Status(hash common.Hash) TxStatus
|
||||
|
|
|
|||
|
|
@ -56,6 +56,8 @@ type BlockChain interface {
|
|||
type TxPool struct {
|
||||
subpools []SubPool // List of subpools for specialized transaction handling
|
||||
|
||||
localTracker LocalTracker // Optional tracker for local tx submissions
|
||||
|
||||
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
|
||||
quit chan chan error // Quit channel to tear down the head updater
|
||||
term chan struct{} // Termination channel to detect a closed pool
|
||||
|
|
@ -63,6 +65,13 @@ type TxPool struct {
|
|||
sync chan chan error // Testing / simulator channel to block until internal reset is done
|
||||
}
|
||||
|
||||
// LocalTracker is the minimal local transaction tracking functionality used by
|
||||
// TxPool local submission helpers.
|
||||
type LocalTracker interface {
|
||||
Track(tx *types.Transaction)
|
||||
TrackAll(txs []*types.Transaction)
|
||||
}
|
||||
|
||||
// New creates a new transaction pool to gather, sort and filter inbound
|
||||
// transactions from the network.
|
||||
func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
|
||||
|
|
@ -268,7 +277,7 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction {
|
|||
// Add enqueues a batch of transactions into the pool if they are valid. Due
|
||||
// to the large transaction churn, add may postpone fully integrating the tx
|
||||
// to a later point to batch multiple ones together.
|
||||
func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
|
||||
func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
|
||||
// Split the input transactions between the subpools. It shouldn't really
|
||||
// happen that we receive merged batches, but better graceful than strange
|
||||
// errors.
|
||||
|
|
@ -295,7 +304,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
|
|||
// back the errors into the original sort order.
|
||||
errsets := make([][]error, len(p.subpools))
|
||||
for i := 0; i < len(p.subpools); i++ {
|
||||
errsets[i] = p.subpools[i].Add(txsets[i], local, sync)
|
||||
errsets[i] = p.subpools[i].Add(txsets[i], sync)
|
||||
}
|
||||
errs := make([]error, len(txs))
|
||||
for i, split := range splits {
|
||||
|
|
@ -311,6 +320,26 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
|
|||
return errs
|
||||
}
|
||||
|
||||
// SetLocalTracker configures an optional tracker that will receive all local
|
||||
// transaction submissions.
|
||||
func (p *TxPool) SetLocalTracker(tracker LocalTracker) {
|
||||
p.localTracker = tracker
|
||||
}
|
||||
|
||||
// AddLocals enqueues a batch of local transactions into the pool if they are
|
||||
// valid and tracks them for re-journal/re-submit flows.
|
||||
func (p *TxPool) AddLocals(txs []*types.Transaction, sync bool) []error {
|
||||
if p.localTracker != nil {
|
||||
p.localTracker.TrackAll(txs)
|
||||
}
|
||||
return p.Add(txs, sync)
|
||||
}
|
||||
|
||||
// AddLocal enqueues a single local transaction into the pool if it is valid.
|
||||
func (p *TxPool) AddLocal(tx *types.Transaction, sync bool) error {
|
||||
return p.AddLocals([]*types.Transaction{tx}, sync)[0]
|
||||
}
|
||||
|
||||
// Pending retrieves all currently processable transactions, grouped by origin
|
||||
// account and sorted by nonce.
|
||||
//
|
||||
|
|
@ -394,23 +423,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type
|
|||
return []*types.Transaction{}, []*types.Transaction{}
|
||||
}
|
||||
|
||||
// Locals retrieves the accounts currently considered local by the pool.
|
||||
func (p *TxPool) Locals() []common.Address {
|
||||
// Retrieve the locals from each subpool and deduplicate them
|
||||
locals := make(map[common.Address]struct{})
|
||||
for _, subpool := range p.subpools {
|
||||
for _, local := range subpool.Locals() {
|
||||
locals[local] = struct{}{}
|
||||
}
|
||||
}
|
||||
// Flatten and return the deduplicated local set
|
||||
flat := make([]common.Address, 0, len(locals))
|
||||
for local := range locals {
|
||||
flat = append(flat, local)
|
||||
}
|
||||
return flat
|
||||
}
|
||||
|
||||
// Status returns the known status (unknown/pending/queued) of a transaction
|
||||
// identified by its hashes.
|
||||
func (p *TxPool) Status(hash common.Hash) TxStatus {
|
||||
|
|
|
|||
183
core/txpool/txpool_local_test.go
Normal file
183
core/txpool/txpool_local_test.go
Normal file
|
|
@ -0,0 +1,183 @@
|
|||
package txpool
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/core"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/event"
|
||||
)
|
||||
|
||||
type testChain struct{}
|
||||
|
||||
func (testChain) CurrentBlock() *types.Header { return &types.Header{Number: big.NewInt(0)} }
|
||||
|
||||
func (testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
|
||||
return event.NewSubscription(func(quit <-chan struct{}) error {
|
||||
<-quit
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
type testLocalTracker struct {
|
||||
mu sync.Mutex
|
||||
events *[]string
|
||||
tracked []common.Hash
|
||||
}
|
||||
|
||||
func (t *testLocalTracker) Track(tx *types.Transaction) {
|
||||
t.TrackAll([]*types.Transaction{tx})
|
||||
}
|
||||
|
||||
func (t *testLocalTracker) TrackAll(txs []*types.Transaction) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
*t.events = append(*t.events, "track")
|
||||
for _, tx := range txs {
|
||||
t.tracked = append(t.tracked, tx.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
type testSubPool struct {
|
||||
events *[]string
|
||||
|
||||
lastAdd []*types.Transaction
|
||||
lastSync bool
|
||||
}
|
||||
|
||||
func (s *testSubPool) Filter(tx *types.Transaction) bool { return true }
|
||||
|
||||
func (s *testSubPool) Init(gasTip uint64, head *types.Header, reserver *Reserver) error { return nil }
|
||||
|
||||
func (s *testSubPool) Close() error { return nil }
|
||||
|
||||
func (s *testSubPool) Reset(oldHead, newHead *types.Header) {}
|
||||
|
||||
func (s *testSubPool) SetGasTip(tip *big.Int) error { return nil }
|
||||
|
||||
func (s *testSubPool) Has(hash common.Hash) bool { return false }
|
||||
|
||||
func (s *testSubPool) Get(hash common.Hash) *types.Transaction { return nil }
|
||||
|
||||
func (s *testSubPool) Add(txs []*types.Transaction, sync bool) []error {
|
||||
*s.events = append(*s.events, "add")
|
||||
s.lastAdd = txs
|
||||
s.lastSync = sync
|
||||
return make([]error, len(txs))
|
||||
}
|
||||
|
||||
func (s *testSubPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
|
||||
return map[common.Address][]*LazyTransaction{}
|
||||
}
|
||||
|
||||
func (s *testSubPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
|
||||
return event.NewSubscription(func(quit <-chan struct{}) error {
|
||||
<-quit
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *testSubPool) Nonce(addr common.Address) uint64 { return 0 }
|
||||
|
||||
func (s *testSubPool) Stats() (int, int) { return 0, 0 }
|
||||
|
||||
func (s *testSubPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
|
||||
return map[common.Address][]*types.Transaction{}, map[common.Address][]*types.Transaction{}
|
||||
}
|
||||
|
||||
func (s *testSubPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *testSubPool) Status(hash common.Hash) TxStatus { return TxStatusUnknown }
|
||||
|
||||
func (s *testSubPool) SetSigner(f func(address common.Address) bool) {}
|
||||
|
||||
func (s *testSubPool) IsSigner(addr common.Address) bool { return false }
|
||||
|
||||
func TestAddLocalTracksBeforeAdd(t *testing.T) {
|
||||
events := []string{}
|
||||
tracker := &testLocalTracker{events: &events}
|
||||
subpool := &testSubPool{events: &events}
|
||||
|
||||
pool, err := New(0, testChain{}, []SubPool{subpool})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create txpool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
pool.SetLocalTracker(tracker)
|
||||
|
||||
tx := types.NewTransaction(0, common.Address{0x1}, big.NewInt(1), 21000, big.NewInt(1), nil)
|
||||
if err := pool.AddLocal(tx, true); err != nil {
|
||||
t.Fatalf("AddLocal failed: %v", err)
|
||||
}
|
||||
|
||||
if len(tracker.tracked) != 1 || tracker.tracked[0] != tx.Hash() {
|
||||
t.Fatalf("tracker did not receive local tx hash")
|
||||
}
|
||||
if len(subpool.lastAdd) != 1 || subpool.lastAdd[0].Hash() != tx.Hash() {
|
||||
t.Fatalf("subpool Add did not receive local tx")
|
||||
}
|
||||
if !subpool.lastSync {
|
||||
t.Fatalf("sync flag not propagated to subpool Add")
|
||||
}
|
||||
if !reflect.DeepEqual(events, []string{"track", "add"}) {
|
||||
t.Fatalf("unexpected call order: have %v", events)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddLocalsTracksBeforeAdd(t *testing.T) {
|
||||
events := []string{}
|
||||
tracker := &testLocalTracker{events: &events}
|
||||
subpool := &testSubPool{events: &events}
|
||||
|
||||
pool, err := New(0, testChain{}, []SubPool{subpool})
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create txpool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
pool.SetLocalTracker(tracker)
|
||||
|
||||
tx0 := types.NewTransaction(0, common.Address{0x1}, big.NewInt(1), 21000, big.NewInt(1), nil)
|
||||
tx1 := types.NewTransaction(1, common.Address{0x1}, big.NewInt(1), 21000, big.NewInt(1), nil)
|
||||
txs := []*types.Transaction{tx0, tx1}
|
||||
|
||||
errs := pool.AddLocals(txs, true)
|
||||
if len(errs) != len(txs) {
|
||||
t.Fatalf("unexpected error result length: have %d, want %d", len(errs), len(txs))
|
||||
}
|
||||
for i, err := range errs {
|
||||
if err != nil {
|
||||
t.Fatalf("AddLocals error at index %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
hashes := []common.Hash{tx0.Hash(), tx1.Hash()}
|
||||
if len(tracker.tracked) != len(hashes) {
|
||||
t.Fatalf("tracker tx count mismatch: have %d, want %d", len(tracker.tracked), len(hashes))
|
||||
}
|
||||
if !reflect.DeepEqual(tracker.tracked, hashes) {
|
||||
t.Fatalf("tracker hashes mismatch: have %v, want %v", tracker.tracked, hashes)
|
||||
}
|
||||
|
||||
if len(subpool.lastAdd) != len(hashes) {
|
||||
t.Fatalf("subpool Add tx count mismatch: have %d, want %d", len(subpool.lastAdd), len(hashes))
|
||||
}
|
||||
for i, tx := range subpool.lastAdd {
|
||||
if tx.Hash() != hashes[i] {
|
||||
t.Fatalf("subpool Add hash mismatch at index %d", i)
|
||||
}
|
||||
}
|
||||
if !subpool.lastSync {
|
||||
t.Fatalf("sync flag not propagated to subpool Add")
|
||||
}
|
||||
if !reflect.DeepEqual(events, []string{"track", "add"}) {
|
||||
t.Fatalf("unexpected call order: have %v", events)
|
||||
}
|
||||
}
|
||||
|
|
@ -301,7 +301,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
|
|||
}
|
||||
|
||||
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
|
||||
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]
|
||||
if locals := b.eth.localTxTracker; locals != nil {
|
||||
locals.Track(signedTx)
|
||||
}
|
||||
return b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0]
|
||||
}
|
||||
|
||||
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/XDCx"
|
||||
"github.com/XinFinOrg/XDPoSChain/XDCxlending"
|
||||
|
|
@ -41,6 +42,7 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/txpool"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/txpool/locals"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/types"
|
||||
"github.com/XinFinOrg/XDPoSChain/core/vm"
|
||||
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
|
||||
|
|
@ -65,17 +67,17 @@ import (
|
|||
|
||||
// Ethereum implements the Ethereum full node service.
|
||||
type Ethereum struct {
|
||||
config *ethconfig.Config
|
||||
// core protocol objects
|
||||
config *ethconfig.Config
|
||||
txPool *txpool.TxPool
|
||||
localTxTracker *locals.TxTracker
|
||||
blockchain *core.BlockChain
|
||||
|
||||
// Channel for shutting down the service
|
||||
shutdownChan chan bool // Channel for shutting down the ethereum
|
||||
|
||||
// Handlers
|
||||
txPool *txpool.TxPool
|
||||
|
||||
orderPool *legacypool.OrderPool
|
||||
lendingPool *legacypool.LendingPool
|
||||
blockchain *core.BlockChain
|
||||
protocolManager *ProtocolManager
|
||||
|
||||
// DB interfaces
|
||||
|
|
@ -267,14 +269,24 @@ func New(stack *node.Node, config *ethconfig.Config, XDCXServ *XDCx.XDCX, lendin
|
|||
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
|
||||
}
|
||||
legacyPool := legacypool.New(config.TxPool, eth.blockchain)
|
||||
eth.orderPool = legacypool.NewOrderPool(eth.blockchain.Config(), eth.blockchain)
|
||||
eth.lendingPool = legacypool.NewLendingPool(eth.blockchain.Config(), eth.blockchain)
|
||||
|
||||
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eth.orderPool = legacypool.NewOrderPool(eth.blockchain.Config(), eth.blockchain)
|
||||
eth.lendingPool = legacypool.NewLendingPool(eth.blockchain.Config(), eth.blockchain)
|
||||
if !config.TxPool.NoLocals {
|
||||
rejournal := config.TxPool.Rejournal
|
||||
if rejournal < time.Second {
|
||||
log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
|
||||
rejournal = time.Second
|
||||
}
|
||||
eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
|
||||
eth.txPool.SetLocalTracker(eth.localTxTracker)
|
||||
stack.RegisterLifecycle(eth.localTxTracker)
|
||||
}
|
||||
|
||||
if eth.protocolManager, err = NewProtocolManagerEx(eth.blockchain.Config(), config.SyncMode, networkID, eth.eventMux, eth.txPool, eth.orderPool, eth.lendingPool, eth.engine, eth.blockchain, chainDb); err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -780,7 +780,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|||
pm.knownTxs.Add(tx.Hash(), struct{}{})
|
||||
}
|
||||
}
|
||||
pm.txpool.Add(txs, false, false)
|
||||
pm.txpool.Add(txs, false)
|
||||
|
||||
case msg.Code == OrderTxMsg:
|
||||
// Transactions arrived, make sure we have a valid and fresh chain to handle them
|
||||
|
|
|
|||
|
|
@ -132,7 +132,7 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
|
|||
|
||||
// Add appends a batch of transactions to the pool, and notifies any
|
||||
// listeners if the addition channel is non nil
|
||||
func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
|
||||
func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ var errorToString = map[int]string{
|
|||
|
||||
type txPool interface {
|
||||
// Add should add the given transactions to the pool.
|
||||
Add(txs []*types.Transaction, local bool, sync bool) []error
|
||||
Add(txs []*types.Transaction, sync bool) []error
|
||||
|
||||
// Pending should return pending transactions.
|
||||
// The slice should be modifiable by the caller.
|
||||
|
|
|
|||
|
|
@ -136,7 +136,7 @@ func testSendTransactions(t *testing.T, protocol int) {
|
|||
tx := newTestTransaction(testAccount, uint64(nonce), txsize)
|
||||
alltxs[nonce] = tx
|
||||
}
|
||||
pm.txpool.Add(alltxs, false, false)
|
||||
pm.txpool.Add(alltxs, false)
|
||||
|
||||
// Connect several peers. They should all receive the pending transactions.
|
||||
var wg sync.WaitGroup
|
||||
|
|
|
|||
Loading…
Reference in a new issue