core/txpool/blobpool: fix gapped queue size cap (#34831)
Some checks failed
/ Linux Build (push) Has been cancelled
/ Linux Build (arm) (push) Has been cancelled
/ Keeper Build (push) Has been cancelled
/ Windows Build (push) Has been cancelled
/ Docker Image (push) Has been cancelled

the gapped queue cap was effectively per-sender rather than total — a
sender pool spread across enough distinct addresses could grow
`p.gapped` well past `maxGapped`, defeating the resource bound.

`maxGapped` was being compared against `len(p.gapped)`, which is a
`map[address][]tx` and counts unique senders, not queued txs. Switched
the check to `len(p.gappedSource)` (keyed by tx hash, so its length is
the real total). Also wired up a `blobpool/gapped/count` gauge plus
`promoted`, `evicted`, and `gappedfull` meters so queue size and churn
are actually observable in prod.
This commit is contained in:
Rahman 2026-05-02 05:29:21 -06:00 committed by GitHub
parent 7155c65abb
commit f0b21fa110
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 15 additions and 3 deletions

View file

@ -1596,9 +1596,10 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
// Store the tx in memory, and revalidate later // Store the tx in memory, and revalidate later
from, _ := types.Sender(p.signer, tx) from, _ := types.Sender(p.signer, tx)
allowance := p.gappedAllowance(from) allowance := p.gappedAllowance(from)
if allowance >= 1 && len(p.gapped) < maxGapped { if allowance >= 1 && len(p.gappedSource) < maxGapped {
p.gapped[from] = append(p.gapped[from], tx) p.gapped[from] = append(p.gapped[from], tx)
p.gappedSource[tx.Hash()] = from p.gappedSource[tx.Hash()] = from
gappedGauge.Update(int64(len(p.gappedSource)))
log.Trace("added tx to gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) log.Trace("added tx to gapped blob queue", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
return nil return nil
} else { } else {
@ -1606,6 +1607,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
// transactions by keeping the old and dropping this one. // transactions by keeping the old and dropping this one.
// Thus replacing a gapped transaction with another gapped transaction // Thus replacing a gapped transaction with another gapped transaction
// is discouraged. // is discouraged.
addGappedFullMeter.Mark(1)
log.Trace("no gapped blob queue allowance", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) log.Trace("no gapped blob queue allowance", "allowance", allowance, "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
} }
case errors.Is(err, core.ErrInsufficientFunds): case errors.Is(err, core.ErrInsufficientFunds):
@ -1791,6 +1793,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
// We do not recurse here, but continue to loop instead. // We do not recurse here, but continue to loop instead.
// We are under lock, so we can add the transaction directly. // We are under lock, so we can add the transaction directly.
if err := p.addLocked(tx, false); err == nil { if err := p.addLocked(tx, false); err == nil {
gappedPromotedMeter.Mark(1)
log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from])) log.Trace("Gapped blob transaction added to pool", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "qlen", len(p.gapped[from]))
} else { } else {
log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err) log.Trace("Gapped blob transaction not accepted", "hash", tx.Hash(), "from", from, "nonce", tx.Nonce(), "err", err)
@ -1802,6 +1805,7 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
} else { } else {
p.gapped[from] = gtxs p.gapped[from] = gtxs
} }
gappedGauge.Update(int64(len(p.gappedSource)))
} }
return nil return nil
} }
@ -2069,8 +2073,9 @@ func (p *BlobPool) evictGapped() {
keep = append(keep, gtx) keep = append(keep, gtx)
} }
} }
if len(keep) < len(txs) { if evicted := len(txs) - len(keep); evicted > 0 {
log.Trace("Evicting old gapped blob transactions", "count", len(txs)-len(keep), "from", from) gappedEvictedMeter.Mark(int64(evicted))
log.Trace("Evicting old gapped blob transactions", "count", evicted, "from", from)
} }
if len(keep) == 0 { if len(keep) == 0 {
delete(p.gapped, from) delete(p.gapped, from)
@ -2078,6 +2083,7 @@ func (p *BlobPool) evictGapped() {
p.gapped[from] = keep p.gapped[from] = keep
} }
} }
gappedGauge.Update(int64(len(p.gappedSource)))
} }
// isAnnouncable checks whether a transaction is announcable based on its // isAnnouncable checks whether a transaction is announcable based on its

View file

@ -97,9 +97,15 @@ var (
addUnderpricedMeter = metrics.NewRegisteredMeter("blobpool/add/underpriced", nil) // Gas tip too low, neutral addUnderpricedMeter = metrics.NewRegisteredMeter("blobpool/add/underpriced", nil) // Gas tip too low, neutral
addStaleMeter = metrics.NewRegisteredMeter("blobpool/add/stale", nil) // Nonce already filled, reject, bad-ish addStaleMeter = metrics.NewRegisteredMeter("blobpool/add/stale", nil) // Nonce already filled, reject, bad-ish
addGappedMeter = metrics.NewRegisteredMeter("blobpool/add/gapped", nil) // Nonce gapped, reject, bad-ish addGappedMeter = metrics.NewRegisteredMeter("blobpool/add/gapped", nil) // Nonce gapped, reject, bad-ish
addGappedFullMeter = metrics.NewRegisteredMeter("blobpool/add/gappedfull", nil) // Gapped queue full, reject, neutral
addOverdraftedMeter = metrics.NewRegisteredMeter("blobpool/add/overdrafted", nil) // Balance exceeded, reject, neutral addOverdraftedMeter = metrics.NewRegisteredMeter("blobpool/add/overdrafted", nil) // Balance exceeded, reject, neutral
addOvercappedMeter = metrics.NewRegisteredMeter("blobpool/add/overcapped", nil) // Per-account cap exceeded, reject, neutral addOvercappedMeter = metrics.NewRegisteredMeter("blobpool/add/overcapped", nil) // Per-account cap exceeded, reject, neutral
addNoreplaceMeter = metrics.NewRegisteredMeter("blobpool/add/noreplace", nil) // Replacement fees or tips too low, neutral addNoreplaceMeter = metrics.NewRegisteredMeter("blobpool/add/noreplace", nil) // Replacement fees or tips too low, neutral
addNonExclusiveMeter = metrics.NewRegisteredMeter("blobpool/add/nonexclusive", nil) // Plain transaction from same account exists, reject, neutral addNonExclusiveMeter = metrics.NewRegisteredMeter("blobpool/add/nonexclusive", nil) // Plain transaction from same account exists, reject, neutral
addValidMeter = metrics.NewRegisteredMeter("blobpool/add/valid", nil) // Valid transaction, add, neutral addValidMeter = metrics.NewRegisteredMeter("blobpool/add/valid", nil) // Valid transaction, add, neutral
// Gapped queue metrics for observability
gappedGauge = metrics.NewRegisteredGauge("blobpool/gapped/count", nil) // Current gapped queue size
gappedPromotedMeter = metrics.NewRegisteredMeter("blobpool/gapped/promoted", nil) // Gapped txs successfully promoted to pool
gappedEvictedMeter = metrics.NewRegisteredMeter("blobpool/gapped/evicted", nil) // Gapped txs evicted due to timeout/stale
) )