mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 00:39:26 +00:00
core/filtermaps: clone cached slices, fix tempRange (#31680)
This PR ensures that caching a slice or a slice of slices will never affect the original version by always cloning a slice fetched from cache if it is not used in a guaranteed read only way.
This commit is contained in:
parent
5a7bbb423f
commit
14f15430bb
4 changed files with 65 additions and 5 deletions
|
|
@ -128,6 +128,7 @@ type FilterMaps struct {
|
||||||
|
|
||||||
// test hooks
|
// test hooks
|
||||||
testDisableSnapshots, testSnapshotUsed bool
|
testDisableSnapshots, testSnapshotUsed bool
|
||||||
|
testProcessEventsHook func()
|
||||||
}
|
}
|
||||||
|
|
||||||
// filterMap is a full or partial in-memory representation of a filter map where
|
// filterMap is a full or partial in-memory representation of a filter map where
|
||||||
|
|
@ -573,7 +574,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32, baseLayerOnly bo
|
||||||
}
|
}
|
||||||
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
|
f.baseRowsCache.Add(baseMapRowIndex, baseRows)
|
||||||
}
|
}
|
||||||
baseRow := baseRows[mapIndex&(f.baseRowGroupLength-1)]
|
baseRow := slices.Clone(baseRows[mapIndex&(f.baseRowGroupLength-1)])
|
||||||
if baseLayerOnly {
|
if baseLayerOnly {
|
||||||
return baseRow, nil
|
return baseRow, nil
|
||||||
}
|
}
|
||||||
|
|
@ -610,7 +611,9 @@ func (f *FilterMaps) storeFilterMapRowsOfGroup(batch ethdb.Batch, mapIndices []u
|
||||||
if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
|
if uint32(len(mapIndices)) != f.baseRowGroupLength { // skip base rows read if all rows are replaced
|
||||||
var ok bool
|
var ok bool
|
||||||
baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
|
baseRows, ok = f.baseRowsCache.Get(baseMapRowIndex)
|
||||||
if !ok {
|
if ok {
|
||||||
|
baseRows = slices.Clone(baseRows)
|
||||||
|
} else {
|
||||||
var err error
|
var err error
|
||||||
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
|
baseRows, err = rawdb.ReadFilterMapBaseRows(f.db, baseMapRowIndex, f.baseRowGroupLength, f.logMapWidth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -656,7 +659,7 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
|
||||||
// called from outside the indexerLoop goroutine.
|
// called from outside the indexerLoop goroutine.
|
||||||
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
|
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
|
||||||
if blockNumber >= f.indexedRange.blocks.AfterLast() && f.indexedRange.headIndexed {
|
if blockNumber >= f.indexedRange.blocks.AfterLast() && f.indexedRange.headIndexed {
|
||||||
return f.indexedRange.headDelimiter, nil
|
return f.indexedRange.headDelimiter + 1, nil
|
||||||
}
|
}
|
||||||
if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
|
if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
|
||||||
return lvPointer, nil
|
return lvPointer, nil
|
||||||
|
|
|
||||||
|
|
@ -165,6 +165,9 @@ func (f *FilterMaps) waitForNewHead() {
|
||||||
// processEvents processes all events, blocking only if a block processing is
|
// processEvents processes all events, blocking only if a block processing is
|
||||||
// happening and indexing should be suspended.
|
// happening and indexing should be suspended.
|
||||||
func (f *FilterMaps) processEvents() {
|
func (f *FilterMaps) processEvents() {
|
||||||
|
if f.testProcessEventsHook != nil {
|
||||||
|
f.testProcessEventsHook()
|
||||||
|
}
|
||||||
for f.processSingleEvent(f.blockProcessing) {
|
for f.processSingleEvent(f.blockProcessing) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -219,6 +219,58 @@ func testIndexerMatcherView(t *testing.T, concurrentRead bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLogsByIndex(t *testing.T) {
|
||||||
|
ts := newTestSetup(t)
|
||||||
|
defer func() {
|
||||||
|
ts.fm.testProcessEventsHook = nil
|
||||||
|
ts.close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
ts.chain.addBlocks(1000, 10, 3, 4, true)
|
||||||
|
ts.setHistory(0, false)
|
||||||
|
ts.fm.WaitIdle()
|
||||||
|
firstLog := make([]uint64, 1001) // first valid log position per block
|
||||||
|
lastLog := make([]uint64, 1001) // last valid log position per block
|
||||||
|
for i := uint64(0); i <= ts.fm.indexedRange.headDelimiter; i++ {
|
||||||
|
log, err := ts.fm.getLogByLvIndex(i)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error getting log by index %d: %v", i, err)
|
||||||
|
}
|
||||||
|
if log != nil {
|
||||||
|
if firstLog[log.BlockNumber] == 0 {
|
||||||
|
firstLog[log.BlockNumber] = i
|
||||||
|
}
|
||||||
|
lastLog[log.BlockNumber] = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var failed bool
|
||||||
|
ts.fm.testProcessEventsHook = func() {
|
||||||
|
if ts.fm.indexedRange.blocks.IsEmpty() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if lvi := firstLog[ts.fm.indexedRange.blocks.First()]; lvi != 0 {
|
||||||
|
log, err := ts.fm.getLogByLvIndex(lvi)
|
||||||
|
if log == nil || err != nil {
|
||||||
|
t.Errorf("Error getting first log of indexed block range: %v", err)
|
||||||
|
failed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lvi := lastLog[ts.fm.indexedRange.blocks.Last()]; lvi != 0 {
|
||||||
|
log, err := ts.fm.getLogByLvIndex(lvi)
|
||||||
|
if log == nil || err != nil {
|
||||||
|
t.Errorf("Error getting last log of indexed block range: %v", err)
|
||||||
|
failed = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
chain := ts.chain.getCanonicalChain()
|
||||||
|
for i := 0; i < 1000 && !failed; i++ {
|
||||||
|
head := rand.Intn(len(chain))
|
||||||
|
ts.chain.setCanonicalChain(chain[:head+1])
|
||||||
|
ts.fm.WaitIdle()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestIndexerCompareDb(t *testing.T) {
|
func TestIndexerCompareDb(t *testing.T) {
|
||||||
ts := newTestSetup(t)
|
ts := newTestSetup(t)
|
||||||
defer ts.close()
|
defer ts.close()
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"slices"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -107,7 +108,7 @@ func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, erro
|
||||||
filterMap: cp.filterMap.fullCopy(),
|
filterMap: cp.filterMap.fullCopy(),
|
||||||
mapIndex: cp.mapIndex,
|
mapIndex: cp.mapIndex,
|
||||||
lastBlock: cp.lastBlock,
|
lastBlock: cp.lastBlock,
|
||||||
blockLvPtrs: cp.blockLvPtrs,
|
blockLvPtrs: slices.Clone(cp.blockLvPtrs),
|
||||||
},
|
},
|
||||||
finishedMaps: make(map[uint32]*renderedMap),
|
finishedMaps: make(map[uint32]*renderedMap),
|
||||||
finished: common.NewRange(cp.mapIndex, 0),
|
finished: common.NewRange(cp.mapIndex, 0),
|
||||||
|
|
@ -244,7 +245,7 @@ func (f *FilterMaps) loadHeadSnapshot() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
f.renderSnapshots.Add(f.indexedRange.blocks.Last(), &renderedMap{
|
f.renderSnapshots.Add(f.indexedRange.blocks.Last(), &renderedMap{
|
||||||
filterMap: fm,
|
filterMap: fm.fullCopy(),
|
||||||
mapIndex: f.indexedRange.maps.Last(),
|
mapIndex: f.indexedRange.maps.Last(),
|
||||||
lastBlock: f.indexedRange.blocks.Last(),
|
lastBlock: f.indexedRange.blocks.Last(),
|
||||||
lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()),
|
lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()),
|
||||||
|
|
@ -536,6 +537,7 @@ func (r *mapRenderer) getTempRange() (filterMapsRange, error) {
|
||||||
} else {
|
} else {
|
||||||
tempRange.blocks.SetAfterLast(0)
|
tempRange.blocks.SetAfterLast(0)
|
||||||
}
|
}
|
||||||
|
tempRange.headIndexed = false
|
||||||
tempRange.headDelimiter = 0
|
tempRange.headDelimiter = 0
|
||||||
}
|
}
|
||||||
return tempRange, nil
|
return tempRange, nil
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue