core, eth, miner: 4844 blob transaction pool #26940 (#1911)

This commit is contained in:
Daniel Liu 2026-01-19 13:54:01 +08:00 committed by GitHub
parent 4a7eea65b9
commit 3e1f75eafb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 772 additions and 377 deletions

View file

@ -38,6 +38,10 @@ var (
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrAccountLimitExceeded is returned if a transaction would exceed the number
// allowed by a pool for a single account.
ErrAccountLimitExceeded = errors.New("account limit exceeded")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")

View file

@ -231,6 +231,7 @@ type LegacyPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
@ -307,7 +308,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header) error {
func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts
pool.reserve = reserve
// Set the basic pool parameters
pool.gasTip.Store(gasTip)
@ -390,7 +394,7 @@ func (pool *LegacyPool) loop() {
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
@ -468,7 +472,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) error {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(tip)
for _, tx := range drop {
pool.removeTx(tx.Hash(), false)
pool.removeTx(tx.Hash(), false, true)
}
pool.priced.Removed(len(drop))
}
@ -549,11 +553,11 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
// The enforceTips parameter can be used to do an extra filtering on the pending
// transactions and only return those whose **effective** tip is large enough in
// the next pending execution environment.
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
pool.mu.Lock()
defer pool.mu.Unlock()
pending := make(map[common.Address][]*types.Transaction, len(pool.pending))
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
@ -567,7 +571,18 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*types.Tr
}
}
if len(txs) > 0 {
pending[addr] = txs
lazies := make([]*txpool.LazyTransaction, len(txs))
for i := 0; i < len(txs); i++ {
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: &txpool.Transaction{Tx: txs[i]},
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
}
}
pending[addr] = lazies
}
}
return pending
@ -630,6 +645,16 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
State: pool.currentState,
FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
UsedAndLeftSlots: func(addr common.Address) (int, int) {
var have int
if list := pool.pending[addr]; list != nil {
have += list.Len()
}
if list := pool.queue[addr]; list != nil {
have += list.Len()
}
return have, math.MaxInt
},
ExistingExpenditure: func(addr common.Address) *big.Int {
if list := pool.pending[addr]; list != nil {
return list.totalcost
@ -703,13 +728,35 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
invalidTxMeter.Mark(1)
return false, err
}
// already validated by this point
from, _ := types.Sender(pool.signer, tx)
if tx.IsSpecialTransaction() && pool.IsSigner(from) && pool.pendingNonces.get(from) == tx.Nonce() {
return pool.promoteSpecialTx(from, tx, isLocal)
}
// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
var (
_, hasPending = pool.pending[from]
_, hasQueued = pool.queue[from]
)
if !hasPending && !hasQueued {
if err := pool.reserve(from, true); err != nil {
return false, err
}
defer func() {
// If the transaction is rejected by some post-validation check, remove
// the lock on the reservation set.
//
// Note, `err` here is the named error return, which will be initialized
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
pool.reserve(from, false)
}
}()
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
@ -763,7 +810,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
dropped := pool.removeTx(tx.Hash(), false)
sender, _ := types.Sender(pool.signer, tx)
dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc
pool.changesSinceReorg += dropped
}
}
@ -1126,8 +1176,14 @@ func (pool *LegacyPool) Has(hash common.Hash) bool {
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
//
// In unreserve is false, the account will not be relinquished to the main txpool
// even if there are no more references to it. This is used to handle a race when
// a tx being added, and it evicts a previously scheduled tx from the same account,
// which could lead to a premature release of the lock.
//
// Returns the number of transactions removed from the pending queue.
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int {
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
@ -1135,6 +1191,20 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int {
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// If after deletion there are no more transactions belonging to this account,
// relinquish the address reservation. It's a bit convoluted do this, via a
// defer, but it's safer vs. the many return pathways.
if unreserve {
defer func() {
var (
_, hasPending = pool.pending[addr]
_, hasQueued = pool.queue[addr]
)
if !hasPending && !hasQueued {
pool.reserve(addr, false)
}
}()
}
// Remove it from the list of known transactions
pool.all.Remove(hash)
if outofbound {
@ -1383,7 +1453,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// there's nothing to add
if newNum >= oldNum {
// If we reorged to a same or higher number, then it's not a case of setHead
log.Warn("Transaction pool reset with missing oldhead",
log.Warn("Transaction pool reset with missing old head",
"old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum)
return
}
@ -1427,7 +1497,13 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
return
}
}
reinject = types.TxDifference(discarded, included)
lost := make([]*types.Transaction, 0, len(discarded))
for _, tx := range types.TxDifference(discarded, included) {
if pool.Filter(tx) {
lost = append(lost, tx)
}
}
reinject = lost
}
}
}
@ -1522,6 +1598,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
if _, ok := pool.pending[addr]; !ok {
pool.reserve(addr, false)
}
}
}
return promoted
@ -1643,7 +1722,7 @@ func (pool *LegacyPool) truncateQueue() {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
@ -1652,7 +1731,7 @@ func (pool *LegacyPool) truncateQueue() {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
pool.removeTx(txs[i].Hash(), true, true)
drop--
queuedRateLimitMeter.Mark(1)
}
@ -1719,6 +1798,9 @@ func (pool *LegacyPool) demoteUnexecutables() {
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
if _, ok := pool.queue[addr]; !ok {
pool.reserve(addr, false)
}
}
}
}

View file

@ -85,7 +85,7 @@ func TestTransactionFutureAttack(t *testing.T) {
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
fillPool(t, pool)
pending, _ := pool.Stats()
@ -119,7 +119,7 @@ func TestTransactionFuture1559(t *testing.T) {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
@ -152,7 +152,7 @@ func TestTransactionZAttack(t *testing.T) {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
@ -226,7 +226,7 @@ func BenchmarkFutureAttack(b *testing.B) {
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
fillPool(b, pool)

View file

@ -25,6 +25,7 @@ import (
"math/big"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"
"time"
@ -143,6 +144,31 @@ func dynamicFeeTx(nonce uint64, gaslimit uint64, gasFee *big.Int, tip *big.Int,
return tx
}
func makeAddressReserver() txpool.AddressReserver {
var (
reserved = make(map[common.Address]struct{})
lock sync.Mutex
)
return func(addr common.Address, reserve bool) error {
lock.Lock()
defer lock.Unlock()
_, exists := reserved[addr]
if reserve {
if exists {
panic("already reserved")
}
reserved[addr] = struct{}{}
return nil
}
if !exists {
panic("not reserved")
}
delete(reserved, addr)
return nil
}
}
func setupPool() (*LegacyPool, *ecdsa.PrivateKey) {
return setupPoolWithConfig(params.TestChainConfig)
}
@ -154,7 +180,7 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat
key, _ := crypto.GenerateKey()
pool := New(testTxPoolConfig, blockchain)
if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()); err != nil {
if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()); err != nil {
panic(err)
}
// wait for the pool to initialize
@ -274,7 +300,7 @@ func TestStateChangeDuringReset(t *testing.T) {
tx1 := pricedTransaction(1, 100000, big.NewInt(250000000), key)
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
nonce := pool.Nonce(address)
@ -546,7 +572,7 @@ func TestChainFork(t *testing.T) {
if _, err := pool.add(tx, false); err != nil {
t.Error("didn't expect error", err)
}
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
// reset the pool's internal state
resetState()
@ -769,7 +795,7 @@ func TestPostponing(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create two test accounts to produce different gap profiles with
@ -987,7 +1013,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts and fund them (last one will be the local)
@ -1078,7 +1104,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
config.NoLocals = nolocals
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create two test accounts to ensure remotes expire but locals do not
@ -1264,7 +1290,7 @@ func TestPendingGlobalLimiting(t *testing.T) {
config.GlobalSlots = config.AccountSlots * 10
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts and fund them
@ -1368,7 +1394,7 @@ func TestCapClearsFromAll(t *testing.T) {
config.GlobalSlots = 8
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts and fund them
@ -1402,7 +1428,7 @@ func TestPendingMinimumAllowance(t *testing.T) {
config.GlobalSlots = 1
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts and fund them
@ -1448,7 +1474,7 @@ func TestRepricing(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Keep track of transaction events to ensure all executables get announced
@ -1697,7 +1723,7 @@ func TestRepricingKeepsLocals(t *testing.T) {
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts and fund them
@ -1776,7 +1802,7 @@ func TestUnderpricing(t *testing.T) {
config.GlobalQueue = 2
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Keep track of transaction events to ensure all executables get announced
@ -1895,7 +1921,7 @@ func TestStableUnderpricing(t *testing.T) {
config.AccountSlots = config.GlobalSlots - 1
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Keep track of transaction events to ensure all executables get announced
@ -2124,7 +2150,7 @@ func TestDeduplication(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a test account to add transactions with
@ -2191,7 +2217,7 @@ func TestReplacement(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Keep track of transaction events to ensure all executables get announced
@ -2403,7 +2429,7 @@ func testJournaling(t *testing.T, nolocals bool) {
config.Rejournal = time.Second
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
@ -2441,7 +2467,7 @@ func testJournaling(t *testing.T, nolocals bool) {
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool = New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
pending, queued = pool.Stats()
if queued != 0 {
@ -2468,7 +2494,7 @@ func testJournaling(t *testing.T, nolocals bool) {
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool = New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
pending, queued = pool.Stats()
if pending != 0 {
@ -2499,7 +2525,7 @@ func TestStatusCheck(t *testing.T) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create the test accounts to check various transaction statuses with

View file

@ -18,6 +18,7 @@ package txpool
import (
"math/big"
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core"
@ -31,6 +32,32 @@ type Transaction struct {
Tx *types.Transaction // Canonical transaction
}
// LazyTransaction contains a small subset of the transaction properties that is
// enough for the miner and other APIs to handle large batches of transactions;
// and supports pulling up the entire transaction when really needed.
type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *Transaction // Transaction if already resolved
Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
}
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
// maintained by the transaction pool.
func (ltx *LazyTransaction) Resolve() *Transaction {
if ltx.Tx == nil {
ltx.Tx = ltx.Pool.Get(ltx.Hash)
}
return ltx.Tx
}
// AddressReserver is passed by the main transaction pool to subpools, so they
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error
// SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block
@ -48,7 +75,7 @@ type SubPool interface {
// These should not be passed as a constructor argument - nor should the pools
// start by themselves - in order to keep multiple subpools in lockstep with
// one another.
Init(gasTip *big.Int, head *types.Header) error
Init(gasTip *big.Int, head *types.Header, reserve AddressReserver) error
// Close terminates any background processing threads and releases any held
// resources.
@ -76,7 +103,7 @@ type SubPool interface {
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*types.Transaction
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription

View file

@ -17,14 +17,18 @@
package txpool
import (
"errors"
"fmt"
"maps"
"math/big"
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
)
// TxStatus is the current status of a transaction as seen by the pool.
@ -37,6 +41,15 @@ const (
TxStatusIncluded
)
var (
// reservationsGaugeName is the prefix of a per-subpool address reservation
// metric.
//
// This is mostly a sanity metric to ensure there's no bug that would make
// some subpool hog all the reservations due to mis-accounting.
reservationsGaugeName = "txpool/reservations"
)
// BlockChain defines the minimal set of methods needed to back a tx pool with
// a chain. Exists to allow mocking the live chain out of tests.
type BlockChain interface {
@ -53,9 +66,13 @@ type BlockChain interface {
// They exit the pool when they are included in the blockchain or evicted due to
// resource constraints.
type TxPool struct {
subpools []SubPool // List of subpools for specialized transaction handling
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
subpools []SubPool // List of subpools for specialized transaction handling
reservations map[common.Address]SubPool // Map with the account to pool reservations
reserveLock sync.Mutex // Lock protecting the account reservations
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
}
// New creates a new transaction pool to gather, sort and filter inbound
@ -67,11 +84,12 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error)
head := chain.CurrentBlock()
pool := &TxPool{
subpools: subpools,
quit: make(chan chan error),
subpools: subpools,
reservations: make(map[common.Address]SubPool),
quit: make(chan chan error),
}
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head); err != nil {
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
for j := i - 1; j >= 0; j-- {
subpools[j].Close()
}
@ -82,6 +100,52 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error)
return pool, nil
}
// reserver is a method to create an address reservation callback to exclusively
// assign/deassign addresses to/from subpools. This can ensure that at any point
// in time, only a single subpool is able to manage an account, avoiding cross
// subpool eviction issues and nonce conflicts.
func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
return func(addr common.Address, reserve bool) error {
p.reserveLock.Lock()
defer p.reserveLock.Unlock()
owner, exists := p.reservations[addr]
if reserve {
// Double reservations are forbidden even from the same pool to
// avoid subtle bugs in the long term.
if exists {
if owner == subpool {
log.Error("pool attempted to reserve already-owned address", "address", addr)
return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed
}
return errors.New("address already reserved")
}
p.reservations[addr] = subpool
if metrics.Enabled() {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Inc(1)
}
return nil
}
// Ensure subpools only attempt to unreserve their own owned addresses,
// otherwise flag as a programming error.
if !exists {
log.Error("pool attempted to unreserve non-reserved address", "address", addr)
return errors.New("address not reserved")
}
if subpool != owner {
log.Error("pool attempted to unreserve non-owned address", "address", addr)
return errors.New("address not owned")
}
delete(p.reservations, addr)
if metrics.Enabled() {
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
metrics.GetOrRegisterGauge(m, nil).Dec(1)
}
return nil
}
}
// Close terminates the transaction pool and all its subpools.
func (p *TxPool) Close() error {
var errs []error
@ -245,8 +309,8 @@ func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
txs := make(map[common.Address][]*types.Transaction)
func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction {
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
maps.Copy(txs, subpool.Pending(enforceTips))
}

View file

@ -132,6 +132,11 @@ type ValidationOptionsWithState struct {
// nonce gaps will be ignored and permitted.
FirstNonceGap func(addr common.Address) uint64
// UsedAndLeftSlots is a mandatory callback to retrieve the number of tx slots
// used and the number still permitted for an account. New transactions will
// be rejected once the number of remaining slots reaches zero.
UsedAndLeftSlots func(addr common.Address) (int, int)
// ExistingExpenditure is a mandatory callback to retrieve the cumulative
// cost of the already pooled transactions to check for overdrafts.
ExistingExpenditure func(addr common.Address) *big.Int
@ -210,6 +215,12 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
if newBalance.Cmp(need) < 0 {
return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, newBalance))
}
// Transaction takes a new nonce value out of the pool. Ensure it doesn't
// overflow the number of permitted transactions from a single accoun
// (i.e. max cancellable via out-of-bound transaction).
if used, left := opts.UsedAndLeftSlots(from); left <= 0 {
return fmt.Errorf("%w: pooled %d txs", ErrAccountLimitExceeded, used)
}
}
// Ensure sender and receiver are not in black list

View file

@ -18,7 +18,6 @@ package types
import (
"bytes"
"container/heap"
"errors"
"fmt"
"io"
@ -401,6 +400,19 @@ func (tx *Transaction) SetCodeAuthorizations() []SetCodeAuthorization {
return setcodetx.AuthList
}
// SetTime sets the decoding time of a transaction. This is used by tests to set
// arbitrary times and by persistent transaction pools when loading old txs from
// disk.
func (tx *Transaction) SetTime(t time.Time) {
tx.time = t
}
// Time returns the time when the transaction was first seen on the network. It
// is a heuristic to prefer mining older txs vs new all other things equal.
func (tx *Transaction) Time() time.Time {
return tx.time
}
// Hash returns the transaction hash.
func (tx *Transaction) Hash() common.Hash {
if hash := tx.hash.Load(); hash != nil {
@ -715,128 +727,6 @@ func (s TxByNonce) Len() int { return len(s) }
func (s TxByNonce) Less(i, j int) bool { return s[i].Nonce() < s[j].Nonce() }
func (s TxByNonce) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// TxByPriceAndTime implements both the sort and the heap interface, making it useful
// for all at once sorting as well as individually adding and removing elements.
type TxByPriceAndTime struct {
txs Transactions
payersSwap map[common.Address]*big.Int
}
func (s TxByPriceAndTime) Len() int { return len(s.txs) }
func (s TxByPriceAndTime) Less(i, j int) bool {
i_price := s.txs[i].GasPrice()
if s.txs[i].To() != nil {
if _, ok := s.payersSwap[*s.txs[i].To()]; ok {
i_price = common.TRC21GasPrice
}
}
j_price := s.txs[j].GasPrice()
if s.txs[j].To() != nil {
if _, ok := s.payersSwap[*s.txs[j].To()]; ok {
j_price = common.TRC21GasPrice
}
}
// If the prices are equal, use the time the transaction was first seen for
// deterministic sorting
cmp := i_price.Cmp(j_price)
if cmp == 0 {
return s.txs[i].time.Before(s.txs[j].time)
}
return cmp > 0
}
func (s TxByPriceAndTime) Swap(i, j int) { s.txs[i], s.txs[j] = s.txs[j], s.txs[i] }
func (s *TxByPriceAndTime) Push(x interface{}) {
s.txs = append(s.txs, x.(*Transaction))
}
func (s *TxByPriceAndTime) Pop() interface{} {
old := s.txs
n := len(old)
x := old[n-1]
old[n-1] = nil // avoid memory leak
s.txs = old[0 : n-1]
return x
}
// TransactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsByPriceAndNonce struct {
txs map[common.Address][]*Transaction // Per account nonce-sorted list of transactions
heads TxByPriceAndTime // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
}
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
//
// It also classifies special txs and normal txs
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address][]*Transaction, payersSwap map[common.Address]*big.Int) (*TransactionsByPriceAndNonce, Transactions) {
// Initialize a price and received time based heap with the head transactions
heads := TxByPriceAndTime{}
heads.payersSwap = payersSwap
specialTxs := Transactions{}
for _, accTxs := range txs {
from, _ := Sender(signer, accTxs[0])
var normalTxs Transactions
for _, tx := range accTxs {
if tx.IsSpecialTransaction() {
specialTxs = append(specialTxs, tx)
} else {
normalTxs = append(normalTxs, tx)
}
}
if len(normalTxs) > 0 {
heads.txs = append(heads.txs, normalTxs[0])
// Remove the first normal transaction for this sender
txs[from] = normalTxs[1:]
} else {
// Remove the account if there are no normal transactions
delete(txs, from)
}
}
heap.Init(&heads)
// Assemble and return the transaction set
return &TransactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
}, specialTxs
}
// Peek returns the next transaction by price.
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
if len(t.heads.txs) == 0 {
return nil
}
return t.heads.txs[0]
}
// Shift replaces the current best head with the next one from the same account.
func (t *TransactionsByPriceAndNonce) Shift() {
acc, _ := Sender(t.signer, t.heads.txs[0])
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
t.heads.txs[0], t.txs[acc] = txs[0], txs[1:]
heap.Fix(&t.heads, 0)
} else {
heap.Pop(&t.heads)
}
}
// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *TransactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}
// copyAddressPtr copies an address.
func copyAddressPtr(a *common.Address) *common.Address {
if a == nil {

View file

@ -25,7 +25,6 @@ import (
"math/big"
"reflect"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/crypto"
@ -260,75 +259,6 @@ func TestRecipientNormal(t *testing.T) {
}
}
// Tests that transactions can be correctly sorted according to their price in
// decreasing order, but at the same time with increasing nonces when issued by
// the same account.
func TestTransactionPriceNonceSort(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 25)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address][]*Transaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
for i := 0; i < 25; i++ {
tx, _ := SignTx(NewTransaction(uint64(start+i), common.Address{}, big.NewInt(100), 100, big.NewInt(int64(start+i)), nil), signer, key)
groups[addr] = append(groups[addr], tx)
}
}
// Sort the transactions and cross check the nonce ordering
txset, _ := NewTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{})
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx)
txset.Shift()
}
if len(txs) != 25*25 {
t.Errorf("expected %d transactions, found %d", 25*25, len(txs))
}
for i, txi := range txs {
fromi, _ := Sender(signer, txi)
// Make sure the nonce order is valid
for j, txj := range txs[i+1:] {
fromj, _ := Sender(signer, txj)
if fromi == fromj && txi.Nonce() > txj.Nonce() {
t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce())
}
}
// Find the previous and next nonce of this account
prev, next := i-1, i+1
for j := i - 1; j >= 0; j-- {
if fromj, _ := Sender(signer, txs[j]); fromi == fromj {
prev = j
break
}
}
for j := i + 1; j < len(txs); j++ {
if fromj, _ := Sender(signer, txs[j]); fromi == fromj {
next = j
break
}
}
// Make sure that in between the neighbor nonces, the transaction is correctly positioned price wise
for j := prev + 1; j < next; j++ {
fromj, _ := Sender(signer, txs[j])
if j < i && txs[j].GasPrice().Cmp(txi.GasPrice()) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", j, fromj[:4], txs[j].GasPrice(), i, fromi[:4], txi.GasPrice())
}
if j > i && txs[j].GasPrice().Cmp(txi.GasPrice()) > 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) > tx #%d (A=%x P=%v)", j, fromj[:4], txs[j].GasPrice(), i, fromi[:4], txi.GasPrice())
}
}
}
}
// TestTransactionJSON tests serializing/de-serializing to/from JSON.
func TestTransactionJSON(t *testing.T) {
key, err := crypto.GenerateKey()
@ -371,54 +301,6 @@ func TestTransactionJSON(t *testing.T) {
}
}
// Tests that if multiple transactions have the same price, the ones seen earlier
// are prioritized to avoid network spam attacks aiming for a specific ordering.
func TestTransactionTimeSort(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address][]*Transaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
tx, _ := SignTx(NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx.time = time.Unix(0, int64(len(keys)-start))
groups[addr] = append(groups[addr], tx)
}
// Sort the transactions and cross check the nonce ordering
txset, _ := NewTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{})
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx)
txset.Shift()
}
if len(txs) != len(keys) {
t.Errorf("expected %d transactions, found %d", len(keys), len(txs))
}
for i, txi := range txs {
fromi, _ := Sender(signer, txi)
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := Sender(signer, next)
if txi.GasPrice().Cmp(next.GasPrice()) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
// Make sure time order is ascending if the txs have the same gas price
if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.time.After(next.time) {
t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.time, i+1, fromNext[:4], next.time)
}
}
}
}
// TestTransactionCoding tests serializing/de-serializing to/from rlp and JSON.
func TestTransactionCoding(t *testing.T) {
key, err := crypto.GenerateKey()
@ -718,78 +600,3 @@ func TestIsNonEVMTx(t *testing.T) {
})
}
}
// TestNewTransactionsByPriceAndNonce_SpecialSeparation uses table-driven tests to verify separation of special and normal transactions.
func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) {
signer := HomesteadSigner{}
genNormalTx := func(nonce uint64, key *ecdsa.PrivateKey) *Transaction {
tx, _ := SignTx(NewTransaction(nonce, common.HexToAddress("0x1234567890123456789012345678901234567890"), big.NewInt(1), 21000, big.NewInt(1), nil), signer, key)
return tx
}
genSpecialTx := func(nonce uint64, key *ecdsa.PrivateKey) *Transaction {
tx, _ := SignTx(NewTransaction(nonce, common.BlockSignersBinary, big.NewInt(1), 21000, big.NewInt(1), nil), signer, key)
return tx
}
testCases := []struct {
name string
normalCount int
specialCount int
expectNormal int
expectSpecial int
}{
{"no transactions", 0, 0, 0, 0},
{"only 1 normal", 1, 0, 1, 0},
{"only 2 normal", 2, 0, 2, 0},
{"only 3 normal", 3, 0, 3, 0},
{"only 1 special", 0, 1, 0, 1},
{"only 2 special", 0, 2, 0, 2},
{"only 3 special", 0, 3, 0, 3},
{"1 normal, 1 special", 1, 1, 1, 1},
{"2 normal, 2 special", 2, 2, 2, 2},
{"3 normal, 3 special", 3, 3, 3, 3},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
txs := make(Transactions, 0, tc.normalCount+tc.specialCount)
for i := 0; i < tc.normalCount; i++ {
txs = append(txs, genNormalTx(uint64(i), key))
}
for i := 0; i < tc.specialCount; i++ {
txs = append(txs, genSpecialTx(uint64(tc.normalCount+i), key))
}
group := map[common.Address][]*Transaction{}
if len(txs) > 0 {
group[addr] = txs
}
txset, specialTxs := NewTransactionsByPriceAndNonce(signer, group, map[common.Address]*big.Int{})
// Check special transactions
if len(specialTxs) != tc.expectSpecial {
t.Errorf("expected %d special txs, got %d", tc.expectSpecial, len(specialTxs))
}
for _, tx := range specialTxs {
if tx.To() == nil || *tx.To() != common.BlockSignersBinary {
t.Errorf("specialTxs contains non-special tx: %v", tx)
}
}
// Check normal transactions
normalCount := 0
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
if tx.To() == nil || *tx.To() == common.BlockSignersBinary {
t.Errorf("txset contains special or nil-to tx: %v", tx)
}
normalCount++
txset.Shift()
}
if normalCount != tc.expectNormal {
t.Errorf("expected %d normal txs, got %d", tc.expectNormal, normalCount)
}
})
}
}

View file

@ -312,7 +312,11 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(false)
var txs types.Transactions
for _, batch := range pending {
txs = append(txs, batch...)
for _, lazy := range batch {
if tx := lazy.Resolve(); tx != nil {
txs = append(txs, tx.Tx)
}
}
}
return txs, nil
}

View file

@ -150,7 +150,7 @@ func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []err
}
// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction {
p.lock.RLock()
defer p.lock.RUnlock()
@ -162,7 +162,19 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Trans
for _, batch := range batches {
sort.Sort(types.TxByNonce(batch))
}
return batches
pending := make(map[common.Address][]*txpool.LazyTransaction)
for addr, batch := range batches {
for _, tx := range batch {
pending[addr] = append(pending[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
}
return pending
}
// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and

View file

@ -109,7 +109,7 @@ type txPool interface {
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*types.Transaction
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.

View file

@ -47,7 +47,11 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
var txs types.Transactions
pending := pm.txpool.Pending(false)
for _, batch := range pending {
txs = append(txs, batch...)
for _, lazy := range batch {
if tx := lazy.Resolve(); tx != nil {
txs = append(txs, tx.Tx)
}
}
}
if len(txs) == 0 {
return

190
miner/ordering.go Normal file
View file

@ -0,0 +1,190 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"container/heap"
"math/big"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
)
// txWithMinerFee wraps a transaction with its gas price or effective miner gasTipCap
type txWithMinerFee struct {
tx *txpool.LazyTransaction
from common.Address
fees *big.Int
}
// newTxWithMinerFee creates a wrapped transaction, calculating the effective
// miner gasTipCap if a base fee is provided.
// Returns error in case of a negative effective miner gasTipCap.
func newTxWithMinerFee(tx *txpool.LazyTransaction, from common.Address, baseFee *big.Int) (*txWithMinerFee, error) {
tip := new(big.Int).Set(tx.GasTipCap)
if baseFee != nil {
if tx.GasFeeCap.Cmp(baseFee) < 0 {
return nil, types.ErrGasFeeCapTooLow
}
effectiveTip := new(big.Int).Sub(tx.GasFeeCap, baseFee)
if tip.Cmp(effectiveTip) > 0 {
tip = effectiveTip
}
}
return &txWithMinerFee{
tx: tx,
from: from,
fees: tip,
}, nil
}
// TxByPriceAndTime implements both the sort and the heap interface, making it useful
// for all at once sorting as well as individually adding and removing elements.
type txByPriceAndTime struct {
txs []*txWithMinerFee
payersSwap map[common.Address]*big.Int
}
func (s txByPriceAndTime) Len() int {
return len(s.txs)
}
func (s txByPriceAndTime) Less(i, j int) bool {
i_price := s.txs[i].fees
if tx := s.txs[i].tx.Resolve(); tx != nil && tx.Tx.To() != nil {
if _, ok := s.payersSwap[*tx.Tx.To()]; ok {
i_price = common.TRC21GasPrice
}
}
j_price := s.txs[j].fees
if tx := s.txs[j].tx.Resolve(); tx != nil && tx.Tx.To() != nil {
if _, ok := s.payersSwap[*tx.Tx.To()]; ok {
j_price = common.TRC21GasPrice
}
}
// If the prices are equal, use the time the transaction was first seen for
// deterministic sorting
cmp := i_price.Cmp(j_price)
if cmp == 0 {
return s.txs[i].tx.Time.Before(s.txs[j].tx.Time)
}
return cmp > 0
}
func (s txByPriceAndTime) Swap(i, j int) {
s.txs[i], s.txs[j] = s.txs[j], s.txs[i]
}
func (s *txByPriceAndTime) Push(x interface{}) {
s.txs = append(s.txs, x.(*txWithMinerFee))
}
func (s *txByPriceAndTime) Pop() interface{} {
old := s.txs
n := len(old)
x := old[n-1]
old[n-1] = nil // avoid memory leak
s.txs = old[0 : n-1]
return x
}
// transactionsByPriceAndNonce represents a set of transactions that can return
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type transactionsByPriceAndNonce struct {
txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions
heads txByPriceAndTime // Next transaction for each unique account (price heap)
signer types.Signer // Signer for the set of transactions
baseFee *big.Int // Current base fee
}
// newTransactionsByPriceAndNonce creates a transaction set that can retrieve
// price sorted transactions in a nonce-honouring way.
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, payersSwap map[common.Address]*big.Int, baseFee *big.Int) (*transactionsByPriceAndNonce, types.Transactions) {
// Initialize a price and received time based heap with the head transactions
heads := txByPriceAndTime{
txs: make([]*txWithMinerFee, 0, len(txs)),
payersSwap: payersSwap,
}
specialTxs := types.Transactions{}
for from, accTxs := range txs {
var normalTxs []*txpool.LazyTransaction
for _, lazyTx := range accTxs {
if tx := lazyTx.Resolve(); tx.Tx.IsSpecialTransaction() {
specialTxs = append(specialTxs, tx.Tx)
} else {
normalTxs = append(normalTxs, lazyTx)
}
}
if len(normalTxs) > 0 {
wrapped, err := newTxWithMinerFee(normalTxs[0], from, baseFee)
if err != nil {
delete(txs, from)
continue
}
heads.txs = append(heads.txs, wrapped)
// Remove the first normal transaction for this sender
txs[from] = normalTxs[1:]
} else {
// Remove the account if there are no normal transactions
delete(txs, from)
}
}
heap.Init(&heads)
// Assemble and return the transaction set
return &transactionsByPriceAndNonce{
txs: txs,
heads: heads,
signer: signer,
baseFee: baseFee,
}, specialTxs
}
// Peek returns the next transaction by price.
func (t *transactionsByPriceAndNonce) Peek() *txpool.LazyTransaction {
if len(t.heads.txs) == 0 {
return nil
}
return t.heads.txs[0].tx
}
// Shift replaces the current best head with the next one from the same account.
func (t *transactionsByPriceAndNonce) Shift() {
acc := t.heads.txs[0].from
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
if wrapped, err := newTxWithMinerFee(txs[0], acc, t.baseFee); err == nil {
t.heads.txs[0], t.txs[acc] = wrapped, txs[1:]
heap.Fix(&t.heads, 0)
return
}
}
heap.Pop(&t.heads)
}
// Pop removes the best transaction, *not* replacing it with the next one from
// the same account. This should be used when a transaction cannot be executed
// and hence all subsequent ones should be discarded from the same account.
func (t *transactionsByPriceAndNonce) Pop() {
heap.Pop(&t.heads)
}

264
miner/ordering_test.go Normal file
View file

@ -0,0 +1,264 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"crypto/ecdsa"
"math/big"
"math/rand"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
)
func TestTransactionPriceNonceSortLegacy(t *testing.T) {
testTransactionPriceNonceSort(t, nil)
}
func TestTransactionPriceNonceSort1559(t *testing.T) {
testTransactionPriceNonceSort(t, big.NewInt(0))
testTransactionPriceNonceSort(t, big.NewInt(5))
testTransactionPriceNonceSort(t, big.NewInt(50))
}
// Tests that transactions can be correctly sorted according to their price in
// decreasing order, but at the same time with increasing nonces when issued by
// the same account.
func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 25)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := types.LatestSignerForChainID(common.Big1)
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address][]*txpool.LazyTransaction{}
expectedCount := 0
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
count := 25
for i := 0; i < 25; i++ {
var tx *types.Transaction
gasFeeCap := rand.Intn(50)
if baseFee == nil {
tx = types.NewTx(&types.LegacyTx{
Nonce: uint64(start + i),
To: &common.Address{},
Value: big.NewInt(100),
Gas: 100,
GasPrice: big.NewInt(int64(gasFeeCap)),
Data: nil,
})
} else {
tx = types.NewTx(&types.DynamicFeeTx{
Nonce: uint64(start + i),
To: &common.Address{},
Value: big.NewInt(100),
Gas: 100,
GasFeeCap: big.NewInt(int64(gasFeeCap)),
GasTipCap: big.NewInt(int64(rand.Intn(gasFeeCap + 1))),
Data: nil,
})
if count == 25 && int64(gasFeeCap) < baseFee.Int64() {
count = i
}
}
tx, err := types.SignTx(tx, signer, key)
if err != nil {
t.Fatalf("failed to sign tx: %s", err)
}
groups[addr] = append(groups[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
expectedCount += count
}
// Sort the transactions and cross check the nonce ordering
txset, _ := newTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{}, baseFee)
txs := types.Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.Tx.Tx)
txset.Shift()
}
if len(txs) != expectedCount {
t.Errorf("expected %d transactions, found %d", expectedCount, len(txs))
}
for i, txi := range txs {
fromi, _ := types.Sender(signer, txi)
// Make sure the nonce order is valid
for j, txj := range txs[i+1:] {
fromj, _ := types.Sender(signer, txj)
if fromi == fromj && txi.Nonce() > txj.Nonce() {
t.Errorf("invalid nonce ordering: tx #%d (A=%x N=%v) < tx #%d (A=%x N=%v)", i, fromi[:4], txi.Nonce(), i+j, fromj[:4], txj.Nonce())
}
}
// If the next tx has different from account, the price must be lower than the current one
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := types.Sender(signer, next)
tip, err := txi.EffectiveGasTip(baseFee)
nextTip, nextErr := next.EffectiveGasTip(baseFee)
if err != nil || nextErr != nil {
t.Errorf("error calculating effective tip: %v, %v", err, nextErr)
}
if fromi != fromNext && tip.Cmp(nextTip) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
}
}
}
// Tests that if multiple transactions have the same price, the ones seen earlier
// are prioritized to avoid network spam attacks aiming for a specific ordering.
func TestTransactionTimeSort(t *testing.T) {
// Generate a batch of accounts to start with
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
}
signer := types.HomesteadSigner{}
// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address][]*txpool.LazyTransaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 100, big.NewInt(1), nil), signer, key)
tx.SetTime(time.Unix(0, int64(len(keys)-start)))
groups[addr] = append(groups[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
// Sort the transactions and cross check the nonce ordering
txset, _ := newTransactionsByPriceAndNonce(signer, groups, map[common.Address]*big.Int{}, nil)
txs := types.Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.Tx.Tx)
txset.Shift()
}
if len(txs) != len(keys) {
t.Errorf("expected %d transactions, found %d", len(keys), len(txs))
}
for i, txi := range txs {
fromi, _ := types.Sender(signer, txi)
if i+1 < len(txs) {
next := txs[i+1]
fromNext, _ := types.Sender(signer, next)
if txi.GasPrice().Cmp(next.GasPrice()) < 0 {
t.Errorf("invalid gasprice ordering: tx #%d (A=%x P=%v) < tx #%d (A=%x P=%v)", i, fromi[:4], txi.GasPrice(), i+1, fromNext[:4], next.GasPrice())
}
// Make sure time order is ascending if the txs have the same gas price
if txi.GasPrice().Cmp(next.GasPrice()) == 0 && txi.Time().After(next.Time()) {
t.Errorf("invalid received time ordering: tx #%d (A=%x T=%v) > tx #%d (A=%x T=%v)", i, fromi[:4], txi.Time(), i+1, fromNext[:4], next.Time())
}
}
}
}
// TestNewTransactionsByPriceAndNonce_SpecialSeparation uses table-driven tests to verify separation of special and normal transactions.
func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) {
signer := types.HomesteadSigner{}
genNormalTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction {
tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0x1234567890123456789012345678901234567890"), big.NewInt(1), 21000, big.NewInt(1), nil), signer, key)
return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()}
}
genSpecialTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction {
tx, _ := types.SignTx(types.NewTransaction(nonce, common.BlockSignersBinary, big.NewInt(1), 21000, big.NewInt(1), nil), signer, key)
return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()}
}
testCases := []struct {
name string
normalCount int
specialCount int
expectNormal int
expectSpecial int
}{
{"no transactions", 0, 0, 0, 0},
{"only 1 normal", 1, 0, 1, 0},
{"only 2 normal", 2, 0, 2, 0},
{"only 3 normal", 3, 0, 3, 0},
{"only 1 special", 0, 1, 0, 1},
{"only 2 special", 0, 2, 0, 2},
{"only 3 special", 0, 3, 0, 3},
{"1 normal, 1 special", 1, 1, 1, 1},
{"2 normal, 2 special", 2, 2, 2, 2},
{"3 normal, 3 special", 3, 3, 3, 3},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
txs := make([]*txpool.LazyTransaction, 0, tc.normalCount+tc.specialCount)
for i := 0; i < tc.normalCount; i++ {
txs = append(txs, genNormalTx(uint64(i), key))
}
for i := 0; i < tc.specialCount; i++ {
txs = append(txs, genSpecialTx(uint64(tc.normalCount+i), key))
}
group := map[common.Address][]*txpool.LazyTransaction{}
if len(txs) > 0 {
group[addr] = txs
}
txset, specialTxs := newTransactionsByPriceAndNonce(signer, group, map[common.Address]*big.Int{}, nil)
// Check special transactions
if len(specialTxs) != tc.expectSpecial {
t.Errorf("expected %d special txs, got %d", tc.expectSpecial, len(specialTxs))
}
for _, tx := range specialTxs {
if tx.To() == nil || *tx.To() != common.BlockSignersBinary {
t.Errorf("specialTxs contains non-special tx: %v", tx)
}
}
// Check normal transactions
normalCount := 0
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
resolved := tx.Resolve()
if resolved == nil || resolved.Tx.To() == nil || *resolved.Tx.To() == common.BlockSignersBinary {
t.Errorf("txset contains special or nil-to tx: %v", resolved)
}
normalCount++
txset.Shift()
}
if normalCount != tc.expectNormal {
t.Errorf("expected %d normal txs, got %d", tc.expectNormal, normalCount)
}
})
}
}

View file

@ -36,6 +36,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/contracts"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
"github.com/XinFinOrg/XDPoSChain/ethdb"
@ -355,13 +356,19 @@ func (w *worker) update() {
// be automatically eliminated.
if atomic.LoadInt32(&w.mining) == 0 {
w.currentMu.Lock()
txs := make(map[common.Address][]*types.Transaction, len(ev.Txs))
txs := make(map[common.Address][]*txpool.LazyTransaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], tx)
txs[acc] = append(txs[acc], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
})
}
feeCapacity := w.current.state.GetTRC21FeeCapacityFromState()
txset, specialTxs := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, feeCapacity)
txset, specialTxs := newTransactionsByPriceAndNonce(w.current.signer, txs, feeCapacity, w.current.header.BaseFee)
tcount := w.current.tcount
w.current.commitTransactions(w.mux, feeCapacity, txset, specialTxs, w.chain, w.coinbase, &w.pendingLogsFeed)
@ -778,7 +785,7 @@ func (w *worker) commitNewWork() {
}
// won't grasp txs at checkpoint
var (
txs *types.TransactionsByPriceAndNonce
txs *transactionsByPriceAndNonce
specialTxs types.Transactions
tradingTxMatches []tradingstate.TxDataMatch
lendingMatchingResults map[common.Hash]lendingstate.MatchingResult
@ -794,7 +801,7 @@ func (w *worker) commitNewWork() {
}
if !isEpochSwitchBlock {
pending := w.eth.TxPool().Pending(true)
txs, specialTxs = types.NewTransactionsByPriceAndNonce(w.current.signer, pending, feeCapacity)
txs, specialTxs = newTransactionsByPriceAndNonce(w.current.signer, pending, feeCapacity, header.BaseFee)
}
}
if atomic.LoadInt32(&w.mining) == 1 {
@ -959,7 +966,7 @@ func (w *worker) commitNewWork() {
w.updateSnapshot()
}
func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address, pendingLogsFeed *event.Feed) {
func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Address]*big.Int, txs *transactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address, pendingLogsFeed *event.Feed) {
gp := new(core.GasPool).AddGas(w.header.GasLimit)
balanceUpdated := map[common.Address]*big.Int{}
totalFeeUsed := big.NewInt(0)
@ -1070,12 +1077,15 @@ func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Addr
break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
lazyTx := txs.Peek()
if lazyTx == nil {
break
}
warped := lazyTx.Resolve()
if warped == nil || warped.Tx == nil {
break
}
tx := warped.Tx
to := tx.To()
if w.header.Number.Uint64() >= common.BlackListHFNumber {
from := tx.From()