core/filtermaps: add metrics (#31511)

This PR adds metrics related to map rendering and pattern matching to
the `core/filtermaps` package.
This commit is contained in:
Felföldi Zsolt 2025-04-01 14:29:20 +02:00 committed by GitHub
parent 4add312c8a
commit 7e3170fb5c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 54 additions and 1 deletion

View file

@ -30,6 +30,23 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
var (
mapCountGauge = metrics.NewRegisteredGauge("filtermaps/maps/count", nil) // actual number of rendered maps
mapLogValueMeter = metrics.NewRegisteredMeter("filtermaps/maps/logvalues", nil) // number of log values processed
mapBlockMeter = metrics.NewRegisteredMeter("filtermaps/maps/blocks", nil) // number of block delimiters processed
mapRenderTimer = metrics.NewRegisteredTimer("filtermaps/maps/rendertime", nil) // time elapsed while rendering a single map
mapWriteTimer = metrics.NewRegisteredTimer("filtermaps/maps/writetime", nil) // time elapsed while writing a batch of finished maps to db
matchRequestTimer = metrics.NewRegisteredTimer("filtermaps/match/requesttime", nil) // total processing time of a matching request
matchEpochTimer = metrics.NewRegisteredTimer("filtermaps/match/epochtime", nil) // processing time of a matching request in a single epoch
matchBaseRowAccessMeter = metrics.NewRegisteredMeter("filtermaps/match/baserowaccess", nil) // number of accessed rows on layer 0
matchBaseRowSizeMeter = metrics.NewRegisteredMeter("filtermaps/match/baserowsize", nil) // size of accessed rows on layer 0
matchExtRowAccessMeter = metrics.NewRegisteredMeter("filtermaps/match/extrowaccess", nil) // number of accessed rows on higher layers
matchExtRowSizeMeter = metrics.NewRegisteredMeter("filtermaps/match/extrowsize", nil) // size of accessed rows on higher layers
matchLogLookup = metrics.NewRegisteredMeter("filtermaps/match/loglookup", nil) // number of log lookups based on potential matches
matchAllMeter = metrics.NewRegisteredMeter("filtermaps/match/matchall", nil) // number of requests returned with ErrMatchAll
)
const (
@ -429,8 +446,12 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, ne
TailPartialEpoch: newRange.tailPartialEpoch,
}
rawdb.WriteFilterMapsRange(batch, rs)
if !isTempRange {
mapCountGauge.Update(int64(newRange.maps.Count() + newRange.tailPartialEpoch))
}
} else {
rawdb.DeleteFilterMapsRange(batch)
mapCountGauge.Update(0)
}
}

View file

@ -21,6 +21,7 @@ import (
"fmt"
"math"
"sort"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
@ -301,6 +302,11 @@ func (r *mapRenderer) run(stopCb func() bool, writeCb func()) (bool, error) {
// renderCurrentMap renders a single map.
func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) {
var (
totalTime time.Duration
logValuesProcessed, blocksProcessed int64
)
start := time.Now()
if !r.iterator.updateChainView(r.f.targetView) {
return false, errChainUpdate
}
@ -316,9 +322,11 @@ func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) {
for r.iterator.lvIndex < uint64(r.currentMap.mapIndex+1)<<r.f.logValuesPerMap && !r.iterator.finished {
waitCnt++
if waitCnt >= valuesPerCallback {
totalTime += time.Since(start)
if stopCb() {
return false, nil
}
start = time.Now()
if !r.iterator.updateChainView(r.f.targetView) {
return false, errChainUpdate
}
@ -343,8 +351,10 @@ func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) {
return false, fmt.Errorf("failed to advance log iterator at %d while rendering map %d: %v", r.iterator.lvIndex, r.currentMap.mapIndex, err)
}
if !r.iterator.skipToBoundary {
logValuesProcessed++
r.currentMap.lastBlock = r.iterator.blockNumber
if r.iterator.blockStart {
blocksProcessed++
r.currentMap.blockLvPtrs = append(r.currentMap.blockLvPtrs, r.iterator.lvIndex)
}
if !r.f.testDisableSnapshots && r.renderBefore >= r.f.indexedRange.maps.AfterLast() &&
@ -358,12 +368,18 @@ func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) {
r.currentMap.headDelimiter = r.iterator.lvIndex
}
r.currentMap.lastBlockId = r.f.targetView.getBlockId(r.currentMap.lastBlock)
totalTime += time.Since(start)
mapRenderTimer.Update(totalTime)
mapLogValueMeter.Mark(logValuesProcessed)
mapBlockMeter.Mark(blocksProcessed)
return true, nil
}
// writeFinishedMaps writes rendered maps to the database and updates
// filterMapsRange and indexedView accordingly.
func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
var totalTime time.Duration
start := time.Now()
if len(r.finishedMaps) == 0 {
return nil
}
@ -379,7 +395,7 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
if err != nil {
return fmt.Errorf("failed to get updated rendered range: %v", err)
}
renderedView := r.f.targetView // stopCb callback might still change targetView while writing finished maps
renderedView := r.f.targetView // pauseCb callback might still change targetView while writing finished maps
batch := r.f.db.NewBatch()
var writeCnt int
@ -393,7 +409,9 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
// do not exit while in partially written state but do allow processing
// events and pausing while block processing is in progress
r.f.indexLock.Unlock()
totalTime += time.Since(start)
pauseCb()
start = time.Now()
r.f.indexLock.Lock()
batch = r.f.db.NewBatch()
}
@ -477,6 +495,8 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
if err := batch.Write(); err != nil {
log.Crit("Error writing log index update batch", "error", err)
}
totalTime += time.Since(start)
mapWriteTimer.Update(totalTime)
return nil
}

View file

@ -125,6 +125,7 @@ func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
start := time.Now()
res, err := m.process()
matchRequestTimer.Update(time.Since(start))
if doRuntimeStats {
log.Info("Log search finished", "elapsed", time.Since(start))
@ -202,6 +203,7 @@ func (m *matcherEnv) process() ([]*types.Log, error) {
logs = append(logs, tasks[waitEpoch].logs...)
if err := tasks[waitEpoch].err; err != nil {
if err == ErrMatchAll {
matchAllMeter.Mark(1)
return logs, err
}
return logs, fmt.Errorf("failed to process log index epoch %d: %v", waitEpoch, err)
@ -220,6 +222,7 @@ func (m *matcherEnv) process() ([]*types.Log, error) {
// processEpoch returns the potentially matching logs from the given epoch.
func (m *matcherEnv) processEpoch(epochIndex uint32) ([]*types.Log, error) {
start := time.Now()
var logs []*types.Log
// create a list of map indices to process
fm, lm := epochIndex<<m.params.logMapsPerEpoch, (epochIndex+1)<<m.params.logMapsPerEpoch-1
@ -254,6 +257,7 @@ func (m *matcherEnv) processEpoch(epochIndex uint32) ([]*types.Log, error) {
logs = append(logs, mlogs...)
}
m.getLogStats.addAmount(st, int64(len(logs)))
matchEpochTimer.Update(time.Since(start))
return logs, nil
}
@ -273,6 +277,7 @@ func (m *matcherEnv) getLogsFromMatches(matches potentialMatches) ([]*types.Log,
if log != nil {
logs = append(logs, log)
}
matchLogLookup.Mark(1)
}
return logs, nil
}
@ -381,6 +386,13 @@ func (m *singleMatcherInstance) getMatchesForLayer(ctx context.Context, layerInd
m.stats.setState(&st, stNone)
return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", mapIndex, rowIndex, err)
}
if layerIndex == 0 {
matchBaseRowAccessMeter.Mark(1)
matchBaseRowSizeMeter.Mark(int64(len(filterRow)))
} else {
matchExtRowAccessMeter.Mark(1)
matchExtRowSizeMeter.Mark(int64(len(filterRow)))
}
m.stats.addAmount(st, int64(len(filterRow)))
m.stats.setState(&st, stOther)
filterRows = append(filterRows, filterRow)