update Pending() method, interface definitions and callsites to include a tx count

This commit is contained in:
jonny rhea 2026-03-03 10:43:04 -06:00
parent 56f9dde1db
commit bcc8999722
11 changed files with 34 additions and 24 deletions

View file

@ -1865,11 +1865,11 @@ func (p *BlobPool) drop() {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (p *BlobPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
// If only plain transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if !filter.BlobTxs {
return nil
return nil, 0
}
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
@ -1885,6 +1885,7 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
pendtimeHist.Update(time.Since(execStart).Nanoseconds())
}()
var count int
pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index))
for addr, txs := range p.index {
lazies := make([]*txpool.LazyTransaction, 0, len(txs))
@ -1930,9 +1931,10 @@ func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*tx
}
if len(lazies) > 0 {
pending[addr] = lazies
count += len(lazies)
}
}
return pending
return pending, count
}
// updateStorageMetrics retrieves a bunch of stats from the data store and pushes

View file

@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p := pool.Pending(txpool.PendingFilter{
p, _ := pool.Pending(txpool.PendingFilter{
MinTip: uint256.NewInt(1),
BaseFee: chain.basefee,
BlobFee: chain.blobfee,

View file

@ -494,15 +494,16 @@ func (pool *LegacyPool) ContentFrom(addr common.Address) ([]*types.Transaction,
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (pool *LegacyPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
// If only blob transactions are requested, this pool is unsuitable as it
// contains none, don't even bother.
if filter.BlobTxs {
return nil
return nil, 0
}
pool.mu.Lock()
defer pool.mu.Unlock()
var count int
pending := make(map[common.Address][]*txpool.LazyTransaction, len(pool.pending))
for addr, list := range pool.pending {
txs := list.Flatten()
@ -539,9 +540,10 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
}
}
pending[addr] = lazies
count += len(lazies)
}
}
return pending
return pending, count
}
// ValidateTxBasics checks whether a transaction is valid according to the consensus

View file

@ -154,7 +154,7 @@ type SubPool interface {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
Pending(filter PendingFilter) map[common.Address][]*LazyTransaction
Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int)
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions

View file

@ -359,14 +359,17 @@ func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
//
// The transactions can also be pre-filtered by the dynamic fee components to
// reduce allocations and load on downstream subsystems.
func (p *TxPool) Pending(filter PendingFilter) map[common.Address][]*LazyTransaction {
func (p *TxPool) Pending(filter PendingFilter) (map[common.Address][]*LazyTransaction, int) {
var count int
txs := make(map[common.Address][]*LazyTransaction)
for _, subpool := range p.subpools {
for addr, set := range subpool.Pending(filter) {
txs[addr] = set
set, n := subpool.Pending(filter)
for addr, list := range set {
txs[addr] = list
}
count += n
}
return txs
return txs, count
}
// SubscribeTransactions registers a subscription for new transaction events,

View file

@ -347,7 +347,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {
pending := b.eth.txPool.Pending(txpool.PendingFilter{})
pending, _ := b.eth.txPool.Pending(txpool.PendingFilter{})
var txs types.Transactions
for _, batch := range pending {
for _, lazy := range batch {

View file

@ -283,7 +283,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
Method: "newPayloadV" + fmt.Sprintf("%d", version),
})
// Mark the payload as canon
// Mark the payload as canon
_, err = c.engineAPI.newPayload(npCtx, *payload, blobHashes, beaconRoot, requests, false)
npSpanEnd(&err)
if err != nil {
@ -366,7 +366,7 @@ func (c *SimulatedBeacon) Rollback() {
func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
// Ensure no pending transactions.
c.eth.TxPool().Sync()
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 {
return errors.New("pending block dirty")
}
@ -380,7 +380,7 @@ func (c *SimulatedBeacon) Fork(parentHash common.Hash) error {
// AdjustTime creates a new block with an adjusted timestamp.
func (c *SimulatedBeacon) AdjustTime(adjustment time.Duration) error {
if len(c.eth.TxPool().Pending(txpool.PendingFilter{})) != 0 {
if pending, _ := c.eth.TxPool().Pending(txpool.PendingFilter{}); len(pending) != 0 {
return errors.New("could not adjust time on non-empty block")
}
parent := c.eth.BlockChain().CurrentBlock()

View file

@ -86,7 +86,7 @@ type txPool interface {
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction
Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int)
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions

View file

@ -128,10 +128,11 @@ func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
}
// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction {
func (p *testTxPool) Pending(filter txpool.PendingFilter) (map[common.Address][]*txpool.LazyTransaction, int) {
p.lock.RLock()
defer p.lock.RUnlock()
var count int
batches := make(map[common.Address][]*types.Transaction)
for _, tx := range p.pool {
from, _ := types.Sender(types.HomesteadSigner{}, tx)
@ -152,9 +153,10 @@ func (p *testTxPool) Pending(filter txpool.PendingFilter) map[common.Address][]*
Gas: tx.Gas(),
BlobGas: tx.BlobGas(),
})
count++
}
}
return pending
return pending, count
}
// SubscribeTransactions should return an event subscription of NewTxsEvent and

View file

@ -25,7 +25,8 @@ import (
// syncTransactions starts sending all currently pending transactions to the given peer.
func (h *handler) syncTransactions(p *eth.Peer) {
var hashes []common.Hash
for _, batch := range h.txpool.Pending(txpool.PendingFilter{BlobTxs: false}) {
pending, _ := h.txpool.Pending(txpool.PendingFilter{BlobTxs: false})
for _, batch := range pending {
for _, tx := range batch {
hashes = append(hashes, tx.Hash)
}

View file

@ -549,7 +549,7 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3
filter.GasLimitCap = params.MaxTxGas
}
filter.BlobTxs = false
pendingPlainTxs := miner.txpool.Pending(filter)
pendingPlainTxs, plainTxCount := miner.txpool.Pending(filter)
filter.BlobTxs = true
if miner.chainConfig.IsOsaka(env.header.Number, env.header.Time) {
@ -557,10 +557,10 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3
} else {
filter.BlobVersion = types.BlobSidecarVersion0
}
pendingBlobTxs := miner.txpool.Pending(filter)
pendingBlobTxs, blobTxCount := miner.txpool.Pending(filter)
span.SetAttributes(
telemetry.Int64Attribute("pending.plain.count", int64(len(pendingPlainTxs))),
telemetry.Int64Attribute("pending.blob.count", int64(len(pendingBlobTxs))),
telemetry.Int64Attribute("pending.plain.count", int64(plainTxCount)),
telemetry.Int64Attribute("pending.blob.count", int64(blobTxCount)),
)
// Split the pending transactions into locals and remotes.