perf(core/txpool): pre-filter dynamic fees during pending tx retrieval #29005 (#2137)

Introduce dynamic-fee pre-filtering in txpool pending retrieval to reduce
allocations and downstream processing work during mining and tx propagation.

What changed:
- Change the `Pending` API from `Pending(enforceTips bool)` to
  `Pending(minTip *uint256.Int, baseFee *uint256.Int)` in txpool interfaces.
- Implement pre-filtering in `legacypool.Pending` by comparing effective tip
  against the provided `minTip/baseFee` for non-local, non-special txs.
- Update call sites to the new API:
  - miner work assembly (`miner/worker.go`)
  - tx sync (`eth/sync.go`)
  - pool transaction retrieval (`eth/api_backend.go`)
  - protocol/test interfaces (`eth/protocol.go`, `eth/helper_test.go`).

Tests:
- Add targeted coverage for pending filtering semantics in
  `core/txpool/legacypool/legacypool_test.go`, including:
  - minTip threshold boundary behavior
  - baseFee-aware effective tip filtering
  - local/special transaction exemption behavior
  - dynamic-fee boundary behavior when baseFee is nil.

Impact:
- Preserves existing behavior while making pending selection cheaper for
  downstream consumers.
- Improves confidence in edge-case behavior through dedicated tests.
This commit is contained in:
Daniel Liu 2026-03-11 11:12:09 +08:00 committed by GitHub
parent 5bc363275c
commit b5eec529d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 221 additions and 16 deletions

View file

@ -577,24 +577,34 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
}
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
// account and sorted by nonce.
//
// The enforceTips parameter can be used to do an extra filtering on the pending
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(minTip *uint256.Int, baseFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()
// Convert the new uint256.Int types to the old big.Int ones used by the legacy pool
var (
minTipBig *big.Int
baseFeeBig *big.Int
)
if minTip != nil {
minTipBig = minTip.ToBig()
}
if baseFee != nil {
baseFeeBig = baseFee.ToBig()
}
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
// If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) {
if minTipBig != nil && !pool.locals.contains(addr) {
for i, tx := range txs {
if !tx.IsSpecialTransaction() && tx.EffectiveGasTipIntCmp(pool.gasTip.Load().ToBig(), pool.priced.urgent.baseFee) < 0 {
if !tx.IsSpecialTransaction() && tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
break
}

View file

@ -3066,6 +3066,185 @@ func BenchmarkMultiAccountBatchInsert(b *testing.B) {
}
}
func TestPendingMinTipThreshold(t *testing.T) {
t.Parallel()
pool, key := setupPool()
defer pool.Close()
addr := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, addr, big.NewInt(1_000_000_000_000_000))
threshold := new(big.Int).Add(new(big.Int).Set(common.MinGasPrice), big.NewInt(100))
aboveThreshold := new(big.Int).Set(threshold)
belowThreshold := new(big.Int).Sub(new(big.Int).Set(threshold), big.NewInt(1))
if err := pool.addRemoteSync(pricedTransaction(0, 100000, aboveThreshold, key)); err != nil {
t.Fatalf("failed to add tx nonce 0: %v", err)
}
if err := pool.addRemoteSync(pricedTransaction(1, 100000, belowThreshold, key)); err != nil {
t.Fatalf("failed to add tx nonce 1: %v", err)
}
filtered := pool.Pending(uint256.MustFromBig(threshold), nil)
txs := filtered[addr]
if len(txs) != 1 {
t.Fatalf("unexpected filtered tx count: have %d, want %d", len(txs), 1)
}
resolved := txs[0].Resolve()
if resolved == nil || resolved.Nonce() != 0 {
have := uint64(math.MaxUint64)
if resolved != nil {
have = resolved.Nonce()
}
t.Fatalf("unexpected surviving tx after tip filter: got nonce %d", have)
}
all := pool.Pending(nil, nil)
if len(all[addr]) != 2 {
t.Fatalf("unexpected unfiltered tx count: have %d, want %d", len(all[addr]), 2)
}
}
func TestPendingMinTipWithBaseFee(t *testing.T) {
t.Parallel()
pool, key := setupPoolWithConfig(eip1559Config)
defer pool.Close()
addr := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, addr, big.NewInt(1_000_000_000_000_000))
minGasTip := new(big.Int).Set(common.MinGasPrice)
tipPass := new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(80))
tipPassWithoutBaseFeeOnly := new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(60))
if err := pool.addRemoteSync(dynamicFeeTx(0, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(300)), tipPass, key)); err != nil {
t.Fatalf("failed to add tx nonce 0: %v", err)
}
if err := pool.addRemoteSync(dynamicFeeTx(1, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(240)), tipPassWithoutBaseFeeOnly, key)); err != nil {
t.Fatalf("failed to add tx nonce 1: %v", err)
}
if err := pool.addRemoteSync(dynamicFeeTx(2, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(240)), tipPassWithoutBaseFeeOnly, key)); err != nil {
t.Fatalf("failed to add tx nonce 2: %v", err)
}
minTipBig := new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(45))
baseFeeBig := big.NewInt(200)
minTip := uint256.MustFromBig(minTipBig)
baseFee := uint256.MustFromBig(baseFeeBig)
withBaseFee := pool.Pending(minTip, baseFee)
txs := withBaseFee[addr]
if len(txs) != 1 {
t.Fatalf("unexpected tx count with base fee filter: have %d, want %d", len(txs), 1)
}
resolved := txs[0].Resolve()
if resolved == nil || resolved.Nonce() != 0 {
have := uint64(math.MaxUint64)
if resolved != nil {
have = resolved.Nonce()
}
t.Fatalf("unexpected surviving tx with base fee filter: got nonce %d", have)
}
withoutBaseFee := pool.Pending(minTip, nil)
if len(withoutBaseFee[addr]) != 3 {
t.Fatalf("unexpected tx count without base fee filter: have %d, want %d", len(withoutBaseFee[addr]), 3)
}
}
func TestPendingKeepsLocalAndSpecialTransactions(t *testing.T) {
t.Parallel()
pool, _ := setupPool()
defer pool.Close()
minGasTip := new(big.Int).Set(common.MinGasPrice)
filterTip := new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(100))
specialKey, _ := crypto.GenerateKey()
specialAddr := crypto.PubkeyToAddress(specialKey.PublicKey)
testAddBalance(pool, specialAddr, big.NewInt(1_000_000_000_000_000))
specialTx, _ := types.SignTx(types.NewTransaction(0, common.BlockSignersBinary, big.NewInt(1), 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(1)), nil), types.HomesteadSigner{}, specialKey)
if err := pool.addRemoteSync(specialTx); err != nil {
t.Fatalf("failed to add special tx: %v", err)
}
normalTx := pricedTransaction(1, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(200)), specialKey)
if err := pool.addRemoteSync(normalTx); err != nil {
t.Fatalf("failed to add normal tx after special tx: %v", err)
}
localKey, _ := crypto.GenerateKey()
localAddr := crypto.PubkeyToAddress(localKey.PublicKey)
testAddBalance(pool, localAddr, big.NewInt(1_000_000_000_000_000))
if err := pool.addLocal(pricedTransaction(0, 100000, new(big.Int).Add(new(big.Int).Set(minGasTip), big.NewInt(1)), localKey)); err != nil {
t.Fatalf("failed to add local tx: %v", err)
}
filtered := pool.Pending(uint256.MustFromBig(filterTip), nil)
specialPending := filtered[specialAddr]
if len(specialPending) != 2 {
t.Fatalf("unexpected special account tx count: have %d, want %d", len(specialPending), 2)
}
first := specialPending[0].Resolve()
if first == nil || first.Nonce() != 0 || !first.IsSpecialTransaction() {
t.Fatalf("special tx should survive filtering at nonce 0")
}
second := specialPending[1].Resolve()
if second == nil || second.Nonce() != 1 {
t.Fatalf("unexpected tx order for special account")
}
localPending := filtered[localAddr]
if len(localPending) != 1 {
t.Fatalf("unexpected local account tx count: have %d, want %d", len(localPending), 1)
}
local := localPending[0].Resolve()
if local == nil || local.Nonce() != 0 {
t.Fatalf("unexpected local tx after filtering")
}
}
func TestPendingDynamicFeeThresholdWithoutBaseFee(t *testing.T) {
t.Parallel()
pool, key := setupPoolWithConfig(eip1559Config)
defer pool.Close()
addr := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, addr, big.NewInt(1_000_000_000_000_000))
minTipBig := new(big.Int).Add(new(big.Int).Set(common.MinGasPrice), big.NewInt(50))
equalTip := new(big.Int).Set(minTipBig)
belowTip := new(big.Int).Sub(new(big.Int).Set(minTipBig), big.NewInt(1))
if err := pool.addRemoteSync(dynamicFeeTx(0, 100000, new(big.Int).Add(new(big.Int).Set(equalTip), big.NewInt(100)), equalTip, key)); err != nil {
t.Fatalf("failed to add tx nonce 0: %v", err)
}
if err := pool.addRemoteSync(dynamicFeeTx(1, 100000, new(big.Int).Add(new(big.Int).Set(equalTip), big.NewInt(100)), belowTip, key)); err != nil {
t.Fatalf("failed to add tx nonce 1: %v", err)
}
filtered := pool.Pending(uint256.MustFromBig(minTipBig), nil)
txs := filtered[addr]
if len(txs) != 1 {
t.Fatalf("unexpected filtered tx count: have %d, want %d", len(txs), 1)
}
resolved := txs[0].Resolve()
if resolved == nil || resolved.Nonce() != 0 {
have := uint64(math.MaxUint64)
if resolved != nil {
have = resolved.Nonce()
}
t.Fatalf("unexpected surviving tx without base fee: got nonce %d", have)
}
all := pool.Pending(nil, nil)
if len(all[addr]) != 2 {
t.Fatalf("unexpected unfiltered tx count: have %d, want %d", len(all[addr]), 2)
}
}
// TestSetGasPrice tests the SetGasPrice validation logic using table-driven tests
func TestSetGasPrice(t *testing.T) {
testCases := []struct {

View file

@ -24,6 +24,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/holiman/uint256"
)
// LazyTransaction contains a small subset of the transaction properties that is
@ -109,7 +110,10 @@ type SubPool interface {
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(minTip *uint256.Int, baseFee *uint256.Int) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions

View file

@ -25,6 +25,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/holiman/uint256"
)
// TxStatus is the current status of a transaction as seen by the pool.
@ -250,10 +251,13 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
maps.Copy(txs, subpool.Pending(enforceTips))
maps.Copy(txs, subpool.Pending(minTip, baseFee))
}
return txs
}

View file

@ -305,7 +305,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false)
pending := b.eth.txPool.Pending(nil, nil)
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {

View file

@ -42,6 +42,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/holiman/uint256"
)
var (
@ -146,7 +147,7 @@ func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []erro
}
// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
func (p *testTxPool) Pending(minTip *uint256.Int, baseFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock()
defer p.lock.RUnlock()

View file

@ -27,6 +27,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/holiman/uint256"
)
// Constants to match up protocol versions and messages
@ -109,7 +110,7 @@ type txPool interface {
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
Pending(minTip *uint256.Int, baseFee *uint256.Int) map[common.Address][]*txpool.LazyTransaction
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions

View file

@ -45,7 +45,7 @@ type txsync struct {
// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer) {
var txs types.Transactions
pending := pm.txpool.Pending(false)
pending := pm.txpool.Pending(nil, nil)
for _, batch := range pending {
for _, lazy := range batch {
if tx := lazy.Resolve(); tx != nil {

View file

@ -46,6 +46,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/trie"
mapset "github.com/deckarep/golang-set/v2"
"github.com/holiman/uint256"
)
const (
@ -860,7 +861,12 @@ func (w *worker) commitNewWork() {
log.Error("[commitNewWork] fail to check if block is epoch switch block when fetching pending transactions", "BlockNum", header.Number, "Hash", header.Hash())
}
if !isEpochSwitchBlock {
pending := w.eth.TxPool().Pending(true)
// Retrieve the pending transactions pre-filtered by the 1559 dynamic fees
var baseFee *uint256.Int
if header.BaseFee != nil {
baseFee = uint256.MustFromBig(header.BaseFee)
}
pending := w.eth.TxPool().Pending(uint256.MustFromBig(w.tip), baseFee)
txs, specialTxs = newTransactionsByPriceAndNonce(w.current.signer, pending, feeCapacity, header.BaseFee)
}
}