From 14f15430bb99646f516e239271f21e4a52584666 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Mon, 21 Apr 2025 09:27:24 +0200 Subject: [PATCH] core/filtermaps: clone cached slices, fix tempRange (#31680) This PR ensures that caching a slice or a slice of slices will never affect the original version by always cloning a slice fetched from cache if it is not used in a guaranteed read only way. --- core/filtermaps/filtermaps.go | 9 ++++-- core/filtermaps/indexer.go | 3 ++ core/filtermaps/indexer_test.go | 52 +++++++++++++++++++++++++++++++++ core/filtermaps/map_renderer.go | 6 ++-- 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index a617de8968..920167ca8d 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -128,6 +128,7 @@ type FilterMaps struct { // test hooks testDisableSnapshots, testSnapshotUsed bool + testProcessEventsHook func() } // filterMap is a full or partial in-memory representation of a filter map where @@ -573,7 +574,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bo } f.baseRowsCache.Add(baseMapRowIndex, baseRows) } - baseRow := baseRows[mapIndex&(f.baseRowGroupLength-1)] + baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)]) if baseLayerOnly { return baseRow, nil } @@ -610,7 +611,9 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced var ok bool baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex) - if !ok { + if ok { + baseRows = slices.Clone(baseRows) + } else { var err error baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth) if err != nil { @@ -656,7 +659,7 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 { // called from outside the indexerLoop goroutine. func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) { if blockNumber >= f.indexedRange.blocks.AfterLast() && f.indexedRange.headIndexed { - return f.indexedRange.headDelimiter, nil + return f.indexedRange.headDelimiter + 1, nil } if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok { return lvPointer, nil diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 383ec078c9..787197345a 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -165,6 +165,9 @@ func (f *FilterMaps) waitForNewHead() { // processEvents processes all events, blocking only if a block processing is // happening and indexing should be suspended. func (f *FilterMaps) processEvents() { + if f.testProcessEventsHook != nil { + f.testProcessEventsHook() + } for f.processSingleEvent(f.blockProcessing) { } } diff --git a/core/filtermaps/indexer_test.go b/core/filtermaps/indexer_test.go index 4dddd27087..e60130ba4b 100644 --- a/core/filtermaps/indexer_test.go +++ b/core/filtermaps/indexer_test.go @@ -219,6 +219,58 @@ func testIndexerMatcherView(t *testing.T, concurrentRead bool) { } } +func TestLogsByIndex(t *testing.T) { + ts := newTestSetup(t) + defer func() { + ts.fm.testProcessEventsHook = nil + ts.close() + }() + + ts.chain.addBlocks(1000, 10, 3, 4, true) + ts.setHistory(0, false) + ts.fm.WaitIdle() + firstLog := make([]uint64, 1001) // first valid log position per block + lastLog := make([]uint64, 1001) // last valid log position per block + for i := uint64(0); i <= ts.fm.indexedRange.headDelimiter; i++ { + log, err := ts.fm.getLogByLvIndex(i) + if err != nil { + t.Fatalf("Error getting log by index %d: %v", i, err) + } + if log != nil { + if firstLog[log.BlockNumber] == 0 { + firstLog[log.BlockNumber] = i + } + lastLog[log.BlockNumber] = i + } + } + var failed bool + ts.fm.testProcessEventsHook = func() { + if ts.fm.indexedRange.blocks.IsEmpty() { + return + } + if lvi := firstLog[ts.fm.indexedRange.blocks.First()]; lvi != 0 { + log, err := ts.fm.getLogByLvIndex(lvi) + if log == nil || err != nil { + t.Errorf("Error getting first log of indexed block range: %v", err) + failed = true + } + } + if lvi := lastLog[ts.fm.indexedRange.blocks.Last()]; lvi != 0 { + log, err := ts.fm.getLogByLvIndex(lvi) + if log == nil || err != nil { + t.Errorf("Error getting last log of indexed block range: %v", err) + failed = true + } + } + } + chain := ts.chain.getCanonicalChain() + for i := 0; i < 1000 && !failed; i++ { + head := rand.Intn(len(chain)) + ts.chain.setCanonicalChain(chain[:head+1]) + ts.fm.WaitIdle() + } +} + func TestIndexerCompareDb(t *testing.T) { ts := newTestSetup(t) defer ts.close() diff --git a/core/filtermaps/map_renderer.go b/core/filtermaps/map_renderer.go index 7c2aa8dc32..f59a01c032 100644 --- a/core/filtermaps/map_renderer.go +++ b/core/filtermaps/map_renderer.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math" + "slices" "sort" "time" @@ -107,7 +108,7 @@ func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, erro filterMap: cp.filterMap.fullCopy(), mapIndex: cp.mapIndex, lastBlock: cp.lastBlock, - blockLvPtrs: cp.blockLvPtrs, + blockLvPtrs: slices.Clone(cp.blockLvPtrs), }, finishedMaps: make(map[uint32]*renderedMap), finished: common.NewRange(cp.mapIndex, 0), @@ -244,7 +245,7 @@ func (f *FilterMaps) loadHeadSnapshot() error { } } f.renderSnapshots.Add(f.indexedRange.blocks.Last(), &renderedMap{ - filterMap: fm, + filterMap: fm.fullCopy(), mapIndex: f.indexedRange.maps.Last(), lastBlock: f.indexedRange.blocks.Last(), lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()), @@ -536,6 +537,7 @@ func (r *mapRenderer) getTempRange() (filterMapsRange, error) { } else { tempRange.blocks.SetAfterLast(0) } + tempRange.headIndexed = false tempRange.headDelimiter = 0 } return tempRange, nil