all: move main transaction pool into a subpool #27463 (#1890)

This commit is contained in:
Daniel Liu 2026-01-05 18:13:50 +08:00 committed by GitHub
parent b6b1c4b779
commit b3d354a897
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 3051 additions and 2224 deletions

View file

@ -40,7 +40,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
"github.com/XinFinOrg/XDPoSChain/core/vm"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/eth"
@ -201,20 +201,20 @@ var (
Name: "txpool-journal",
Aliases: []string{"txpool.journal"},
Usage: "Disk journal for local transaction to survive node restarts",
Value: txpool.DefaultConfig.Journal,
Value: ethconfig.Defaults.TxPool.Journal,
Category: flags.TxPoolCategory,
}
TxPoolRejournalFlag = &cli.DurationFlag{
Name: "txpool-rejournal",
Aliases: []string{"txpool.rejournal"},
Usage: "Time interval to regenerate the local transaction journal",
Value: txpool.DefaultConfig.Rejournal,
Value: ethconfig.Defaults.TxPool.Rejournal,
Category: flags.TxPoolCategory,
}
TxPoolPriceLimitFlag = &cli.Uint64Flag{
Name: "txpool-pricelimit",
Aliases: []string{"txpool.pricelimit"},
Usage: "Minimum gas price limit to enforce for acceptance into the pool",
Usage: "Minimum gas price tip to enforce for acceptance into the pool",
Value: ethconfig.Defaults.TxPool.PriceLimit,
Category: flags.TxPoolCategory,
}
@ -1398,7 +1398,7 @@ func setGPO(ctx *cli.Context, cfg *gasprice.Config) {
}
}
func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolNoLocalsFlag.Name) {
cfg.NoLocals = ctx.Bool(TxPoolNoLocalsFlag.Name)
}

View file

@ -93,7 +93,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
return err
}
// Add tx signed to local tx pool.
err = pool.AddLocal(txSigned)
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
if err != nil {
log.Error("Fail to add tx sign to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
return err
@ -121,7 +121,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
return err
}
// Add tx signed to local tx pool.
err = pool.AddLocal(txSigned)
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
if err != nil {
log.Error("Fail to add tx secret to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
return err
@ -150,7 +150,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *txpool.TxPool,
return err
}
// Add tx to pool.
err = pool.AddLocal(txSigned)
err = pool.Add([]*txpool.Transaction{{Tx: txSigned}}, true, true)[0]
if err != nil {
log.Error("Fail to add tx opening to local pool.", "error", err, "number", block.NumberU64(), "hash", block.Hash().Hex(), "from", account.Address, "nonce", nonce)
return err

65
core/txpool/errors.go Normal file
View file

@ -0,0 +1,65 @@
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
import "errors"
var (
// ErrAlreadyKnown is returned if the transactions is already contained
// within the pool.
ErrAlreadyKnown = errors.New("already known")
// ErrInvalidSender is returned if the transaction contains an invalid signature.
ErrInvalidSender = errors.New("invalid sender")
// ErrUnderpriced is returned if a transaction's gas price is below the minimum
// configured for the transaction pool.
ErrUnderpriced = errors.New("transaction underpriced")
// ErrTxPoolOverflow is returned if the transaction pool is full and can't accept
// another remote transaction.
ErrTxPoolOverflow = errors.New("txpool is full")
// ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced
// with a different one without the required price bump.
ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
// ErrGasLimit is returned if a transaction's requested gas limit exceeds the
// maximum allowance of the current block.
ErrGasLimit = errors.New("exceeds block gas limit")
// ErrNegativeValue is a sanity error to ensure no one is able to specify a
// transaction with a negative value.
ErrNegativeValue = errors.New("negative value")
// ErrOversizedData is returned if the input data of a transaction is greater
// than some meaningful limit a user might use. This is not a consensus error
// making the transaction invalid, rather a DOS protection.
ErrOversizedData = errors.New("oversized data")
// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")
ErrZeroGasPrice = errors.New("zero gas price")
ErrUnderMinGasPrice = errors.New("under min gas price")
ErrDuplicateSpecialTransaction = errors.New("duplicate a special transaction")
ErrMinDeploySMC = errors.New("smart contract creation cost is under allowance")
)

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"errors"

File diff suppressed because it is too large Load diff

View file

@ -13,7 +13,7 @@
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"crypto/ecdsa"
@ -34,7 +34,7 @@ func pricedValuedTransaction(nonce uint64, value int64, gaslimit uint64, gaspric
return tx
}
func count(t *testing.T, pool *TxPool) (pending int, queued int) {
func count(t *testing.T, pool *LegacyPool) (pending int, queued int) {
t.Helper()
pending, queued = pool.stats()
if err := validatePoolInternals(pool); err != nil {
@ -43,7 +43,7 @@ func count(t *testing.T, pool *TxPool) (pending int, queued int) {
return pending, queued
}
func fillPool(t testing.TB, pool *TxPool) {
func fillPool(t testing.TB, pool *LegacyPool) {
t.Helper()
// Create a number of test accounts, fund them and make transactions
executableTxs := types.Transactions{}
@ -57,8 +57,8 @@ func fillPool(t testing.TB, pool *TxPool) {
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotesSync(executableTxs)
pool.AddRemotesSync(nonExecutableTxs)
pool.addRemotesSync(executableTxs)
pool.addRemotesSync(nonExecutableTxs)
pending, queued := pool.Stats()
slots := pool.all.Slots()
// sanity-check that the test prerequisites are ok (pending full)
@ -80,12 +80,13 @@ func TestTransactionFutureAttack(t *testing.T) {
// Create the pool to test the limit enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, eip1559Config, blockchain)
defer pool.Stop()
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
fillPool(t, pool)
pending, _ := pool.Stats()
// Now, future transaction attack starts, let's add a bunch of expensive non-executables, and see if the pending-count drops
@ -97,7 +98,7 @@ func TestTransactionFutureAttack(t *testing.T) {
futureTxs = append(futureTxs, pricedTransaction(1000+uint64(j), 100000, big.NewInt(500), key))
}
for i := 0; i < 5; i++ {
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
newPending, newQueued := count(t, pool)
t.Logf("pending: %d queued: %d, all: %d\n", newPending, newQueued, pool.all.Slots())
}
@ -116,9 +117,10 @@ func TestTransactionFuture1559(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
@ -132,7 +134,7 @@ func TestTransactionFuture1559(t *testing.T) {
for j := 0; j < int(pool.config.GlobalSlots+pool.config.GlobalQueue); j++ {
futureTxs = append(futureTxs, dynamicFeeTx(1000+uint64(j), 100000, big.NewInt(200), big.NewInt(101), key))
}
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
}
newPending, _ := pool.Stats()
// Pending should not have been touched
@ -148,9 +150,10 @@ func TestTransactionZAttack(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, eip1559Config, blockchain)
defer pool.Stop()
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
fillPool(t, pool)
@ -182,7 +185,7 @@ func TestTransactionZAttack(t *testing.T) {
key, _ := crypto.GenerateKey()
pool.currentState.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(100000000000), tracing.BalanceChangeUnspecified)
futureTxs = append(futureTxs, pricedTransaction(1000+uint64(j), 21000, big.NewInt(500), key))
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
}
overDraftTxs := types.Transactions{}
@ -193,11 +196,11 @@ func TestTransactionZAttack(t *testing.T) {
overDraftTxs = append(overDraftTxs, pricedValuedTransaction(uint64(j), 600000000000, 21000, big.NewInt(500), key))
}
}
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.AddRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
pool.addRemotesSync(overDraftTxs)
newPending, newQueued := count(t, pool)
newIvPending := countInvalidPending()
@ -218,12 +221,13 @@ func TestTransactionZAttack(t *testing.T) {
func BenchmarkFutureAttack(b *testing.B) {
// Create the pool to test the limit enforcement with
statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()))
blockchain := newTestBlockChain(1000000, statedb, new(event.Feed))
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, eip1559Config, blockchain)
defer pool.Stop()
pool := New(config, blockchain)
pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock())
defer pool.Close()
fillPool(b, pool)
key, _ := crypto.GenerateKey()
@ -235,6 +239,6 @@ func BenchmarkFutureAttack(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < 5; i++ {
pool.AddRemotesSync(futureTxs)
pool.addRemotesSync(futureTxs)
}
}

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"errors"
@ -31,6 +31,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
@ -627,13 +628,13 @@ func (pool *LendingPool) validateTx(tx *types.LendingTransaction, local bool) er
}
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
if tx.Size() > 32*1024 {
return ErrOversizedData
return txpool.ErrOversizedData
}
// Make sure the transaction is signed properly
from, err := types.LendingSender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
return txpool.ErrInvalidSender
}
err = pool.validateLending(tx)
if err != nil {
@ -869,18 +870,18 @@ func (pool *LendingPool) addTxsLocked(txs []*types.LendingTransaction, local boo
// Status returns the status (unknown/pending/queued) of a batch of transactions
// identified by their hashes.
func (pool *LendingPool) Status(hashes []common.Hash) []TxStatus {
func (pool *LendingPool) Status(hashes []common.Hash) []txpool.TxStatus {
pool.mu.RLock()
defer pool.mu.RUnlock()
status := make([]TxStatus, len(hashes))
status := make([]txpool.TxStatus, len(hashes))
for i, hash := range hashes {
if tx := pool.all[hash]; tx != nil {
from, _ := types.LendingSender(pool.signer, tx) // already validated
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
status[i] = TxStatusPending
status[i] = txpool.TxStatusPending
} else {
status[i] = TxStatusQueued
status[i] = txpool.TxStatusQueued
}
}
}

View file

@ -1,4 +1,4 @@
package txpool
package legacypool
import (
"math/big"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"io"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"container/heap"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"container/heap"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"math/big"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"sync"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"errors"
@ -31,6 +31,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
@ -529,13 +530,13 @@ func (pool *OrderPool) validateTx(tx *types.OrderTransaction, local bool) error
}
// Heuristic limit, reject transactions over 32KB to prevent DOS attacks
if tx.Size() > 32*1024 {
return ErrOversizedData
return txpool.ErrOversizedData
}
// Make sure the transaction is signed properly
from, err := types.OrderSender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
return txpool.ErrInvalidSender
}
err = pool.validateOrder(tx)
if err != nil {
@ -778,18 +779,18 @@ func (pool *OrderPool) addTxsLocked(txs []*types.OrderTransaction, local bool) [
// Status returns the status (unknown/pending/queued) of a batch of transactions
// identified by their hashes.
func (pool *OrderPool) Status(hashes []common.Hash) []TxStatus {
func (pool *OrderPool) Status(hashes []common.Hash) []txpool.TxStatus {
pool.mu.RLock()
defer pool.mu.RUnlock()
status := make([]TxStatus, len(hashes))
status := make([]txpool.TxStatus, len(hashes))
for i, hash := range hashes {
if tx := pool.all[hash]; tx != nil {
from, _ := types.OrderSender(pool.signer, tx) // already validated
if pool.pending[from] != nil && pool.pending[from].txs.items[tx.Nonce()] != nil {
status[i] = TxStatusPending
status[i] = txpool.TxStatusPending
} else {
status[i] = TxStatusQueued
status[i] = txpool.TxStatusQueued
}
}
}

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"io"

View file

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
package legacypool
import (
"container/heap"

112
core/txpool/subpool.go Normal file
View file

@ -0,0 +1,112 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package txpool
import (
"math/big"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
)
// Transaction is a helper struct to group together a canonical transaction with
// satellite data items that are needed by the pool but are not part of the chain.
type Transaction struct {
Tx *types.Transaction // Canonical transaction
}
// SubPool represents a specialized transaction pool that lives on its own (e.g.
// blob pool). Since independent of how many specialized pools we have, they do
// need to be updated in lockstep and assemble into one coherent view for block
// production, this interface defines the common methods that allow the primary
// transaction pool to manage the subpools.
type SubPool interface {
// Filter is a selector used to decide whether a transaction should be added
// to this particular subpool.
Filter(tx *types.Transaction) bool
// Init sets the base parameters of the subpool, allowing it to load any saved
// transactions from disk and also permitting internal maintenance routines to
// start up.
//
// These should not be passed as a constructor argument - nor should the pools
// start by themselves - in order to keep multiple subpools in lockstep with
// one another.
Init(gasTip *big.Int, head *types.Header) error
// Close terminates any background processing threads and releases any held
// resources.
Close() error
// Reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
Reset(oldHead, newHead *types.Header)
// SetGasTip updates the minimum price required by the subpool for a new
// transaction, and drops all transactions below this threshold.
SetGasTip(tip *big.Int) error
// Has returns an indicator whether subpool has a transaction cached with the
// given hash.
Has(hash common.Hash) bool
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *Transaction
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Add(txs []*Transaction, local bool, sync bool) []error
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*types.Transaction
// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Nonce(addr common.Address) uint64
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
Stats() (int, int)
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction)
// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
// Locals retrieves the accounts currently considered local by the pool.
Locals() []common.Address
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
Status(hash common.Hash) TxStatus
// SetSigner sets the function to identify signer accounts.
SetSigner(f func(address common.Address) bool)
// IsSigner checks if the given address is a signer.
IsSigner(addr common.Address) bool
}

File diff suppressed because it is too large Load diff

View file

@ -765,9 +765,9 @@ func (s *TxByPriceAndTime) Pop() interface{} {
// transactions in a profit-maximizing sorted order, while supporting removing
// entire batches of transactions for non-executable accounts.
type TransactionsByPriceAndNonce struct {
txs map[common.Address]Transactions // Per account nonce-sorted list of transactions
heads TxByPriceAndTime // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
txs map[common.Address][]*Transaction // Per account nonce-sorted list of transactions
heads TxByPriceAndTime // Next transaction for each unique account (price heap)
signer Signer // Signer for the set of transactions
}
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
@ -777,7 +777,7 @@ type TransactionsByPriceAndNonce struct {
// if after providing it to the constructor.
//
// It also classifies special txs and normal txs
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions, payersSwap map[common.Address]*big.Int) (*TransactionsByPriceAndNonce, Transactions) {
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address][]*Transaction, payersSwap map[common.Address]*big.Int) (*TransactionsByPriceAndNonce, Transactions) {
// Initialize a price and received time based heap with the head transactions
heads := TxByPriceAndTime{}
heads.payersSwap = payersSwap

View file

@ -57,9 +57,9 @@ func MakeSigner(config *params.ChainConfig, blockNumber *big.Int) Signer {
}
// LatestSigner returns the 'most permissive' Signer available for the given chain
// configuration. Specifically, this enables support of EIP-155 replay protection and
// EIP-2930 access list transactions when their respective forks are scheduled to occur at
// any block number in the chain config.
// configuration. Specifically, this enables support of all types of transactions
// when their respective forks are scheduled to occur at any block number (or time)
// in the chain config.
//
// Use this in transaction-handling code where the current block number is unknown. If you
// have the current block number available, use MakeSigner instead.

View file

@ -272,7 +272,7 @@ func TestTransactionPriceNonceSort(t *testing.T) {
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping values, but shifted nonces
groups := map[common.Address]Transactions{}
groups := map[common.Address][]*Transaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
for i := 0; i < 25; i++ {
@ -382,7 +382,7 @@ func TestTransactionTimeSort(t *testing.T) {
signer := HomesteadSigner{}
// Generate a batch of transactions with overlapping prices, but different creation times
groups := map[common.Address]Transactions{}
groups := map[common.Address][]*Transaction{}
for start, key := range keys {
addr := crypto.PubkeyToAddress(key.PublicKey)
@ -733,11 +733,11 @@ func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) {
}
testCases := []struct {
name string
normalCount int
specialCount int
expectNormal int
expectSpecial int
name string
normalCount int
specialCount int
expectNormal int
expectSpecial int
}{
{"no transactions", 0, 0, 0, 0},
{"only 1 normal", 1, 0, 1, 0},
@ -762,7 +762,7 @@ func TestNewTransactionsByPriceAndNonce_SpecialSeparation(t *testing.T) {
for i := 0; i < tc.specialCount; i++ {
txs = append(txs, genSpecialTx(uint64(tc.normalCount+i), key))
}
group := map[common.Address]Transactions{}
group := map[common.Address][]*Transaction{}
if len(txs) > 0 {
group[addr] = txs
}

View file

@ -305,7 +305,7 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
return b.eth.txPool.Add([]*txpool.Transaction{{Tx: signedTx}}, true, false)[0]
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
@ -318,7 +318,10 @@ func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
}
func (b *EthAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transaction {
return b.eth.txPool.Get(hash)
if tx := b.eth.txPool.Get(hash); tx != nil {
return tx.Tx
}
return nil
}
func (b *EthAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
@ -330,20 +333,24 @@ func (b *EthAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (
return b.eth.txPool.Nonce(addr), nil
}
func (b *EthAPIBackend) Stats() (pending int, queued int) {
func (b *EthAPIBackend) Stats() (runnable int, blocked int) {
return b.eth.txPool.Stats()
}
func (b *EthAPIBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
return b.eth.TxPool().Content()
func (b *EthAPIBackend) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return b.eth.txPool.Content()
}
func (b *EthAPIBackend) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
return b.eth.TxPool().ContentFrom(addr)
func (b *EthAPIBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return b.eth.txPool.ContentFrom(addr)
}
func (b *EthAPIBackend) TxPool() *txpool.TxPool {
return b.eth.txPool
}
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.TxPool().SubscribeNewTxsEvent(ch)
return b.eth.txPool.SubscribeNewTxsEvent(ch)
}
func (b *EthAPIBackend) Downloader() *downloader.Downloader {
@ -644,7 +651,3 @@ func (b *EthAPIBackend) IsStaking() bool {
func (b *EthAPIBackend) BlockChain() *core.BlockChain {
return b.eth.blockchain
}
func (b *EthAPIBackend) TxPool() *txpool.TxPool {
return b.eth.txPool
}

View file

@ -41,6 +41,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core/bloombits"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
@ -72,9 +73,10 @@ type Ethereum struct {
shutdownChan chan bool // Channel for shutting down the ethereum
// Handlers
txPool *txpool.TxPool
orderPool *txpool.OrderPool
lendingPool *txpool.LendingPool
txPool *txpool.TxPool
orderPool *legacypool.OrderPool
lendingPool *legacypool.LendingPool
blockchain *core.BlockChain
protocolManager *ProtocolManager
@ -265,9 +267,15 @@ func New(stack *node.Node, config *ethconfig.Config, XDCXServ *XDCx.XDCX, lendin
if config.TxPool.Journal != "" {
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
}
eth.txPool = txpool.New(config.TxPool, eth.chainConfig, eth.blockchain)
eth.orderPool = txpool.NewOrderPool(eth.chainConfig, eth.blockchain)
eth.lendingPool = txpool.NewLendingPool(eth.chainConfig, eth.blockchain)
legacyPool := legacypool.New(config.TxPool, eth.blockchain)
eth.txPool, err = txpool.New(new(big.Int).SetUint64(config.TxPool.PriceLimit), eth.blockchain, []txpool.SubPool{legacyPool})
if err != nil {
return nil, err
}
eth.orderPool = legacypool.NewOrderPool(eth.chainConfig, eth.blockchain)
eth.lendingPool = legacypool.NewLendingPool(eth.chainConfig, eth.blockchain)
if eth.protocolManager, err = NewProtocolManagerEx(eth.chainConfig, config.SyncMode, networkID, eth.eventMux, eth.txPool, eth.orderPool, eth.lendingPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
@ -564,7 +572,7 @@ func (e *Ethereum) Stop() error {
e.blockchain.Stop()
e.protocolManager.Stop()
e.txPool.Stop()
e.txPool.Close()
e.miner.Stop()
e.eventMux.Stop()
@ -582,7 +590,7 @@ func (e *Ethereum) GetXDCX() *XDCx.XDCX {
return e.XDCX
}
func (e *Ethereum) OrderPool() *txpool.OrderPool {
func (e *Ethereum) OrderPool() *legacypool.OrderPool {
return e.orderPool
}
@ -591,6 +599,6 @@ func (e *Ethereum) GetXDCXLending() *XDCxlending.Lending {
}
// LendingPool geth eth lending pool
func (e *Ethereum) LendingPool() *txpool.LendingPool {
func (e *Ethereum) LendingPool() *legacypool.LendingPool {
return e.lendingPool
}

View file

@ -24,7 +24,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/hexutil"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/eth/gasprice"
"github.com/XinFinOrg/XDPoSChain/params"
@ -52,7 +52,7 @@ var Defaults = Config{
FilterLogCacheSize: 32,
GasPrice: big.NewInt(0.25 * params.Shannon),
TxPool: txpool.DefaultConfig,
TxPool: legacypool.DefaultConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
@ -99,7 +99,7 @@ type Config struct {
GasPrice *big.Int
// Transaction pool options
TxPool txpool.Config
TxPool legacypool.Config
// Gas Price Oracle options
GPO gasprice.Config

View file

@ -9,7 +9,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/hexutil"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/eth/gasprice"
)
@ -39,7 +39,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
MinerThreads int `toml:",omitempty"`
ExtraData hexutil.Bytes `toml:",omitempty"`
GasPrice *big.Int
TxPool txpool.Config
TxPool legacypool.Config
GPO gasprice.Config
EnablePreimageRecording bool
VMTrace string
@ -103,7 +103,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
MinerThreads *int `toml:",omitempty"`
ExtraData *hexutil.Bytes `toml:",omitempty"`
GasPrice *big.Int
TxPool *txpool.Config
TxPool *legacypool.Config
GPO *gasprice.Config
EnablePreimageRecording *bool
VMTrace *string

View file

@ -31,6 +31,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/misc"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/eth/bft"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
@ -779,7 +780,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.knownTxs.Add(tx.Hash(), struct{}{})
}
}
pm.txpool.AddRemotes(txs)
warped := make([]*txpool.Transaction, len(txs))
for i := range txs {
warped[i] = &txpool.Transaction{Tx: txs[i]}
}
pm.txpool.Add(warped, false, false)
case msg.Code == OrderTxMsg:
// Transactions arrived, make sure we have a valid and fresh chain to handle them

View file

@ -31,6 +31,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
"github.com/XinFinOrg/XDPoSChain/crypto"
@ -68,7 +69,9 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
panic(err)
}
pm, err := NewProtocolManager(gspec.Config, mode, ethconfig.Defaults.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
txpool := newTestTxPool()
txpool.added = newtx
pm, err := NewProtocolManager(gspec.Config, mode, ethconfig.Defaults.NetworkId, evmux, txpool, engine, blockchain, db)
if err != nil {
return nil, nil, err
}
@ -90,32 +93,68 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
// testTxPool is a fake, helper transaction pool for testing purposes
type testTxPool struct {
txFeed event.Feed
pool []*types.Transaction // Collection of all transactions
added chan<- []*types.Transaction // Notification channel for new transactions
pool map[common.Hash]*types.Transaction // Hash map of collected transactions
lock sync.RWMutex // Protects the transaction pool
txFeed event.Feed
lock sync.RWMutex // Protects the transaction pool
added chan<- []*types.Transaction // Notification channel for new transactions
}
// AddRemotes appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
// newTestTxPool creates a mock transaction pool.
func newTestTxPool() *testTxPool {
return &testTxPool{
pool: make(map[common.Hash]*types.Transaction),
}
}
// Has returns an indicator whether txpool has a transaction
// cached with the given hash.
func (p *testTxPool) Has(hash common.Hash) bool {
p.lock.Lock()
defer p.lock.Unlock()
p.pool = append(p.pool, txs...)
if p.added != nil {
p.added <- txs
return p.pool[hash] != nil
}
// Get retrieves the transaction from local txpool with given
// tx hash.
func (p *testTxPool) Get(hash common.Hash) *txpool.Transaction {
p.lock.Lock()
defer p.lock.Unlock()
if tx := p.pool[hash]; tx != nil {
return &txpool.Transaction{Tx: tx}
}
return make([]error, len(txs))
return nil
}
// Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
unwrapped := make([]*types.Transaction, len(txs))
for i, tx := range txs {
unwrapped[i] = tx.Tx
}
p.lock.Lock()
defer p.lock.Unlock()
for _, tx := range unwrapped {
p.pool[tx.Hash()] = tx
}
if p.added != nil {
p.added <- unwrapped
}
return make([]error, len(unwrapped))
}
// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address]types.Transactions {
func (p *testTxPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
p.lock.RLock()
defer p.lock.RUnlock()
batches := make(map[common.Address]types.Transactions)
batches := make(map[common.Address][]*types.Transaction)
for _, tx := range p.pool {
from, _ := types.Sender(types.HomesteadSigner{}, tx)
batches[from] = append(batches[from], tx)
@ -126,6 +165,8 @@ func (p *testTxPool) Pending(enforceTips bool) map[common.Address]types.Transact
return batches
}
// SubscribeNewTxsEvent should return an event subscription of NewTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
}

View file

@ -23,6 +23,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/rlp"
@ -103,12 +104,12 @@ var errorToString = map[int]string{
}
type txPool interface {
// AddRemotes should add the given transactions to the pool.
AddRemotes([]*types.Transaction) []error
// Add should add the given transactions to the pool.
Add(txs []*txpool.Transaction, local bool, sync bool) []error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address]types.Transactions
Pending(enforceTips bool) map[common.Address][]*types.Transaction
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.

View file

@ -23,6 +23,7 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
@ -131,11 +132,12 @@ func testSendTransactions(t *testing.T, protocol int) {
// Fill the pool with big transactions.
const txsize = txsyncPackSize / 10
alltxs := make([]*types.Transaction, 100)
alltxs := make([]*txpool.Transaction, 100)
for nonce := range alltxs {
alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize)
tx := newTestTransaction(testAccount, uint64(nonce), txsize)
alltxs[nonce] = &txpool.Transaction{Tx: tx}
}
pm.txpool.AddRemotes(alltxs)
pm.txpool.Add(alltxs, false, false)
// Connect several peers. They should all receive the pending transactions.
var wg sync.WaitGroup
@ -143,7 +145,7 @@ func testSendTransactions(t *testing.T, protocol int) {
defer p.close()
seen := make(map[common.Hash]bool)
for _, tx := range alltxs {
seen[tx.Hash()] = false
seen[tx.Tx.Hash()] = false
}
for n := 0; n < len(alltxs) && !t.Failed(); {
var txs []*types.Transaction

50
event/multisub.go Normal file
View file

@ -0,0 +1,50 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
// JoinSubscriptions joins multiple subscriptions to be able to track them as
// one entity and collectively cancel them or consume any errors from them.
func JoinSubscriptions(subs ...Subscription) Subscription {
return NewSubscription(func(unsubbed <-chan struct{}) error {
// Unsubscribe all subscriptions before returning
defer func() {
for _, sub := range subs {
sub.Unsubscribe()
}
}()
// Wait for an error on any of the subscriptions and propagate up
errc := make(chan error, len(subs))
for i := range subs {
go func(sub Subscription) {
select {
case err := <-sub.Err():
if err != nil {
errc <- err
}
case <-unsubbed:
}
}(subs[i])
}
select {
case err := <-errc:
return err
case <-unsubbed:
return nil
}
})
}

175
event/multisub_test.go Normal file
View file

@ -0,0 +1,175 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package event
import (
"testing"
"time"
)
func TestMultisub(t *testing.T) {
// Create a double subscription and ensure events propagate through
var (
feed1 Feed
feed2 Feed
)
sink1 := make(chan int, 1)
sink2 := make(chan int, 1)
sub1 := feed1.Subscribe(sink1)
sub2 := feed2.Subscribe(sink2)
sub := JoinSubscriptions(sub1, sub2)
feed1.Send(1)
select {
case n := <-sink1:
if n != 1 {
t.Errorf("sink 1 delivery mismatch: have %d, want %d", n, 1)
}
default:
t.Error("sink 1 missing delivery")
}
feed2.Send(2)
select {
case n := <-sink2:
if n != 2 {
t.Errorf("sink 2 delivery mismatch: have %d, want %d", n, 2)
}
default:
t.Error("sink 2 missing delivery")
}
// Unsubscribe and ensure no more events are delivered
sub.Unsubscribe()
select {
case <-sub.Err():
case <-time.After(50 * time.Millisecond):
t.Error("multisub didn't propagate closure")
}
feed1.Send(11)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(22)
select {
case n := <-sink2:
t.Errorf("sink 2 unexpected delivery: %d", n)
default:
}
}
func TestMultisubPartialUnsubscribe(t *testing.T) {
// Create a double subscription but terminate one half, ensuring no error
// is propagated yet up to the outer subscription
var (
feed1 Feed
feed2 Feed
)
sink1 := make(chan int, 1)
sink2 := make(chan int, 1)
sub1 := feed1.Subscribe(sink1)
sub2 := feed2.Subscribe(sink2)
sub := JoinSubscriptions(sub1, sub2)
sub1.Unsubscribe()
select {
case <-sub.Err():
t.Error("multisub propagated closure")
case <-time.After(50 * time.Millisecond):
}
// Ensure that events cross only the second feed
feed1.Send(1)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(2)
select {
case n := <-sink2:
if n != 2 {
t.Errorf("sink 2 delivery mismatch: have %d, want %d", n, 2)
}
default:
t.Error("sink 2 missing delivery")
}
// Unsubscribe and ensure no more events are delivered
sub.Unsubscribe()
select {
case <-sub.Err():
case <-time.After(50 * time.Millisecond):
t.Error("multisub didn't propagate closure")
}
feed1.Send(11)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(22)
select {
case n := <-sink2:
t.Errorf("sink 2 unexpected delivery: %d", n)
default:
}
}
func TestMultisubFullUnsubscribe(t *testing.T) {
// Create a double subscription and terminate the multi sub, ensuring an
// error is propagated up.
var (
feed1 Feed
feed2 Feed
)
sink1 := make(chan int, 1)
sink2 := make(chan int, 1)
sub1 := feed1.Subscribe(sink1)
sub2 := feed2.Subscribe(sink2)
sub := JoinSubscriptions(sub1, sub2)
sub.Unsubscribe()
select {
case <-sub.Err():
case <-time.After(50 * time.Millisecond):
t.Error("multisub didn't propagate closure")
}
// Ensure no more events are delivered
feed1.Send(1)
select {
case n := <-sink1:
t.Errorf("sink 1 unexpected delivery: %d", n)
default:
}
feed2.Send(2)
select {
case n := <-sink2:
t.Errorf("sink 2 unexpected delivery: %d", n)
default:
}
}

View file

@ -86,8 +86,8 @@ type Backend interface {
GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
Stats() (pending int, queued int)
TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions)
TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions)
TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction)
TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
ChainConfig() *params.ChainConfig

View file

@ -370,10 +370,10 @@ func (b *backendMock) GetPoolNonce(ctx context.Context, addr common.Address) (ui
return 0, nil
}
func (b *backendMock) Stats() (pending int, queued int) { return 0, 0 }
func (b *backendMock) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return nil, nil
}
func (b *backendMock) TxPoolContentFrom(addr common.Address) (types.Transactions, types.Transactions) {
func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return nil, nil
}
func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil }

View file

@ -22,7 +22,7 @@ const (
EthCategory = "ETHEREUM"
LightCategory = "LIGHT CLIENT"
DevCategory = "DEVELOPER CHAIN"
TxPoolCategory = "TRANSACTION POOL"
TxPoolCategory = "TRANSACTION POOL (EVM)"
PerfCategory = "PERFORMANCE TUNING"
AccountCategory = "ACCOUNT"
APICategory = "API AND CONSOLE"

View file

@ -29,6 +29,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/txpool"
"github.com/XinFinOrg/XDPoSChain/core/txpool/legacypool"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/ethdb"
@ -44,8 +45,8 @@ type Backend interface {
TxPool() *txpool.TxPool
ChainDb() ethdb.Database
GetXDCX() *XDCx.XDCX
OrderPool() *txpool.OrderPool
LendingPool() *txpool.LendingPool
OrderPool() *legacypool.OrderPool
LendingPool() *legacypool.LendingPool
GetXDCXLending() *XDCxlending.Lending
}

View file

@ -355,7 +355,7 @@ func (w *worker) update() {
// be automatically eliminated.
if atomic.LoadInt32(&w.mining) == 0 {
w.currentMu.Lock()
txs := make(map[common.Address]types.Transactions)
txs := make(map[common.Address][]*types.Transaction, len(ev.Txs))
for _, tx := range ev.Txs {
acc, _ := types.Sender(w.current.signer, tx)
txs[acc] = append(txs[acc], tx)