core, metrics: switch some invalid counters to gauges (#20047)

This commit is contained in:
Daniel Liu 2024-05-10 19:25:45 +08:00
parent 67825d860b
commit 742a7f9348
2 changed files with 58 additions and 20 deletions

View file

@ -110,9 +110,9 @@ var (
invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil)
underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil)
pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil)
queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil)
localCounter = metrics.NewRegisteredCounter("txpool/local", nil)
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
)
// TxStatus is the current status of a transaction as seen by the pool.
@ -730,7 +730,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
}
}
if local || pool.locals.contains(from) {
localCounter.Inc(1)
localGauge.Inc(1)
}
pool.journalTx(from, tx)
@ -760,7 +760,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
queuedReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the queued counter
queuedCounter.Inc(1)
queuedGauge.Inc(1)
}
if pool.all.Get(hash) == nil {
pool.all.Add(tx)
@ -809,7 +809,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pendingReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the pending counter
pendingCounter.Inc(1)
pendingGauge.Inc(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all.Get(hash) == nil {
@ -840,7 +840,7 @@ func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction)
pendingReplaceMeter.Mark(1)
} else {
// Nothing was replaced, bump the pending counter
pendingCounter.Inc(1)
pendingGauge.Inc(1)
}
list.txs.Put(tx)
if cost := tx.Cost(); list.costcap.Cmp(cost) < 0 {
@ -980,7 +980,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.priced.Removed(1)
}
if pool.locals.contains(addr) {
localCounter.Dec(1)
localGauge.Dec(1)
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
@ -997,7 +997,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce())
// Reduce the pending counter
pendingCounter.Dec(int64(1 + len(invalids)))
pendingGauge.Dec(int64(1 + len(invalids)))
return
}
}
@ -1005,7 +1005,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
if future := pool.queue[addr]; future != nil {
if removed, _ := future.Remove(tx); removed {
// Reduce the queued counter
queuedCounter.Dec(1)
queuedGauge.Dec(1)
}
if future.Empty() {
delete(pool.queue, addr)
@ -1313,7 +1313,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
promoted = append(promoted, tx)
}
}
queuedCounter.Dec(int64(len(readies)))
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps types.Transactions
@ -1328,9 +1328,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localCounter.Dec(int64(len(forwards) + len(drops) + len(caps)))
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
@ -1389,9 +1389,9 @@ func (pool *TxPool) truncatePending() {
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
pendingCounter.Dec(int64(len(caps)))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localCounter.Dec(int64(len(caps)))
localGauge.Dec(int64(len(caps)))
}
pending--
}
@ -1416,9 +1416,9 @@ func (pool *TxPool) truncatePending() {
log.Trace("Removed fairness-exceeding pending transaction", "hash", hash)
}
pool.priced.Removed(len(caps))
pendingCounter.Dec(int64(len(caps)))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localCounter.Dec(int64(len(caps)))
localGauge.Dec(int64(len(caps)))
}
pending--
}
@ -1506,9 +1506,9 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localCounter.Dec(int64(len(olds) + len(drops) + len(invalids)))
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
@ -1518,7 +1518,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Warn("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
pendingCounter.Dec(int64(len(gapped)))
pendingGauge.Dec(int64(len(gapped)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {

View file

@ -6,6 +6,8 @@ import "sync/atomic"
type Gauge interface {
Snapshot() Gauge
Update(int64)
Dec(int64)
Inc(int64)
Value() int64
}
@ -65,6 +67,16 @@ func (GaugeSnapshot) Update(int64) {
panic("Update called on a GaugeSnapshot")
}
// Dec panics.
func (GaugeSnapshot) Dec(int64) {
panic("Dec called on a GaugeSnapshot")
}
// Inc panics.
func (GaugeSnapshot) Inc(int64) {
panic("Inc called on a GaugeSnapshot")
}
// Value returns the value at the time the snapshot was taken.
func (g GaugeSnapshot) Value() int64 { return int64(g) }
@ -77,6 +89,12 @@ func (NilGauge) Snapshot() Gauge { return NilGauge{} }
// Update is a no-op.
func (NilGauge) Update(v int64) {}
// Dec is a no-op.
func (NilGauge) Dec(i int64) {}
// Inc is a no-op.
func (NilGauge) Inc(i int64) {}
// Value is a no-op.
func (NilGauge) Value() int64 { return 0 }
@ -101,6 +119,16 @@ func (g *StandardGauge) Value() int64 {
return atomic.LoadInt64(&g.value)
}
// Dec decrements the gauge's current value by the given amount.
func (g *StandardGauge) Dec(i int64) {
atomic.AddInt64(&g.value, -i)
}
// Inc increments the gauge's current value by the given amount.
func (g *StandardGauge) Inc(i int64) {
atomic.AddInt64(&g.value, i)
}
// FunctionalGauge returns value from given function
type FunctionalGauge struct {
value func() int64
@ -118,3 +146,13 @@ func (g FunctionalGauge) Snapshot() Gauge { return GaugeSnapshot(g.Value()) }
func (FunctionalGauge) Update(int64) {
panic("Update called on a FunctionalGauge")
}
// Dec panics.
func (FunctionalGauge) Dec(int64) {
panic("Dec called on a FunctionalGauge")
}
// Inc panics.
func (FunctionalGauge) Inc(int64) {
panic("Inc called on a FunctionalGauge")
}