diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 27fdd00016..7155a67a9b 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1865,11 +1865,11 @@ func (p *BlobPool) drop() { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { +func (p *BlobPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) { // If only plain transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if !filter.BlobTxs { - return nil + return nil, 0 } // Track the amount of time waiting to retrieve the list of pending blob txs // from the pool and the amount of time actually spent on assembling the data. @@ -1885,6 +1885,7 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx pendtimeHist.Update(time.Since(execStart).Nanoseconds()) }() + var count int pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index)) for addr, txs := range p.index { lazies := make([]*txpool.LazyTransaction, 0, len(txs)) @@ -1930,9 +1931,10 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx } if len(lazies) > 0 { pending[addr] = lazies + count += len(lazies) } } - return pending + return pending, count } // updateStorageMetrics retrieves a bunch of stats from the data store and pushes diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 6580a339e3..ba96bea8ed 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { b.ReportAllocs() for i := 0; i < b.N; i++ { - p := pool.Pending(txpool.PendingFilter{ + p, _ := pool.Pending(txpool.PendingFilter{ MinTip: uint256.NewInt(1), BaseFee: chain.basefee, BlobFee: chain.blobfee, diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 36970c820e..25c4b13166 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -494,15 +494,16 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction, // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { +func (pool *LegacyPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) { // If only blob transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.BlobTxs { - return nil + return nil, 0 } pool.mu.Lock() defer pool.mu.Unlock() + var count int pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending)) for addr, list := range pool.pending { txs := list.Flatten() @@ -539,9 +540,10 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] } } pending[addr] = lazies + count += len(lazies) } } - return pending + return pending, count } // ValidateTxBasics checks whether a transaction is valid according to the consensus diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index db099ddf98..4cc1b193d6 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -154,7 +154,7 @@ type SubPool interface { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. - Pending(filter PendingFilter) map[common.Address][]*LazyTransaction + Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index a314a83f1b..25647e0cce 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -359,14 +359,17 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error { // // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. -func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction { +func (p *TxPool) Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) { + var count int txs := make(map[common.Address][]*LazyTransaction) for _, subpool := range p.subpools { - for addr, set := range subpool.Pending(filter) { - txs[addr] = set + set, n := subpool.Pending(filter) + for addr, list := range set { + txs[addr] = list } + count += n } - return txs + return txs, count } // SubscribeTransactions registers a subscription for new transaction events, diff --git a/eth/api_backend.go b/eth/api_backend.go index 3f826b7861..726d8316a0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -347,7 +347,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { - pending := b.eth.txPool.Pending(txpool.PendingFilter{}) + pending, _ := b.eth.txPool.Pending(txpool.PendingFilter{}) var txs types.Transactions for _, batch := range pending { for _, lazy := range batch { diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index d1dae5251f..6bf9846993 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -283,7 +283,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u Method: "newPayloadV" + fmt.Sprintf("%d", version), }) - // Mark the payload as canon + // Mark the payload as canon _, err = c.engineAPI.newPayload(npCtx, *payload, blobHashes, beaconRoot, requests, false) npSpanEnd(&err) if err != nil { @@ -366,7 +366,7 @@ func (c *SimulatedBeacon) Rollback() { func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { // Ensure no pending transactions. c.eth.TxPool().Sync() - if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 { + if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 { return errors.New("pending block dirty") } @@ -380,7 +380,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error { // AdjustTime creates a new block with an adjusted timestamp. func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error { - if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 { + if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 { return errors.New("could not adjust time on non-empty block") } parent := c.eth.BlockChain().CurrentBlock() diff --git a/eth/handler.go b/eth/handler.go index bb2cd5f88b..27b5e60697 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -86,7 +86,7 @@ type txPool interface { // Pending should return pending transactions. // The slice should be modifiable by the caller. - Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction + Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) // SubscribeTransactions subscribes to new transaction events. The subscriber // can decide whether to receive notifications only for newly seen transactions diff --git a/eth/handler_test.go b/eth/handler_test.go index 3470452980..fee6bae138 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -128,10 +128,11 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { } // Pending returns all the transactions known to the pool -func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { +func (p *testTxPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) { p.lock.RLock() defer p.lock.RUnlock() + var count int batches := make(map[common.Address][]*types.Transaction) for _, tx := range p.pool { from, _ := types.Sender(types.HomesteadSigner{}, tx) @@ -152,9 +153,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]* Gas: tx.Gas(), BlobGas: tx.BlobGas(), }) + count++ } } - return pending + return pending, count } // SubscribeTransactions should return an event subscription of NewTxsEvent and diff --git a/eth/sync.go b/eth/sync.go index ddae8443a3..8b4bd90abf 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -25,7 +25,8 @@ import ( // syncTransactions starts sending all currently pending transactions to the given peer. func (h *handler) syncTransactions(p *eth.Peer) { var hashes []common.Hash - for _, batch := range h.txpool.Pending(txpool.PendingFilter{BlobTxs: false}) { + pending, _ := h.txpool.Pending(txpool.PendingFilter{BlobTxs: false}) + for _, batch := range pending { for _, tx := range batch { hashes = append(hashes, tx.Hash) } diff --git a/miner/worker.go b/miner/worker.go index 4788eb834e..6d7abeabd8 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -549,7 +549,7 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3 filter.GasLimitCap = params.MaxTxGas } filter.BlobTxs = false - pendingPlainTxs := miner.txpool.Pending(filter) + pendingPlainTxs, plainTxCount := miner.txpool.Pending(filter) filter.BlobTxs = true if miner.chainConfig.IsOsaka(env.header.Number, env.header.Time) { @@ -557,10 +557,10 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3 } else { filter.BlobVersion = types.BlobSidecarVersion0 } - pendingBlobTxs := miner.txpool.Pending(filter) + pendingBlobTxs, blobTxCount := miner.txpool.Pending(filter) span.SetAttributes( - telemetry.Int64Attribute("pending.plain.count", int64(len(pendingPlainTxs))), - telemetry.Int64Attribute("pending.blob.count", int64(len(pendingBlobTxs))), + telemetry.Int64Attribute("pending.plain.count", int64(plainTxCount)), + telemetry.Int64Attribute("pending.blob.count", int64(blobTxCount)), ) // Split the pending transactions into locals and remotes.