From d4a6f43ef2b6d2e29c80ee3bfbf73c37eec3267b Mon Sep 17 00:00:00 2001 From: Daniel Liu <139250065@qq.com> Date: Tue, 10 Mar 2026 21:14:38 +0800 Subject: [PATCH] refactor(core/txpool): migrate tx subscription to SubscribeTransactions #28243 (#2125) * refactor(txpool): remove wrapper type #27841 Partial backport of ethereum/go-ethereum PR #27841, limited to txpool wrapper removal. - Migrate txpool interfaces/call sites from `*txpool.Transaction` to `*types.Transaction` - Update eth/miner/contracts paths and related tests accordingly - No intended behavior change Blob sidecar validation/handling changes from upstream are not included here. * refactor(core/txpool): migrate tx subscription to SubscribeTransactions #28243 Replace the old SubscribeNewTxsEvent-style plumbing with the new SubscribeTransactions(ch, reorgs) interface across txpool, eth protocol manager, API backend, miner worker, and test helpers. Key changes: - Extend txpool/subpool tx subscription interface with a reorgs flag - Route eth tx announcement path to reorgs=false (new tx announcements only) - Route API/miner subscriptions to reorgs=true - Move subscription-scope cleanup to TxPool.Close() - Add Gas field to LazyTransaction in legacy pending view Note: LegacyPool currently cannot strictly separate newly seen and resurrected txs, so the reorgs flag is accepted for API compatibility and future blob-subpool integration. --- contracts/utils.go | 6 ++-- core/txpool/legacypool/legacypool.go | 52 ++++++++++++---------------- core/txpool/subpool.go | 34 ++++++++++-------- core/txpool/txpool.go | 19 +++++----- eth/api_backend.go | 8 ++--- eth/handler.go | 11 ++---- eth/helper_test.go | 26 +++++++------- eth/protocol.go | 9 ++--- eth/protocol_test.go | 7 ++-- eth/sync.go | 2 +- miner/ordering.go | 12 +++---- miner/ordering_test.go | 14 ++++---- miner/worker.go | 8 ++--- 13 files changed, 103 insertions(+), 105 deletions(-) diff --git a/contracts/utils.go b/contracts/utils.go index e73266ec2e..d39a715aec 100644 --- a/contracts/utils.go +++ b/contracts/utils.go @@ -93,7 +93,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool, return err } // Add tx signed to local tx pool. - err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0] + err = pool.Add([]*types.Transaction{txSigned}, true, true)[0] if err != nil { log.Error("Fail to add tx sign to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce) return err @@ -121,7 +121,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool, return err } // Add tx signed to local tx pool. - err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0] + err = pool.Add([]*types.Transaction{txSigned}, true, true)[0] if err != nil { log.Error("Fail to add tx secret to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce) return err @@ -150,7 +150,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool, return err } // Add tx to pool. - err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0] + err = pool.Add([]*types.Transaction{txSigned}, true, true)[0] if err != nil { log.Error("Fail to add tx opening to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce) return err diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index afe7dc509f..c4e5527648 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -251,7 +251,6 @@ type LegacyPool struct { chain BlockChain gasTip atomic.Pointer[big.Int] txFeed event.Feed - scope event.SubscriptionScope signer types.Signer mu sync.RWMutex @@ -447,9 +446,6 @@ func (pool *LegacyPool) loop() { // Close terminates the transaction pool. func (pool *LegacyPool) Close() error { - // Unsubscribe all subscriptions registered from txpool - pool.scope.Close() - // Terminate the pool reorger and return close(pool.reorgShutdownCh) pool.wg.Wait() @@ -468,10 +464,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) { <-wait } -// SubscribeTransactions registers a subscription of NewTxsEvent and -// starts sending event to the given channel. -func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription { - return pool.scope.Track(pool.txFeed.Subscribe(ch)) +// SubscribeTransactions registers a subscription for new transaction events, +// supporting feeding only newly seen or also resurrected transactions. +func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { + // The legacy pool has a very messed up internal shuffling, so it's kind of + // hard to separate newly discovered transactions from resurrected ones. This + // is because the new txs are added to the queue, resurrected ones too and + // reorgs run lazily, so separating the two would need a marker. + return pool.txFeed.Subscribe(ch) } // SetGasTip updates the minimum gas tip required by the transaction pool for a @@ -607,10 +607,11 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L lazies[i] = &txpool.LazyTransaction{ Pool: pool, Hash: txs[i].Hash(), - Tx: &txpool.Transaction{Tx: txs[i]}, + Tx: txs[i], Time: txs[i].Time(), GasFeeCap: txs[i].GasFeeCap(), GasTipCap: txs[i].GasTipCap(), + Gas: txs[i].Gas(), } } pending[addr] = lazies @@ -1099,26 +1100,13 @@ func (pool *LegacyPool) promoteSpecialTx(addr common.Address, tx *types.Transact return true, nil } -// Add enqueues a batch of transactions into the pool if they are valid. Depending -// on the local flag, full pricing constraints will or will not be applied. -// -// If sync is set, the method will block until all internal maintenance related -// to the add is finished. Only use this during tests for determinism! -func (pool *LegacyPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error { - unwrapped := make([]*types.Transaction, len(txs)) - for i, tx := range txs { - unwrapped[i] = tx.Tx - } - return pool.addTxs(unwrapped, local, sync) -} - // AddLocals enqueues a batch of transactions into the pool if they are valid, marking the // senders as local ones, ensuring they go around the local pricing constraints. // // This method is used to add transactions from the RPC API and performs synchronous pool // reorganization and event propagation. func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error { - return pool.addTxs(txs, !pool.config.NoLocals, true) + return pool.Add(txs, !pool.config.NoLocals, true) } // AddLocal enqueues a single local transaction into the pool if it is valid. This is @@ -1134,7 +1122,7 @@ func (pool *LegacyPool) addLocal(tx *types.Transaction) error { // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *LegacyPool) AddRemotes(txs []*types.Transaction) []error { - return pool.addTxs(txs, false, false) + return pool.Add(txs, false, false) } // addRemote enqueues a single transaction into the pool if it is valid. This is a convenience @@ -1146,16 +1134,20 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error { // AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error { - return pool.addTxs(txs, false, true) + return pool.Add(txs, false, true) } // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { - return pool.addTxs([]*types.Transaction{tx}, false, true)[0] + return pool.Add([]*types.Transaction{tx}, false, true)[0] } -// addTxs attempts to queue a batch of transactions if they are valid. -func (pool *LegacyPool) addTxs(txs []*types.Transaction, local, sync bool) []error { +// Add enqueues a batch of transactions into the pool if they are valid. Depending +// on the local flag, full pricing constraints will or will not be applied. +// +// If sync is set, the method will block until all internal maintenance related +// to the add is finished. Only use this during tests for determinism! +func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( errs = make([]error, len(txs)) @@ -1241,12 +1233,12 @@ func (pool *LegacyPool) Status(hash common.Hash) txpool.TxStatus { } // Get returns a transaction if it is contained in the pool and nil otherwise. -func (pool *LegacyPool) Get(hash common.Hash) *txpool.Transaction { +func (pool *LegacyPool) Get(hash common.Hash) *types.Transaction { tx := pool.get(hash) if tx == nil { return nil } - return &txpool.Transaction{Tx: tx} + return tx } // get returns a transaction if it is contained in the pool and nil otherwise. diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 9cf6ee05d5..e27e1b4754 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -26,34 +26,38 @@ import ( "github.com/XinFinOrg/XDPoSChain/event" ) -// Transaction is a helper struct to group together a canonical transaction with -// satellite data items that are needed by the pool but are not part of the chain. -type Transaction struct { - Tx *types.Transaction // Canonical transaction -} - // LazyTransaction contains a small subset of the transaction properties that is // enough for the miner and other APIs to handle large batches of transactions; // and supports pulling up the entire transaction when really needed. type LazyTransaction struct { - Pool SubPool // Transaction subpool to pull the real transaction up - Hash common.Hash // Transaction hash to pull up if needed - Tx *Transaction // Transaction if already resolved + Pool LazyResolver // Transaction resolver to pull the real transaction up + Hash common.Hash // Transaction hash to pull up if needed + Tx *types.Transaction // Transaction if already resolved Time time.Time // Time when the transaction was first seen GasFeeCap *big.Int // Maximum fee per gas the transaction may consume GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay + + Gas uint64 // Amount of gas required by the transaction } // Resolve retrieves the full transaction belonging to a lazy handle if it is still // maintained by the transaction pool. -func (ltx *LazyTransaction) Resolve() *Transaction { +func (ltx *LazyTransaction) Resolve() *types.Transaction { if ltx.Tx == nil { ltx.Tx = ltx.Pool.Get(ltx.Hash) } return ltx.Tx } +// LazyResolver is a minimal interface needed for a transaction pool to satisfy +// resolving lazy transactions. It's mostly a helper to avoid the entire sub- +// pool being injected into the lazy transaction. +type LazyResolver interface { + // Get returns a transaction if it is contained in the pool, or nil otherwise. + Get(hash common.Hash) *types.Transaction +} + // SubPool represents a specialized transaction pool that lives on its own (e.g. // blob pool). Since independent of how many specialized pools we have, they do // need to be updated in lockstep and assemble into one coherent view for block @@ -90,19 +94,21 @@ type SubPool interface { Has(hash common.Hash) bool // Get returns a transaction if it is contained in the pool, or nil otherwise. - Get(hash common.Hash) *Transaction + Get(hash common.Hash) *types.Transaction // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. - Add(txs []*Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, local bool, sync bool) []error // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. Pending(enforceTips bool) map[common.Address][]*LazyTransaction - // SubscribeTransactions subscribes to new transaction events. - SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription + // SubscribeTransactions subscribes to new transaction events. The subscriber + // can decide whether to receive notifications only for newly seen transactions + // or also for reorged out ones. + SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 83dfc22f8b..9cdb6a39fa 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -100,6 +100,9 @@ func (p *TxPool) Close() error { errs = append(errs, err) } } + // Unsubscribe anyone still listening for tx events + p.subs.Close() + if len(errs) > 0 { return fmt.Errorf("subpool close errors: %v", errs) } @@ -190,7 +193,7 @@ func (p *TxPool) Has(hash common.Hash) bool { } // Get returns a transaction if it is contained in the pool, or nil otherwise. -func (p *TxPool) Get(hash common.Hash) *Transaction { +func (p *TxPool) Get(hash common.Hash) *types.Transaction { for _, subpool := range p.subpools { if tx := subpool.Get(hash); tx != nil { return tx @@ -202,14 +205,14 @@ func (p *TxPool) Get(hash common.Hash) *Transaction { // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. -func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error { +func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // Split the input transactions between the subpools. It shouldn't really // happen that we receive merged batches, but better graceful than strange // errors. // // We also need to track how the transactions were split across the subpools, // so we can piece back the returned errors into the original order. - txsets := make([][]*Transaction, len(p.subpools)) + txsets := make([][]*types.Transaction, len(p.subpools)) splits := make([]int, len(txs)) for i, tx := range txs { @@ -218,7 +221,7 @@ func (p *TxPool) Add(txs []*Transaction, local bool, sync bool) []error { // Try to find a subpool that accepts the transaction for j, subpool := range p.subpools { - if subpool.Filter(tx.Tx) { + if subpool.Filter(tx) { txsets[j] = append(txsets[j], tx) splits[i] = j break @@ -255,12 +258,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction return txs } -// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending -// events to the given channel. -func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { +// SubscribeTransactions registers a subscription for new transaction events, +// supporting feeding only newly seen or also resurrected transactions. +func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { subs := make([]event.Subscription, len(p.subpools)) for i, subpool := range p.subpools { - subs[i] = subpool.SubscribeTransactions(ch) + subs[i] = subpool.SubscribeTransactions(ch, reorgs) } return p.subs.Track(event.JoinSubscriptions(subs...)) } diff --git a/eth/api_backend.go b/eth/api_backend.go index 695091f9db..d3bfefc8b0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -301,7 +301,7 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri } func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.Add([]*txpool.Transaction{{Tx: signedTx}}, true, false)[0] + return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { @@ -310,7 +310,7 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { for _, batch := range pending { for _, lazy := range batch { if tx := lazy.Resolve(); tx != nil { - txs = append(txs, tx.Tx) + txs = append(txs, tx) } } } @@ -319,7 +319,7 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { if tx := b.eth.txPool.Get(hash); tx != nil { - return tx.Tx + return tx } return nil } @@ -350,7 +350,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool { } func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return b.eth.txPool.SubscribeNewTxsEvent(ch) + return b.eth.txPool.SubscribeTransactions(ch, true) } func (b *EthAPIBackend) Downloader() *downloader.Downloader { diff --git a/eth/handler.go b/eth/handler.go index 2372491256..39ce446d06 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -31,7 +31,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS" "github.com/XinFinOrg/XDPoSChain/consensus/misc" "github.com/XinFinOrg/XDPoSChain/core" - "github.com/XinFinOrg/XDPoSChain/core/txpool" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/eth/bft" "github.com/XinFinOrg/XDPoSChain/eth/downloader" @@ -282,9 +281,9 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers - // broadcast transactions + // broadcast and announce transactions (only new ones, not resurrected ones) pm.txsCh = make(chan core.NewTxsEvent, txChanSize) - pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) + pm.txsSub = pm.txpool.SubscribeTransactions(pm.txsCh, false) pm.orderTxCh = make(chan core.OrderTxPreEvent, txChanSize) if pm.orderpool != nil { pm.orderTxSub = pm.orderpool.SubscribeTxPreEvent(pm.orderTxCh) @@ -780,11 +779,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { pm.knownTxs.Add(tx.Hash(), struct{}{}) } } - warped := make([]*txpool.Transaction, len(txs)) - for i := range txs { - warped[i] = &txpool.Transaction{Tx: txs[i]} - } - pm.txpool.Add(warped, false, false) + pm.txpool.Add(txs, false, false) case msg.Code == OrderTxMsg: // Transactions arrived, make sure we have a valid and fresh chain to handle them diff --git a/eth/helper_test.go b/eth/helper_test.go index 6aa5d7e559..7202aee29e 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -119,34 +119,30 @@ func (p *testTxPool) Has(hash common.Hash) bool { // Get retrieves the transaction from local txpool with given // tx hash. -func (p *testTxPool) Get(hash common.Hash) *txpool.Transaction { +func (p *testTxPool) Get(hash common.Hash) *types.Transaction { p.lock.Lock() defer p.lock.Unlock() if tx := p.pool[hash]; tx != nil { - return &txpool.Transaction{Tx: tx} + return tx } return nil } // Add appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error { - unwrapped := make([]*types.Transaction, len(txs)) - for i, tx := range txs { - unwrapped[i] = tx.Tx - } +func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { p.lock.Lock() defer p.lock.Unlock() - for _, tx := range unwrapped { + for _, tx := range txs { p.pool[tx.Hash()] = tx } if p.added != nil { - p.added <- unwrapped + p.added <- txs } - return make([]error, len(unwrapped)) + return make([]error, len(txs)) } // Pending returns all the transactions known to the pool @@ -167,7 +163,7 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.Lazy for _, tx := range batch { pending[addr] = append(pending[addr], &txpool.LazyTransaction{ Hash: tx.Hash(), - Tx: &txpool.Transaction{Tx: tx}, + Tx: tx, Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), @@ -177,10 +173,16 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*txpool.Lazy return pending } +// SubscribeTransactions should return an event subscription of NewTxsEvent and +// send events to the given channel. +func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { + return p.txFeed.Subscribe(ch) +} + // SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and // send events to the given channel. func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return p.txFeed.Subscribe(ch) + return p.SubscribeTransactions(ch, false) } // newTestTransaction create a new dummy transaction. diff --git a/eth/protocol.go b/eth/protocol.go index 0cd3f6c829..eff0fbdce5 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -105,15 +105,16 @@ var errorToString = map[int]string{ type txPool interface { // Add should add the given transactions to the pool. - Add(txs []*txpool.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, local bool, sync bool) []error // Pending should return pending transactions. // The slice should be modifiable by the caller. Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction - // SubscribeNewTxsEvent should return an event subscription of - // NewTxsEvent and send events to the given channel. - SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + // SubscribeTransactions subscribes to new transaction events. The subscriber + // can decide whether to receive notifications only for newly seen transactions + // or also for reorged out ones. + SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription } type orderPool interface { diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 271cde6d15..79228ac93b 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -23,7 +23,6 @@ import ( "time" "github.com/XinFinOrg/XDPoSChain/common" - "github.com/XinFinOrg/XDPoSChain/core/txpool" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/eth/downloader" @@ -132,10 +131,10 @@ func testSendTransactions(t *testing.T, protocol int) { // Fill the pool with big transactions. const txsize = txsyncPackSize / 10 - alltxs := make([]*txpool.Transaction, 100) + alltxs := make([]*types.Transaction, 100) for nonce := range alltxs { tx := newTestTransaction(testAccount, uint64(nonce), txsize) - alltxs[nonce] = &txpool.Transaction{Tx: tx} + alltxs[nonce] = tx } pm.txpool.Add(alltxs, false, false) @@ -145,7 +144,7 @@ func testSendTransactions(t *testing.T, protocol int) { defer p.close() seen := make(map[common.Hash]bool) for _, tx := range alltxs { - seen[tx.Tx.Hash()] = false + seen[tx.Hash()] = false } for n := 0; n < len(alltxs) && !t.Failed(); { var txs []*types.Transaction diff --git a/eth/sync.go b/eth/sync.go index 0b46d763ab..8eadbfa332 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -49,7 +49,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { for _, batch := range pending { for _, lazy := range batch { if tx := lazy.Resolve(); tx != nil { - txs = append(txs, tx.Tx) + txs = append(txs, tx) } } } diff --git a/miner/ordering.go b/miner/ordering.go index 6ee17187c7..d8b3dc7dd1 100644 --- a/miner/ordering.go +++ b/miner/ordering.go @@ -66,15 +66,15 @@ func (s txByPriceAndTime) Len() int { func (s txByPriceAndTime) Less(i, j int) bool { i_price := s.txs[i].fees - if tx := s.txs[i].tx.Resolve(); tx != nil && tx.Tx.To() != nil { - if _, ok := s.payersSwap[*tx.Tx.To()]; ok { + if tx := s.txs[i].tx.Resolve(); tx != nil && tx.To() != nil { + if _, ok := s.payersSwap[*tx.To()]; ok { i_price = common.TRC21GasPrice } } j_price := s.txs[j].fees - if tx := s.txs[j].tx.Resolve(); tx != nil && tx.Tx.To() != nil { - if _, ok := s.payersSwap[*tx.Tx.To()]; ok { + if tx := s.txs[j].tx.Resolve(); tx != nil && tx.To() != nil { + if _, ok := s.payersSwap[*tx.To()]; ok { j_price = common.TRC21GasPrice } } @@ -130,8 +130,8 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address] for from, accTxs := range txs { var normalTxs []*txpool.LazyTransaction for _, lazyTx := range accTxs { - if tx := lazyTx.Resolve(); tx.Tx.IsSpecialTransaction() { - specialTxs = append(specialTxs, tx.Tx) + if tx := lazyTx.Resolve(); tx.IsSpecialTransaction() { + specialTxs = append(specialTxs, tx) } else { normalTxs = append(normalTxs, lazyTx) } diff --git a/miner/ordering_test.go b/miner/ordering_test.go index 46da07c90e..18a88df863 100644 --- a/miner/ordering_test.go +++ b/miner/ordering_test.go @@ -88,7 +88,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) { } groups[addr] = append(groups[addr], &txpool.LazyTransaction{ Hash: tx.Hash(), - Tx: &txpool.Transaction{Tx: tx}, + Tx: tx, Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), @@ -101,7 +101,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) { txs := types.Transactions{} for tx := txset.Peek(); tx != nil; tx = txset.Peek() { - txs = append(txs, tx.Tx.Tx) + txs = append(txs, tx.Tx) txset.Shift() } if len(txs) != expectedCount { @@ -153,7 +153,7 @@ func TestTransactionTimeSort(t *testing.T) { groups[addr] = append(groups[addr], &txpool.LazyTransaction{ Hash: tx.Hash(), - Tx: &txpool.Transaction{Tx: tx}, + Tx: tx, Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), @@ -164,7 +164,7 @@ func TestTransactionTimeSort(t *testing.T) { txs := types.Transactions{} for tx := txset.Peek(); tx != nil; tx = txset.Peek() { - txs = append(txs, tx.Tx.Tx) + txs = append(txs, tx.Tx) txset.Shift() } if len(txs) != len(keys) { @@ -193,11 +193,11 @@ func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) { genNormalTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction { tx, _ := types.SignTx(types.NewTransaction(nonce, common.HexToAddress("0x1234567890123456789012345678901234567890"), big.NewInt(1), 21000, big.NewInt(1), nil), signer, key) - return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()} + return &txpool.LazyTransaction{Tx: tx, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()} } genSpecialTx := func(nonce uint64, key *ecdsa.PrivateKey) *txpool.LazyTransaction { tx, _ := types.SignTx(types.NewTransaction(nonce, common.BlockSignersBinary, big.NewInt(1), 21000, big.NewInt(1), nil), signer, key) - return &txpool.LazyTransaction{Tx: &txpool.Transaction{Tx: tx}, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()} + return &txpool.LazyTransaction{Tx: tx, Hash: tx.Hash(), Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap()} } testCases := []struct { @@ -250,7 +250,7 @@ func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) { normalCount := 0 for tx := txset.Peek(); tx != nil; tx = txset.Peek() { resolved := tx.Resolve() - if resolved == nil || resolved.Tx.To() == nil || *resolved.Tx.To() == common.BlockSignersBinary { + if resolved == nil || resolved.To() == nil || *resolved.To() == common.BlockSignersBinary { t.Errorf("txset contains special or nil-to tx: %v", resolved) } normalCount++ diff --git a/miner/worker.go b/miner/worker.go index 6503e6295a..daeb01cefe 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -186,7 +186,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com } if worker.announceTxs { // Subscribe NewTxsEvent for tx pool - worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) + worker.txsSub = eth.TxPool().SubscribeTransactions(worker.txsCh, true) } // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) @@ -375,7 +375,7 @@ func (w *worker) update() { acc, _ := types.Sender(w.current.signer, tx) txs[acc] = append(txs[acc], &txpool.LazyTransaction{ Hash: tx.Hash(), - Tx: &txpool.Transaction{Tx: tx}, + Tx: tx, Time: tx.Time(), GasFeeCap: tx.GasFeeCap(), GasTipCap: tx.GasTipCap(), @@ -1111,10 +1111,10 @@ func (w *Work) commitTransactions(mux *event.TypeMux, balanceFee map[common.Addr break } warped := lazyTx.Resolve() - if warped == nil || warped.Tx == nil { + if warped == nil { break } - tx := warped.Tx + tx := warped to := tx.To() if w.header.Number.Uint64() >= common.DenylistHFNumber { from := tx.From()