diff --git a/core/tx_pool.go b/core/tx_pool.go index f097b25501..fab4995c36 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -110,9 +110,9 @@ var ( invalidTxMeter = metrics.NewRegisteredMeter("txpool/invalid", nil) underpricedTxMeter = metrics.NewRegisteredMeter("txpool/underpriced", nil) - pendingCounter = metrics.NewRegisteredCounter("txpool/pending", nil) - queuedCounter = metrics.NewRegisteredCounter("txpool/queued", nil) - localCounter = metrics.NewRegisteredCounter("txpool/local", nil) + pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) + queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) + localGauge = metrics.NewRegisteredGauge("txpool/local", nil) ) // TxStatus is the current status of a transaction as seen by the pool. @@ -730,7 +730,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e } } if local || pool.locals.contains(from) { - localCounter.Inc(1) + localGauge.Inc(1) } pool.journalTx(from, tx) @@ -760,7 +760,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er queuedReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the queued counter - queuedCounter.Inc(1) + queuedGauge.Inc(1) } if pool.all.Get(hash) == nil { pool.all.Add(tx) @@ -809,7 +809,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pendingReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the pending counter - pendingCounter.Inc(1) + pendingGauge.Inc(1) } // Failsafe to work around direct pending inserts (tests) if pool.all.Get(hash) == nil { @@ -840,7 +840,7 @@ func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) pendingReplaceMeter.Mark(1) } else { // Nothing was replaced, bump the pending counter - pendingCounter.Inc(1) + pendingGauge.Inc(1) } list.txs.Put(tx) if cost := tx.Cost(); list.costcap.Cmp(cost) < 0 { @@ -980,7 +980,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { pool.priced.Removed(1) } if pool.locals.contains(addr) { - localCounter.Dec(1) + localGauge.Dec(1) } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { @@ -997,7 +997,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) // Reduce the pending counter - pendingCounter.Dec(int64(1 + len(invalids))) + pendingGauge.Dec(int64(1 + len(invalids))) return } } @@ -1005,7 +1005,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { if future := pool.queue[addr]; future != nil { if removed, _ := future.Remove(tx); removed { // Reduce the queued counter - queuedCounter.Dec(1) + queuedGauge.Dec(1) } if future.Empty() { delete(pool.queue, addr) @@ -1313,7 +1313,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans promoted = append(promoted, tx) } } - queuedCounter.Dec(int64(len(readies))) + queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit var caps types.Transactions @@ -1328,9 +1328,9 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans } // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) - queuedCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(forwards) + len(drops) + len(caps))) + localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) } // Delete the entire queue entry if it became empty. if list.Empty() { @@ -1389,9 +1389,9 @@ func (pool *TxPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) + pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(offenders[i]) { - localCounter.Dec(int64(len(caps))) + localGauge.Dec(int64(len(caps))) } pending-- } @@ -1416,9 +1416,9 @@ func (pool *TxPool) truncatePending() { log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) + pendingGauge.Dec(int64(len(caps))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(caps))) + localGauge.Dec(int64(len(caps))) } pending-- } @@ -1506,9 +1506,9 @@ func (pool *TxPool) demoteUnexecutables() { log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) if pool.locals.contains(addr) { - localCounter.Dec(int64(len(olds) + len(drops) + len(invalids))) + localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) } // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { @@ -1518,7 +1518,7 @@ func (pool *TxPool) demoteUnexecutables() { log.Warn("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Dec(int64(len(gapped))) + pendingGauge.Dec(int64(len(gapped))) } // Delete the entire queue entry if it became empty. if list.Empty() { diff --git a/metrics/gauge.go b/metrics/gauge.go index 0fbfdb8603..b6b2758b0d 100644 --- a/metrics/gauge.go +++ b/metrics/gauge.go @@ -6,6 +6,8 @@ import "sync/atomic" type Gauge interface { Snapshot() Gauge Update(int64) + Dec(int64) + Inc(int64) Value() int64 } @@ -65,6 +67,16 @@ func (GaugeSnapshot) Update(int64) { panic("Update called on a GaugeSnapshot") } +// Dec panics. +func (GaugeSnapshot) Dec(int64) { + panic("Dec called on a GaugeSnapshot") +} + +// Inc panics. +func (GaugeSnapshot) Inc(int64) { + panic("Inc called on a GaugeSnapshot") +} + // Value returns the value at the time the snapshot was taken. func (g GaugeSnapshot) Value() int64 { return int64(g) } @@ -77,6 +89,12 @@ func (NilGauge) Snapshot() Gauge { return NilGauge{} } // Update is a no-op. func (NilGauge) Update(v int64) {} +// Dec is a no-op. +func (NilGauge) Dec(i int64) {} + +// Inc is a no-op. +func (NilGauge) Inc(i int64) {} + // Value is a no-op. func (NilGauge) Value() int64 { return 0 } @@ -101,6 +119,16 @@ func (g *StandardGauge) Value() int64 { return atomic.LoadInt64(&g.value) } +// Dec decrements the gauge's current value by the given amount. +func (g *StandardGauge) Dec(i int64) { + atomic.AddInt64(&g.value, -i) +} + +// Inc increments the gauge's current value by the given amount. +func (g *StandardGauge) Inc(i int64) { + atomic.AddInt64(&g.value, i) +} + // FunctionalGauge returns value from given function type FunctionalGauge struct { value func() int64 @@ -118,3 +146,13 @@ func (g FunctionalGauge) Snapshot() Gauge { return GaugeSnapshot(g.Value()) } func (FunctionalGauge) Update(int64) { panic("Update called on a FunctionalGauge") } + +// Dec panics. +func (FunctionalGauge) Dec(int64) { + panic("Dec called on a FunctionalGauge") +} + +// Inc panics. +func (FunctionalGauge) Inc(int64) { + panic("Inc called on a FunctionalGauge") +}