core/filtermaps: remove filter base row cache, add group read (#31852)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Docker Image (push) Waiting to run

This PR changes the database access of the base part of filter rows that
are stored in groups of 32 adjacent maps for improved database storage
size and data access efficiency.
Before this grouped storage was introduced, filter rows were not cached
because the access pattern of either the index rendering or the search
does not really benefit from caching. Also no mutex was necessary for
filter row access. Storing adjacent rows in groups complicated the
situation as a search typically required reading all or most of adjacent
rows of a group, so in order to implement the single row read operation
without having to read the entire group up to 32 times, a cache for the
base row groups was added. This also introduced data race issues for
concurrent read/write in the same group, which was avoided by locking
the `indexLock` mutex. Unfortunately this also led to slowed down or
temporarily blocked search operations when indexing was in progress.
This PR returns to the original concept of uncached, no-mutex filter map
access by increasing read efficiency in a better way; similarly to
write operations that already operate on groups of filter maps, now
`getFilterMapRow` is also replaced by `getFilterMapRows` that accepts a
single `rowIndex` and a list of `mapIndices`. It slightly complicates
`singleMatcherInstance.getMatchesForLayer` which now has to collect
groups of map indices accessed in the same row, but in exchange it
guarantees maximum read efficiency while avoiding read/write mutex
interference.

Note: a follow-up refactoring is WIP that further changes the database
access scheme by providing an immutable index view to the matcher, makes
the whole indexer more straightforward with no callbacks, and entirely
removes the concept of matcher syncing with `validBlocks` and the
resulting multiple retry logic in `eth/filters/filter.go`. This might
take a bit longer to finish though and in the meantime this change could
hopefully already solve the blocked request issues.
This commit is contained in:
Felföldi Zsolt 2025-06-03 12:54:13 +02:00 committed by GitHub
parent a7d9b52eaf
commit 91900e79ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 102 additions and 86 deletions

View file

@ -53,7 +53,6 @@ const (
databaseVersion = 2 // reindexed if database version does not match
cachedLastBlocks = 1000 // last block of map pointers
cachedLvPointers = 1000 // first log value pointer of block pointers
cachedBaseRows = 100 // groups of base layer filter row data
cachedFilterMaps = 3 // complete filter maps (cached by map renderer)
cachedRenderSnapshots = 8 // saved map renderer data at block boundaries
)
@ -101,7 +100,6 @@ type FilterMaps struct {
filterMapCache *lru.Cache[uint32, filterMap]
lastBlockCache *lru.Cache[uint32, lastBlockOfMap]
lvPointerCache *lru.Cache[uint64, uint64]
baseRowsCache *lru.Cache[uint64, [][]uint32]
// the matchers set and the fields of FilterMapsMatcherBackend instances are
// read and written both by exported functions and the indexer.
@ -264,7 +262,6 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers),
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
}
f.checkRevertRange() // revert maps that are inconsistent with the current chain view
@ -348,7 +345,6 @@ func (f *FilterMaps) reset() {
f.renderSnapshots.Purge()
f.lastBlockCache.Purge()
f.lvPointerCache.Purge()
f.baseRowsCache.Purge()
f.indexLock.Unlock()
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
@ -565,47 +561,69 @@ func (f *FilterMaps) getFilterMap(mapIndex uint32) (filterMap, error) {
}
fm := make(filterMap, f.mapHeight)
for rowIndex := range fm {
var err error
fm[rowIndex], err = f.getFilterMapRow(mapIndex, uint32(rowIndex), false)
rows, err := f.getFilterMapRows([]uint32{mapIndex}, uint32(rowIndex), false)
if err != nil {
return nil, fmt.Errorf("failed to load filter map %d from database: %v", mapIndex, err)
}
fm[rowIndex] = rows[0]
}
f.filterMapCache.Add(mapIndex, fm)
return fm, nil
}
// getFilterMapRow fetches the given filter map row. If baseLayerOnly is true
// then only the first baseRowLength entries are returned.
func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
baseMapRowIndex := f.mapRowIndex(mapIndex&-f.baseRowGroupLength, rowIndex)
baseRows, ok := f.baseRowsCache.Get(baseMapRowIndex)
if !ok {
var err error
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
if err != nil {
return nil, fmt.Errorf("failed to retrieve filter map %d base rows %d: %v", mapIndex, rowIndex, err)
// getFilterMapRows fetches a set of filter map rows at the corresponding map
// indices and a shared row index. If baseLayerOnly is true then only the first
// baseRowLength entries are returned.
func (f *FilterMaps) getFilterMapRows(mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) {
rows := make([]FilterRow, len(mapIndices))
var ptr int
for len(mapIndices) > ptr {
baseRowGroup := mapIndices[ptr] / f.baseRowGroupLength
groupLength := 1
for ptr+groupLength < len(mapIndices) && mapIndices[ptr+groupLength]/f.baseRowGroupLength == baseRowGroup {
groupLength++
}
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
if err := f.getFilterMapRowsOfGroup(rows[ptr:ptr+groupLength], mapIndices[ptr:ptr+groupLength], rowIndex, baseLayerOnly); err != nil {
return nil, err
}
ptr += groupLength
}
baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)])
if baseLayerOnly {
return baseRow, nil
}
extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth)
return rows, nil
}
// getFilterMapRowsOfGroup fetches a set of filter map rows at map indices
// belonging to the same base row group.
func (f *FilterMaps) getFilterMapRowsOfGroup(target []FilterRow, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) error {
baseRowGroup := mapIndices[0] / f.baseRowGroupLength
baseMapRowIndex := f.mapRowIndex(baseRowGroup*f.baseRowGroupLength, rowIndex)
baseRows, err := rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
if err != nil {
return nil, fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err)
return fmt.Errorf("failed to retrieve base row group %d of row %d: %v", baseRowGroup, rowIndex, err)
}
return FilterRow(append(baseRow, extRow...)), nil
for i, mapIndex := range mapIndices {
if mapIndex/f.baseRowGroupLength != baseRowGroup {
panic("mapIndices are not in the same base row group")
}
row := baseRows[mapIndex&(f.baseRowGroupLength-1)]
if !baseLayerOnly {
extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth)
if err != nil {
return fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err)
}
row = append(row, extRow...)
}
target[i] = row
}
return nil
}
// storeFilterMapRows stores a set of filter map rows at the corresponding map
// indices and a shared row index.
func (f *FilterMaps) storeFilterMapRows(batch ethdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error {
for len(mapIndices) > 0 {
baseMapIndex := mapIndices[0] & -f.baseRowGroupLength
baseRowGroup := mapIndices[0] / f.baseRowGroupLength
groupLength := 1
for groupLength < len(mapIndices) && mapIndices[groupLength]&-f.baseRowGroupLength == baseMapIndex {
for groupLength < len(mapIndices) && mapIndices[groupLength]/f.baseRowGroupLength == baseRowGroup {
groupLength++
}
if err := f.storeFilterMapRowsOfGroup(batch, mapIndices[:groupLength], rowIndex, rows[:groupLength]); err != nil {
@ -619,26 +637,20 @@ func (f *FilterMaps) storeFilterMapRows(batch ethdb.Batch, mapIndices []uint32,
// storeFilterMapRowsOfGroup stores a set of filter map rows at map indices
// belonging to the same base row group.
func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error {
baseMapIndex := mapIndices[0] & -f.baseRowGroupLength
baseMapRowIndex := f.mapRowIndex(baseMapIndex, rowIndex)
baseRowGroup := mapIndices[0] / f.baseRowGroupLength
baseMapRowIndex := f.mapRowIndex(baseRowGroup*f.baseRowGroupLength, rowIndex)
var baseRows [][]uint32
if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
var ok bool
baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
if ok {
baseRows = slices.Clone(baseRows)
} else {
var err error
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
if err != nil {
return fmt.Errorf("failed to retrieve filter map %d base rows %d for modification: %v", mapIndices[0]&-f.baseRowGroupLength, rowIndex, err)
}
var err error
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
if err != nil {
return fmt.Errorf("failed to retrieve base row group %d of row %d for modification: %v", baseRowGroup, rowIndex, err)
}
} else {
baseRows = make([][]uint32, f.baseRowGroupLength)
}
for i, mapIndex := range mapIndices {
if mapIndex&-f.baseRowGroupLength != baseMapIndex {
if mapIndex/f.baseRowGroupLength != baseRowGroup {
panic("mapIndices are not in the same base row group")
}
baseRow := []uint32(rows[i])
@ -650,7 +662,6 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u
baseRows[mapIndex&(f.baseRowGroupLength-1)] = baseRow
rawdb.WriteFilterMapExtRow(batch, f.mapRowIndex(mapIndex, rowIndex), extRow, f.logMapWidth)
}
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
rawdb.WriteFilterMapBaseRows(batch, baseMapRowIndex, baseRows, f.logMapWidth)
return nil
}

View file

@ -428,10 +428,12 @@ func (ts *testSetup) matcherViewHash() common.Hash {
binary.LittleEndian.PutUint32(enc[:4], r)
for m := uint32(0); m <= headMap; m++ {
binary.LittleEndian.PutUint32(enc[4:8], m)
row, _ := mb.GetFilterMapRow(ctx, m, r, false)
for _, v := range row {
binary.LittleEndian.PutUint32(enc[8:], v)
hasher.Write(enc[:])
rows, _ := mb.GetFilterMapRows(ctx, []uint32{m}, r, false)
for _, row := range rows {
for _, v := range row {
binary.LittleEndian.PutUint32(enc[8:], v)
hasher.Write(enc[:])
}
}
}
}

View file

@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
@ -45,7 +44,7 @@ var ErrMatchAll = errors.New("match all patterns not supported")
type MatcherBackend interface {
GetParams() *Params
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error)
GetFilterMapRows(ctx context.Context, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error)
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
SyncLogIndex(ctx context.Context) (SyncRange, error)
Close()
@ -365,50 +364,57 @@ func (m *singleMatcherInstance) getMatchesForLayer(ctx context.Context, layerInd
var st int
m.stats.setState(&st, stOther)
params := m.backend.GetParams()
maskedMapIndex, rowIndex := uint32(math.MaxUint32), uint32(0)
for _, mapIndex := range m.mapIndices {
filterRows, ok := m.filterRows[mapIndex]
if !ok {
continue
}
if mm := params.maskedMapIndex(mapIndex, layerIndex); mm != maskedMapIndex {
// only recalculate rowIndex when necessary
maskedMapIndex = mm
rowIndex = params.rowIndex(mapIndex, layerIndex, m.value)
var ptr int
for len(m.mapIndices) > ptr {
// find next group of map indices mapped onto the same row
maskedMapIndex := params.maskedMapIndex(m.mapIndices[ptr], layerIndex)
rowIndex := params.rowIndex(m.mapIndices[ptr], layerIndex, m.value)
groupLength := 1
for ptr+groupLength < len(m.mapIndices) && params.maskedMapIndex(m.mapIndices[ptr+groupLength], layerIndex) == maskedMapIndex {
groupLength++
}
if layerIndex == 0 {
m.stats.setState(&st, stFetchFirst)
} else {
m.stats.setState(&st, stFetchMore)
}
filterRow, err := m.backend.GetFilterMapRow(ctx, mapIndex, rowIndex, layerIndex == 0)
groupRows, err := m.backend.GetFilterMapRows(ctx, m.mapIndices[ptr:ptr+groupLength], rowIndex, layerIndex == 0)
if err != nil {
m.stats.setState(&st, stNone)
return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", mapIndex, rowIndex, err)
return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", m.mapIndices[ptr], rowIndex, err)
}
if layerIndex == 0 {
matchBaseRowAccessMeter.Mark(1)
matchBaseRowSizeMeter.Mark(int64(len(filterRow)))
} else {
matchExtRowAccessMeter.Mark(1)
matchExtRowSizeMeter.Mark(int64(len(filterRow)))
}
m.stats.addAmount(st, int64(len(filterRow)))
m.stats.setState(&st, stOther)
filterRows = append(filterRows, filterRow)
if uint32(len(filterRow)) < params.maxRowLength(layerIndex) {
m.stats.setState(&st, stProcess)
matches := params.potentialMatches(filterRows, mapIndex, m.value)
m.stats.addAmount(st, int64(len(matches)))
results = append(results, matcherResult{
mapIndex: mapIndex,
matches: matches,
})
m.stats.setState(&st, stOther)
delete(m.filterRows, mapIndex)
} else {
m.filterRows[mapIndex] = filterRows
for i := range groupLength {
mapIndex := m.mapIndices[ptr+i]
filterRow := groupRows[i]
filterRows, ok := m.filterRows[mapIndex]
if !ok {
panic("dropped map in mapIndices")
}
if layerIndex == 0 {
matchBaseRowAccessMeter.Mark(1)
matchBaseRowSizeMeter.Mark(int64(len(filterRow)))
} else {
matchExtRowAccessMeter.Mark(1)
matchExtRowSizeMeter.Mark(int64(len(filterRow)))
}
m.stats.addAmount(st, int64(len(filterRow)))
filterRows = append(filterRows, filterRow)
if uint32(len(filterRow)) < params.maxRowLength(layerIndex) {
m.stats.setState(&st, stProcess)
matches := params.potentialMatches(filterRows, mapIndex, m.value)
m.stats.addAmount(st, int64(len(matches)))
results = append(results, matcherResult{
mapIndex: mapIndex,
matches: matches,
})
m.stats.setState(&st, stOther)
delete(m.filterRows, mapIndex)
} else {
m.filterRows[mapIndex] = filterRows
}
}
ptr += groupLength
}
m.cleanMapIndices()
m.stats.setState(&st, stNone)

View file

@ -67,18 +67,15 @@ func (fm *FilterMapsMatcherBackend) Close() {
delete(fm.f.matchers, fm)
}
// GetFilterMapRow returns the given row of the given map. If the row is empty
// GetFilterMapRows returns the given row of the given map. If the row is empty
// then a non-nil zero length row is returned. If baseLayerOnly is true then
// only the first baseRowLength entries of the row are guaranteed to be
// returned.
// Note that the returned slices should not be modified, they should be copied
// on write.
// GetFilterMapRow implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
fm.f.indexLock.RLock()
defer fm.f.indexLock.RUnlock()
return fm.f.getFilterMapRow(mapIndex, rowIndex, baseLayerOnly)
// GetFilterMapRows implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetFilterMapRows(ctx context.Context, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) {
return fm.f.getFilterMapRows(mapIndices, rowIndex, baseLayerOnly)
}
// GetBlockLvPointer returns the starting log value index where the log values