refactor(core/txpool): migrate tx subscription to SubscribeTransactions #28243 (#2125)

* refactor(txpool): remove wrapper type #27841

Partial backport of ethereum/go-ethereum PR #27841, limited to txpool wrapper removal.

- Migrate txpool interfaces/call sites from `*txpool.Transaction` to `*types.Transaction`
- Update eth/miner/contracts paths and related tests accordingly
- No intended behavior change

Blob sidecar validation/handling changes from upstream are not included here.

* refactor(core/txpool): migrate tx subscription to SubscribeTransactions #28243

Replace the old SubscribeNewTxsEvent-style plumbing with the new
SubscribeTransactions(ch, reorgs) interface across txpool, eth protocol
manager, API backend, miner worker, and test helpers.

Key changes:
- Extend txpool/subpool tx subscription interface with a reorgs flag
- Route eth tx announcement path to reorgs=false (new tx announcements only)
- Route API/miner subscriptions to reorgs=true
- Move subscription-scope cleanup to TxPool.Close()
- Add Gas field to LazyTransaction in legacy pending view

Note:
LegacyPool currently cannot strictly separate newly seen and resurrected txs,
so the reorgs flag is accepted for API compatibility and future blob-subpool
integration.
This commit is contained in:
Daniel Liu 2026-03-10 21:14:38 +08:00 committed by GitHub
parent 5bb1f035a2
commit d4a6f43ef2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 103 additions and 105 deletions

View file

@ -93,7 +93,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
return err
}
// Add tx signed to local tx pool.
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
if err != nil {
log.Error("Fail to add tx sign to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
return err
@ -121,7 +121,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
return err
}
// Add tx signed to local tx pool.
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
if err != nil {
log.Error("Fail to add tx secret to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
return err
@ -150,7 +150,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
return err
}
// Add tx to pool.
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
err = pool.Add([]*types.Transaction{txSigned}, true, true)[0]
if err != nil {
log.Error("Fail to add tx opening to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
return err

View file

@ -251,7 +251,6 @@ type LegacyPool struct {
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
@ -447,9 +446,6 @@ func (pool *LegacyPool) loop() {
// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()
// Terminate the pool reorger and return
close(pool.reorgShutdownCh)
pool.wg.Wait()
@ -468,10 +464,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait
}
// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
// The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transactions from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and
// reorgs run lazily, so separating the two would need a marker.
return pool.txFeed.Subscribe(ch)
}
// SetGasTip updates the minimum gas tip required by the transaction pool for a
@ -607,10 +607,11 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
lazies[i] = &txpool.LazyTransaction{
Pool: pool,
Hash: txs[i].Hash(),
Tx: &txpool.Transaction{Tx: txs[i]},
Tx: txs[i],
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
}
}
pending[addr] = lazies
@ -1099,26 +1100,13 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact
return true, nil
}
// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing constraints will or will not be applied.
//
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
unwrapped := make([]*types.Transaction, len(txs))
for i, tx := range txs {
unwrapped[i] = tx.Tx
}
return pool.addTxs(unwrapped, local, sync)
}
// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the
// senders as local ones, ensuring they go around the local pricing constraints.
//
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error {
return pool.addTxs(txs, !pool.config.NoLocals, true)
return pool.Add(txs, !pool.config.NoLocals, true)
}
// AddLocal enqueues a single local transaction into the pool if it is valid. This is
@ -1134,7 +1122,7 @@ func (pool *LegacyPool) addLocal(tx *types.Transaction) error {
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *LegacyPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, false)
return pool.Add(txs, false, false)
}
// addRemote enqueues a single transaction into the pool if it is valid. This is a convenience
@ -1146,16 +1134,20 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error {
// AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, true)
return pool.Add(txs, false, true)
}
// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
return pool.addTxs([]*types.Transaction{tx}, false, true)[0]
return pool.Add([]*types.Transaction{tx}, false, true)[0]
}
// addTxs attempts to queue a batch of transactions if they are valid.
func (pool *LegacyPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing constraints will or will not be applied.
//
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
@ -1241,12 +1233,12 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus {
}
// Get returns a transaction if it is contained in the pool and nil otherwise.
func (pool *LegacyPool) Get(hash common.Hash) *txpool.Transaction {
func (pool *LegacyPool) Get(hash common.Hash) *types.Transaction {
tx := pool.get(hash)
if tx == nil {
return nil
}
return &txpool.Transaction{Tx: tx}
return tx
}
// get returns a transaction if it is contained in the pool and nil otherwise.

View file

@ -26,34 +26,38 @@ import (
"github.com/XinFinOrg/XDPoSChain/event"
)
// Transaction is a helper struct to group together a canonical transaction with
// satellite data items that are needed by the pool but are not part of the chain.
type Transaction struct {
Tx *types.Transaction // Canonical transaction
}
// LazyTransaction contains a small subset of the transaction properties that is
// enough for the miner and other APIs to handle large batches of transactions;
// and supports pulling up the entire transaction when really needed.
type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *Transaction // Transaction if already resolved
Pool LazyResolver // Transaction resolver to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved
Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
Gas uint64 // Amount of gas required by the transaction
}
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
// maintained by the transaction pool.
func (ltx *LazyTransaction) Resolve() *Transaction {
func (ltx *LazyTransaction) Resolve() *types.Transaction {
if ltx.Tx == nil {
ltx.Tx = ltx.Pool.Get(ltx.Hash)
}
return ltx.Tx
}
// LazyResolver is a minimal interface needed for a transaction pool to satisfy
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
// pool being injected into the lazy transaction.
type LazyResolver interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction
}
// SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block
@ -90,19 +94,21 @@ type SubPool interface {
Has(hash common.Hash) bool
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *Transaction
Get(hash common.Hash) *types.Transaction
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Add(txs []*Transaction, local bool, sync bool) []error
Add(txs []*types.Transaction, local bool, sync bool) []error
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.

View file

@ -100,6 +100,9 @@ func (p *TxPool) Close() error {
errs = append(errs, err)
}
}
// Unsubscribe anyone still listening for tx events
p.subs.Close()
if len(errs) > 0 {
return fmt.Errorf("subpool close errors: %v", errs)
}
@ -190,7 +193,7 @@ func (p *TxPool) Has(hash common.Hash) bool {
}
// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *TxPool) Get(hash common.Hash) *Transaction {
func (p *TxPool) Get(hash common.Hash) *types.Transaction {
for _, subpool := range p.subpools {
if tx := subpool.Get(hash); tx != nil {
return tx
@ -202,14 +205,14 @@ func (p *TxPool) Get(hash common.Hash) *Transaction {
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// Split the input transactions between the subpools. It shouldn't really
// happen that we receive merged batches, but better graceful than strange
// errors.
//
// We also need to track how the transactions were split across the subpools,
// so we can piece back the returned errors into the original order.
txsets := make([][]*Transaction, len(p.subpools))
txsets := make([][]*types.Transaction, len(p.subpools))
splits := make([]int, len(txs))
for i, tx := range txs {
@ -218,7 +221,7 @@ func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error {
// Try to find a subpool that accepts the transaction
for j, subpool := range p.subpools {
if subpool.Filter(tx.Tx) {
if subpool.Filter(tx) {
txsets[j] = append(txsets[j], tx)
splits[i] = j
break
@ -255,12 +258,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
return txs
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
// events to the given channel.
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch)
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}

View file

@ -301,7 +301,7 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.Add([]*txpool.Transaction{{Tx: signedTx}}, true, false)[0]
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
@ -310,7 +310,7 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
for _, batch := range pending {
for _, lazy := range batch {
if tx := lazy.Resolve(); tx != nil {
txs = append(txs, tx.Tx)
txs = append(txs, tx)
}
}
}
@ -319,7 +319,7 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
if tx := b.eth.txPool.Get(hash); tx != nil {
return tx.Tx
return tx
}
return nil
}
@ -350,7 +350,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
}
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeNewTxsEvent(ch)
return b.eth.txPool.SubscribeTransactions(ch, true)
}
func (b *EthAPIBackend) Downloader() *downloader.Downloader {

View file

@ -31,7 +31,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/misc"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/eth/bft"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
@ -282,9 +281,9 @@ func (pm *ProtocolManager) removePeer(id string) {
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// broadcast transactions
// broadcast and announce transactions (only new ones, not resurrected ones)
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
pm.txsSub = pm.txpool.SubscribeTransactions(pm.txsCh, false)
pm.orderTxCh = make(chan core.OrderTxPreEvent, txChanSize)
if pm.orderpool != nil {
pm.orderTxSub = pm.orderpool.SubscribeTxPreEvent(pm.orderTxCh)
@ -780,11 +779,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.knownTxs.Add(tx.Hash(), struct{}{})
}
}
warped := make([]*txpool.Transaction, len(txs))
for i := range txs {
warped[i] = &txpool.Transaction{Tx: txs[i]}
}
pm.txpool.Add(warped, false, false)
pm.txpool.Add(txs, false, false)
case msg.Code == OrderTxMsg:
// Transactions arrived, make sure we have a valid and fresh chain to handle them

View file

@ -119,34 +119,30 @@ func (p *testTxPool) Has(hash common.Hash) bool {
// Get retrieves the transaction from local txpool with given
// tx hash.
func (p *testTxPool) Get(hash common.Hash) *txpool.Transaction {
func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
p.lock.Lock()
defer p.lock.Unlock()
if tx := p.pool[hash]; tx != nil {
return &txpool.Transaction{Tx: tx}
return tx
}
return nil
}
// Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
unwrapped := make([]*types.Transaction, len(txs))
for i, tx := range txs {
unwrapped[i] = tx.Tx
}
func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
p.lock.Lock()
defer p.lock.Unlock()
for _, tx := range unwrapped {
for _, tx := range txs {
p.pool[tx.Hash()] = tx
}
if p.added != nil {
p.added <- unwrapped
p.added <- txs
}
return make([]error, len(unwrapped))
return make([]error, len(txs))
}
// Pending returns all the transactions known to the pool
@ -167,7 +163,7 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.Lazy
for _, tx := range batch {
pending[addr] = append(pending[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
@ -177,10 +173,16 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.Lazy
return pending
}
// SubscribeTransactions should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
return p.txFeed.Subscribe(ch)
}
// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
return p.SubscribeTransactions(ch, false)
}
// newTestTransaction create a new dummy transaction.

View file

@ -105,15 +105,16 @@ var errorToString = map[int]string{
type txPool interface {
// Add should add the given transactions to the pool.
Add(txs []*txpool.Transaction, local bool, sync bool) []error
Add(txs []*types.Transaction, local bool, sync bool) []error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
}
type orderPool interface {

View file

@ -23,7 +23,6 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
@ -132,10 +131,10 @@ func testSendTransactions(t *testing.T, protocol int) {
// Fill the pool with big transactions.
const txsize = txsyncPackSize / 10
alltxs := make([]*txpool.Transaction, 100)
alltxs := make([]*types.Transaction, 100)
for nonce := range alltxs {
tx := newTestTransaction(testAccount, uint64(nonce), txsize)
alltxs[nonce] = &txpool.Transaction{Tx: tx}
alltxs[nonce] = tx
}
pm.txpool.Add(alltxs, false, false)
@ -145,7 +144,7 @@ func testSendTransactions(t *testing.T, protocol int) {
defer p.close()
seen := make(map[common.Hash]bool)
for _, tx := range alltxs {
seen[tx.Tx.Hash()] = false
seen[tx.Hash()] = false
}
for n := 0; n < len(alltxs) && !t.Failed(); {
var txs []*types.Transaction

View file

@ -49,7 +49,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
for _, batch := range pending {
for _, lazy := range batch {
if tx := lazy.Resolve(); tx != nil {
txs = append(txs, tx.Tx)
txs = append(txs, tx)
}
}
}

View file

@ -66,15 +66,15 @@ func (s txByPriceAndTime) Len() int {
func (s txByPriceAndTime) Less(i, j int) bool {
i_price := s.txs[i].fees
if tx := s.txs[i].tx.Resolve(); tx != nil && tx.Tx.To() != nil {
if _, ok := s.payersSwap[*tx.Tx.To()]; ok {
if tx := s.txs[i].tx.Resolve(); tx != nil && tx.To() != nil {
if _, ok := s.payersSwap[*tx.To()]; ok {
i_price = common.TRC21GasPrice
}
}
j_price := s.txs[j].fees
if tx := s.txs[j].tx.Resolve(); tx != nil && tx.Tx.To() != nil {
if _, ok := s.payersSwap[*tx.Tx.To()]; ok {
if tx := s.txs[j].tx.Resolve(); tx != nil && tx.To() != nil {
if _, ok := s.payersSwap[*tx.To()]; ok {
j_price = common.TRC21GasPrice
}
}
@ -130,8 +130,8 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address]
for from, accTxs := range txs {
var normalTxs []*txpool.LazyTransaction
for _, lazyTx := range accTxs {
if tx := lazyTx.Resolve(); tx.Tx.IsSpecialTransaction() {
specialTxs = append(specialTxs, tx.Tx)
if tx := lazyTx.Resolve(); tx.IsSpecialTransaction() {
specialTxs = append(specialTxs, tx)
} else {
normalTxs = append(normalTxs, lazyTx)
}

View file

@ -88,7 +88,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
}
groups[addr] = append(groups[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
@ -101,7 +101,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) {
txs := types.Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.Tx.Tx)
txs = append(txs, tx.Tx)
txset.Shift()
}
if len(txs) != expectedCount {
@ -153,7 +153,7 @@ func TestTransactionTimeSort(t *testing.T) {
groups[addr] = append(groups[addr], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
@ -164,7 +164,7 @@ func TestTransactionTimeSort(t *testing.T) {
txs := types.Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx.Tx.Tx)
txs = append(txs, tx.Tx)
txset.Shift()
}
if len(txs) != len(keys) {
@ -193,11 +193,11 @@ func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) {
genNormalTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction {
tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0x1234567890123456789012345678901234567890"), big.NewInt(1), 21000, big.NewInt(1), nil), signer, key)
return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()}
return &txpool.LazyTransaction{Tx: tx, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()}
}
genSpecialTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction {
tx, _ := types.SignTx(types.NewTransaction(nonce, common.BlockSignersBinary, big.NewInt(1), 21000, big.NewInt(1), nil), signer, key)
return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()}
return &txpool.LazyTransaction{Tx: tx, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()}
}
testCases := []struct {
@ -250,7 +250,7 @@ func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) {
normalCount := 0
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
resolved := tx.Resolve()
if resolved == nil || resolved.Tx.To() == nil || *resolved.Tx.To() == common.BlockSignersBinary {
if resolved == nil || resolved.To() == nil || *resolved.To() == common.BlockSignersBinary {
t.Errorf("txset contains special or nil-to tx: %v", resolved)
}
normalCount++

View file

@ -186,7 +186,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
}
if worker.announceTxs {
// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true)
}
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
@ -375,7 +375,7 @@ func (w *worker) update() {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], &txpool.LazyTransaction{
Hash: tx.Hash(),
Tx: &txpool.Transaction{Tx: tx},
Tx: tx,
Time: tx.Time(),
GasFeeCap: tx.GasFeeCap(),
GasTipCap: tx.GasTipCap(),
@ -1111,10 +1111,10 @@ func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Addr
break
}
warped := lazyTx.Resolve()
if warped == nil || warped.Tx == nil {
if warped == nil {
break
}
tx := warped.Tx
tx := warped
to := tx.To()
if w.header.Number.Uint64() >= common.DenylistHFNumber {
from := tx.From()