diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 667d7c7623..cb155e9a8b 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -53,7 +53,6 @@ const ( databaseVersion = 2 // reindexed if database version does not match cachedLastBlocks = 1000 // last block of map pointers cachedLvPointers = 1000 // first log value pointer of block pointers - cachedBaseRows = 100 // groups of base layer filter row data cachedFilterMaps = 3 // complete filter maps (cached by map renderer) cachedRenderSnapshots = 8 // saved map renderer data at block boundaries ) @@ -101,7 +100,6 @@ type FilterMaps struct { filterMapCache *lru.Cache[uint32, filterMap] lastBlockCache *lru.Cache[uint32, lastBlockOfMap] lvPointerCache *lru.Cache[uint64, uint64] - baseRowsCache *lru.Cache[uint64, [][]uint32] // the matchers set and the fields of FilterMapsMatcherBackend instances are // read and written both by exported functions and the indexer. @@ -264,7 +262,6 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps), lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks), lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers), - baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows), renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots), } f.checkRevertRange() // revert maps that are inconsistent with the current chain view @@ -348,7 +345,6 @@ func (f *FilterMaps) reset() { f.renderSnapshots.Purge() f.lastBlockCache.Purge() f.lvPointerCache.Purge() - f.baseRowsCache.Purge() f.indexLock.Unlock() // deleting the range first ensures that resetDb will be called again at next // startup and any leftover data will be removed even if it cannot finish now. 
@@ -565,47 +561,69 @@ func (f *FilterMaps) getFilterMap(mapIndex uint32) (filterMap, error) { } fm := make(filterMap, f.mapHeight) for rowIndex := range fm { - var err error - fm[rowIndex], err = f.getFilterMapRow(mapIndex, uint32(rowIndex), false) + rows, err := f.getFilterMapRows([]uint32{mapIndex}, uint32(rowIndex), false) if err != nil { return nil, fmt.Errorf("failed to load filter map %d from database: %v", mapIndex, err) } + fm[rowIndex] = rows[0] } f.filterMapCache.Add(mapIndex, fm) return fm, nil } -// getFilterMapRow fetches the given filter map row. If baseLayerOnly is true -// then only the first baseRowLength entries are returned. -func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) { - baseMapRowIndex := f.mapRowIndex(mapIndex&-f.baseRowGroupLength, rowIndex) - baseRows, ok := f.baseRowsCache.Get(baseMapRowIndex) - if !ok { - var err error - baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth) - if err != nil { - return nil, fmt.Errorf("failed to retrieve filter map %d base rows %d: %v", mapIndex, rowIndex, err) +// getFilterMapRows fetches a set of filter map rows at the corresponding map +// indices and a shared row index. If baseLayerOnly is true then only the first +// baseRowLength entries are returned. 
+func (f *FilterMaps) getFilterMapRows(mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) { + rows := make([]FilterRow, len(mapIndices)) + var ptr int + for len(mapIndices) > ptr { + baseRowGroup := mapIndices[ptr] / f.baseRowGroupLength + groupLength := 1 + for ptr+groupLength < len(mapIndices) && mapIndices[ptr+groupLength]/f.baseRowGroupLength == baseRowGroup { + groupLength++ } - f.baseRowsCache.Add(baseMapRowIndex, baseRows) + if err := f.getFilterMapRowsOfGroup(rows[ptr:ptr+groupLength], mapIndices[ptr:ptr+groupLength], rowIndex, baseLayerOnly); err != nil { + return nil, err + } + ptr += groupLength } - baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)]) - if baseLayerOnly { - return baseRow, nil - } - extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth) + return rows, nil +} + +// getFilterMapRowsOfGroup fetches a set of filter map rows at map indices +// belonging to the same base row group. 
+func (f *FilterMaps) getFilterMapRowsOfGroup(target []FilterRow, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) error { + baseRowGroup := mapIndices[0] / f.baseRowGroupLength + baseMapRowIndex := f.mapRowIndex(baseRowGroup*f.baseRowGroupLength, rowIndex) + baseRows, err := rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth) if err != nil { - return nil, fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err) + return fmt.Errorf("failed to retrieve base row group %d of row %d: %v", baseRowGroup, rowIndex, err) } - return FilterRow(append(baseRow, extRow...)), nil + for i, mapIndex := range mapIndices { + if mapIndex/f.baseRowGroupLength != baseRowGroup { + panic("mapIndices are not in the same base row group") + } + row := baseRows[mapIndex&(f.baseRowGroupLength-1)] + if !baseLayerOnly { + extRow, err := rawdb.ReadFilterMapExtRow(f.db, f.mapRowIndex(mapIndex, rowIndex), f.logMapWidth) + if err != nil { + return fmt.Errorf("failed to retrieve filter map %d extended row %d: %v", mapIndex, rowIndex, err) + } + row = append(row, extRow...) + } + target[i] = row + } + return nil } // storeFilterMapRows stores a set of filter map rows at the corresponding map // indices and a shared row index. 
func (f *FilterMaps) storeFilterMapRows(batch ethdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error { for len(mapIndices) > 0 { - baseMapIndex := mapIndices[0] & -f.baseRowGroupLength + baseRowGroup := mapIndices[0] / f.baseRowGroupLength groupLength := 1 - for groupLength < len(mapIndices) && mapIndices[groupLength]&-f.baseRowGroupLength == baseMapIndex { + for groupLength < len(mapIndices) && mapIndices[groupLength]/f.baseRowGroupLength == baseRowGroup { groupLength++ } if err := f.storeFilterMapRowsOfGroup(batch, mapIndices[:groupLength], rowIndex, rows[:groupLength]); err != nil { @@ -619,26 +637,20 @@ func (f *FilterMaps) storeFilterMapRows(batch ethdb.Batch, mapIndices []uint32, // storeFilterMapRowsOfGroup stores a set of filter map rows at map indices // belonging to the same base row group. func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []uint32, rowIndex uint32, rows []FilterRow) error { - baseMapIndex := mapIndices[0] & -f.baseRowGroupLength - baseMapRowIndex := f.mapRowIndex(baseMapIndex, rowIndex) + baseRowGroup := mapIndices[0] / f.baseRowGroupLength + baseMapRowIndex := f.mapRowIndex(baseRowGroup*f.baseRowGroupLength, rowIndex) var baseRows [][]uint32 if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced - var ok bool - baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex) - if ok { - baseRows = slices.Clone(baseRows) - } else { - var err error - baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth) - if err != nil { - return fmt.Errorf("failed to retrieve filter map %d base rows %d for modification: %v", mapIndices[0]&-f.baseRowGroupLength, rowIndex, err) - } + var err error + baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth) + if err != nil { + return fmt.Errorf("failed to retrieve base row group %d of row %d for modification: %v", baseRowGroup, 
rowIndex, err) } } else { baseRows = make([][]uint32, f.baseRowGroupLength) } for i, mapIndex := range mapIndices { - if mapIndex&-f.baseRowGroupLength != baseMapIndex { + if mapIndex/f.baseRowGroupLength != baseRowGroup { panic("mapIndices are not in the same base row group") } baseRow := []uint32(rows[i]) @@ -650,7 +662,6 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u baseRows[mapIndex&(f.baseRowGroupLength-1)] = baseRow rawdb.WriteFilterMapExtRow(batch, f.mapRowIndex(mapIndex, rowIndex), extRow, f.logMapWidth) } - f.baseRowsCache.Add(baseMapRowIndex, baseRows) rawdb.WriteFilterMapBaseRows(batch, baseMapRowIndex, baseRows, f.logMapWidth) return nil } diff --git a/core/filtermaps/indexer_test.go b/core/filtermaps/indexer_test.go index 5ed397a90e..4144f4cccc 100644 --- a/core/filtermaps/indexer_test.go +++ b/core/filtermaps/indexer_test.go @@ -428,10 +428,12 @@ func (ts *testSetup) matcherViewHash() common.Hash { binary.LittleEndian.PutUint32(enc[:4], r) for m := uint32(0); m <= headMap; m++ { binary.LittleEndian.PutUint32(enc[4:8], m) - row, _ := mb.GetFilterMapRow(ctx, m, r, false) - for _, v := range row { - binary.LittleEndian.PutUint32(enc[8:], v) - hasher.Write(enc[:]) + rows, _ := mb.GetFilterMapRows(ctx, []uint32{m}, r, false) + for _, row := range rows { + for _, v := range row { + binary.LittleEndian.PutUint32(enc[8:], v) + hasher.Write(enc[:]) + } } } } diff --git a/core/filtermaps/matcher.go b/core/filtermaps/matcher.go index a5eeaaa5f0..82ea2ceee6 100644 --- a/core/filtermaps/matcher.go +++ b/core/filtermaps/matcher.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "math" "sync" "sync/atomic" "time" @@ -45,7 +44,7 @@ var ErrMatchAll = errors.New("match all patterns not supported") type MatcherBackend interface { GetParams() *Params GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) - GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) 
+ GetFilterMapRows(ctx context.Context, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) SyncLogIndex(ctx context.Context) (SyncRange, error) Close() @@ -365,50 +364,57 @@ func (m *singleMatcherInstance) getMatchesForLayer(ctx context.Context, layerInd var st int m.stats.setState(&st, stOther) params := m.backend.GetParams() - maskedMapIndex, rowIndex := uint32(math.MaxUint32), uint32(0) - for _, mapIndex := range m.mapIndices { - filterRows, ok := m.filterRows[mapIndex] - if !ok { - continue - } - if mm := params.maskedMapIndex(mapIndex, layerIndex); mm != maskedMapIndex { - // only recalculate rowIndex when necessary - maskedMapIndex = mm - rowIndex = params.rowIndex(mapIndex, layerIndex, m.value) + var ptr int + for len(m.mapIndices) > ptr { + // find next group of map indices mapped onto the same row + maskedMapIndex := params.maskedMapIndex(m.mapIndices[ptr], layerIndex) + rowIndex := params.rowIndex(m.mapIndices[ptr], layerIndex, m.value) + groupLength := 1 + for ptr+groupLength < len(m.mapIndices) && params.maskedMapIndex(m.mapIndices[ptr+groupLength], layerIndex) == maskedMapIndex { + groupLength++ } if layerIndex == 0 { m.stats.setState(&st, stFetchFirst) } else { m.stats.setState(&st, stFetchMore) } - filterRow, err := m.backend.GetFilterMapRow(ctx, mapIndex, rowIndex, layerIndex == 0) + groupRows, err := m.backend.GetFilterMapRows(ctx, m.mapIndices[ptr:ptr+groupLength], rowIndex, layerIndex == 0) if err != nil { m.stats.setState(&st, stNone) - return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", mapIndex, rowIndex, err) + return nil, fmt.Errorf("failed to retrieve filter map %d row %d: %v", m.mapIndices[ptr], rowIndex, err) } - if layerIndex == 0 { - matchBaseRowAccessMeter.Mark(1) - matchBaseRowSizeMeter.Mark(int64(len(filterRow))) - } else { - matchExtRowAccessMeter.Mark(1) - matchExtRowSizeMeter.Mark(int64(len(filterRow))) - } - 
 m.stats.addAmount(st, int64(len(filterRow))) m.stats.setState(&st, stOther) - filterRows = append(filterRows, filterRow) - if uint32(len(filterRow)) < params.maxRowLength(layerIndex) { - m.stats.setState(&st, stProcess) - matches := params.potentialMatches(filterRows, mapIndex, m.value) - m.stats.addAmount(st, int64(len(matches))) - results = append(results, matcherResult{ - mapIndex: mapIndex, - matches: matches, - }) - m.stats.setState(&st, stOther) - delete(m.filterRows, mapIndex) - } else { - m.filterRows[mapIndex] = filterRows + for i := range groupLength { + mapIndex := m.mapIndices[ptr+i] + filterRow := groupRows[i] + filterRows, ok := m.filterRows[mapIndex] + if !ok { + panic("dropped map in mapIndices") + } + if layerIndex == 0 { + matchBaseRowAccessMeter.Mark(1) + matchBaseRowSizeMeter.Mark(int64(len(filterRow))) + } else { + matchExtRowAccessMeter.Mark(1) + matchExtRowSizeMeter.Mark(int64(len(filterRow))) + } + m.stats.addAmount(st, int64(len(filterRow))) + filterRows = append(filterRows, filterRow) + if uint32(len(filterRow)) < params.maxRowLength(layerIndex) { + m.stats.setState(&st, stProcess) + matches := params.potentialMatches(filterRows, mapIndex, m.value) + m.stats.addAmount(st, int64(len(matches))) + results = append(results, matcherResult{ + mapIndex: mapIndex, + matches: matches, + }) + m.stats.setState(&st, stOther) + delete(m.filterRows, mapIndex) + } else { + m.filterRows[mapIndex] = filterRows + } } + ptr += groupLength } m.cleanMapIndices() m.stats.setState(&st, stNone) diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index e19a63e18b..abd0fd9283 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -67,18 +67,15 @@ func (fm *FilterMapsMatcherBackend) Close() { delete(fm.f.matchers, fm) } -// GetFilterMapRow returns the given row of the given map. If the row is empty +// GetFilterMapRows returns the given row of each given map.
If the row is empty // then a non-nil zero length row is returned. If baseLayerOnly is true then // only the first baseRowLength entries of the row are guaranteed to be // returned. // Note that the returned slices should not be modified, they should be copied // on write. -// GetFilterMapRow implements MatcherBackend. -func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) { - fm.f.indexLock.RLock() - defer fm.f.indexLock.RUnlock() - - return fm.f.getFilterMapRow(mapIndex, rowIndex, baseLayerOnly) +// GetFilterMapRows implements MatcherBackend. +func (fm *FilterMapsMatcherBackend) GetFilterMapRows(ctx context.Context, mapIndices []uint32, rowIndex uint32, baseLayerOnly bool) ([]FilterRow, error) { + return fm.f.getFilterMapRows(mapIndices, rowIndex, baseLayerOnly) } // GetBlockLvPointer returns the starting log value index where the log values