forked from forks/go-ethereum
core/filtermaps: fix map renderer reorg issue (#31642)
This PR fixes a bug in the map renderer that sometimes used an obsolete block log value pointer to initialize the iterator for rendering from a snapshot. This bug was triggered by chain reorgs and sometimes caused indexing errors and invalid search results. A few other conditions are also made safer that were not reported to cause issues yet but could potentially be unsafe in some corner cases. A new unit test is also added that reproduced the bug but passes with the new fixes. Fixes https://github.com/ethereum/go-ethereum/issues/31593 Might also fix https://github.com/ethereum/go-ethereum/issues/31589 though this issue has not been reproduced yet, but it appears to be related to a log index database corruption around a specific block, similarly to the other issue. Note that running this branch resets and regenerates the log index database. For this purpose a `Version` field has been added to `rawdb.FilterMapsRange` which will also make this easier in the future if a breaking database change is needed or the existing one is considered potentially broken due to a bug, like in this case.
This commit is contained in:
parent
e3e9d7ccb6
commit
ebb3eb29d3
5 changed files with 153 additions and 33 deletions
|
|
@ -50,6 +50,7 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
databaseVersion = 1 // reindexed if database version does not match
|
||||
cachedLastBlocks = 1000 // last block of map pointers
|
||||
cachedLvPointers = 1000 // first log value pointer of block pointers
|
||||
cachedBaseRows = 100 // groups of base layer filter row data
|
||||
|
|
@ -138,13 +139,25 @@ type FilterMaps struct {
|
|||
// as transparent (uncached/unchanged).
|
||||
type filterMap []FilterRow
|
||||
|
||||
// copy returns a copy of the given filter map. Note that the row slices are
|
||||
// copied but their contents are not. This permits extending the rows further
|
||||
// fastCopy returns a copy of the given filter map. Note that the row slices are
|
||||
// copied but their contents are not. This permits appending to the rows further
|
||||
// (which happens during map rendering) without affecting the validity of
|
||||
// copies made for snapshots during rendering.
|
||||
func (fm filterMap) copy() filterMap {
|
||||
// Appending to the rows of both the original map and the fast copy, or two fast
|
||||
// copies of the same map would result in data corruption, therefore a fast copy
|
||||
// should always be used in a read only way.
|
||||
func (fm filterMap) fastCopy() filterMap {
|
||||
return slices.Clone(fm)
|
||||
}
|
||||
|
||||
// fullCopy returns a copy of the given filter map, also making a copy of each
|
||||
// individual filter row, ensuring that a modification to either one will never
|
||||
// affect the other.
|
||||
func (fm filterMap) fullCopy() filterMap {
|
||||
c := make(filterMap, len(fm))
|
||||
copy(c, fm)
|
||||
for i, row := range fm {
|
||||
c[i] = slices.Clone(row)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
|
|
@ -207,8 +220,9 @@ type Config struct {
|
|||
// NewFilterMaps creates a new FilterMaps and starts the indexer.
|
||||
func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, finalBlock uint64, params Params, config Config) *FilterMaps {
|
||||
rs, initialized, err := rawdb.ReadFilterMapsRange(db)
|
||||
if err != nil {
|
||||
log.Error("Error reading log index range", "error", err)
|
||||
if err != nil || rs.Version != databaseVersion {
|
||||
rs, initialized = rawdb.FilterMapsRange{}, false
|
||||
log.Warn("Invalid log index database version; resetting log index")
|
||||
}
|
||||
params.deriveFields()
|
||||
f := &FilterMaps{
|
||||
|
|
@ -437,6 +451,7 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, ne
|
|||
f.updateMatchersValidRange()
|
||||
if newRange.initialized {
|
||||
rs := rawdb.FilterMapsRange{
|
||||
Version: databaseVersion,
|
||||
HeadIndexed: newRange.headIndexed,
|
||||
HeadDelimiter: newRange.headDelimiter,
|
||||
BlocksFirst: newRange.blocks.First(),
|
||||
|
|
|
|||
|
|
@ -17,8 +17,10 @@
|
|||
package filtermaps
|
||||
|
||||
import (
|
||||
"context"
|
||||
crand "crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
|
@ -31,6 +33,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
var testParams = Params{
|
||||
|
|
@ -104,6 +107,7 @@ func TestIndexerRandomRange(t *testing.T) {
|
|||
fork, head = rand.Intn(len(forks)), rand.Intn(1001)
|
||||
ts.chain.setCanonicalChain(forks[fork][:head+1])
|
||||
case 2:
|
||||
checkSnapshot = false
|
||||
if head < 1000 {
|
||||
checkSnapshot = !noHistory && head != 0 // no snapshot generated for block 0
|
||||
// add blocks after the current head
|
||||
|
|
@ -158,6 +162,63 @@ func TestIndexerRandomRange(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestIndexerMatcherView(t *testing.T) {
|
||||
testIndexerMatcherView(t, false)
|
||||
}
|
||||
|
||||
func TestIndexerMatcherViewWithConcurrentRead(t *testing.T) {
|
||||
testIndexerMatcherView(t, true)
|
||||
}
|
||||
|
||||
func testIndexerMatcherView(t *testing.T, concurrentRead bool) {
|
||||
ts := newTestSetup(t)
|
||||
defer ts.close()
|
||||
|
||||
forks := make([][]common.Hash, 20)
|
||||
hashes := make([]common.Hash, 20)
|
||||
ts.chain.addBlocks(100, 5, 2, 4, true)
|
||||
ts.setHistory(0, false)
|
||||
for i := range forks {
|
||||
if i != 0 {
|
||||
ts.chain.setHead(100 - i)
|
||||
ts.chain.addBlocks(i, 5, 2, 4, true)
|
||||
}
|
||||
ts.fm.WaitIdle()
|
||||
forks[i] = ts.chain.getCanonicalChain()
|
||||
hashes[i] = ts.matcherViewHash()
|
||||
}
|
||||
fork := len(forks) - 1
|
||||
for i := 0; i < 5000; i++ {
|
||||
oldFork := fork
|
||||
fork = rand.Intn(len(forks))
|
||||
stopCh := make(chan chan struct{})
|
||||
if concurrentRead {
|
||||
go func() {
|
||||
for {
|
||||
ts.matcherViewHash()
|
||||
select {
|
||||
case ch := <-stopCh:
|
||||
close(ch)
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
ts.chain.setCanonicalChain(forks[fork])
|
||||
ts.fm.WaitIdle()
|
||||
if concurrentRead {
|
||||
ch := make(chan struct{})
|
||||
stopCh <- ch
|
||||
<-ch
|
||||
}
|
||||
hash := ts.matcherViewHash()
|
||||
if hash != hashes[fork] {
|
||||
t.Fatalf("Matcher view hash mismatch when switching from for %d to %d", oldFork, fork)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexerCompareDb(t *testing.T) {
|
||||
ts := newTestSetup(t)
|
||||
defer ts.close()
|
||||
|
|
@ -291,6 +352,55 @@ func (ts *testSetup) fmDbHash() common.Hash {
|
|||
return result
|
||||
}
|
||||
|
||||
func (ts *testSetup) matcherViewHash() common.Hash {
|
||||
mb := ts.fm.NewMatcherBackend()
|
||||
defer mb.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
params := mb.GetParams()
|
||||
hasher := sha256.New()
|
||||
var headPtr uint64
|
||||
for b := uint64(0); ; b++ {
|
||||
lvptr, err := mb.GetBlockLvPointer(ctx, b)
|
||||
if err != nil || (b > 0 && lvptr == headPtr) {
|
||||
break
|
||||
}
|
||||
var enc [8]byte
|
||||
binary.LittleEndian.PutUint64(enc[:], lvptr)
|
||||
hasher.Write(enc[:])
|
||||
headPtr = lvptr
|
||||
}
|
||||
headMap := uint32(headPtr >> params.logValuesPerMap)
|
||||
var enc [12]byte
|
||||
for r := uint32(0); r < params.mapHeight; r++ {
|
||||
binary.LittleEndian.PutUint32(enc[:4], r)
|
||||
for m := uint32(0); m <= headMap; m++ {
|
||||
binary.LittleEndian.PutUint32(enc[4:8], m)
|
||||
row, _ := mb.GetFilterMapRow(ctx, m, r, false)
|
||||
for _, v := range row {
|
||||
binary.LittleEndian.PutUint32(enc[8:], v)
|
||||
hasher.Write(enc[:])
|
||||
}
|
||||
}
|
||||
}
|
||||
var hash common.Hash
|
||||
hasher.Sum(hash[:0])
|
||||
for i := 0; i < 50; i++ {
|
||||
hasher.Reset()
|
||||
hasher.Write(hash[:])
|
||||
lvptr := binary.LittleEndian.Uint64(hash[:8]) % headPtr
|
||||
if log, _ := mb.GetLogByLvIndex(ctx, lvptr); log != nil {
|
||||
enc, err := rlp.EncodeToBytes(log)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
hasher.Write(enc)
|
||||
}
|
||||
hasher.Sum(hash[:0])
|
||||
}
|
||||
return hash
|
||||
}
|
||||
|
||||
func (ts *testSetup) close() {
|
||||
if ts.fm != nil {
|
||||
ts.fm.Stop()
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ func (f *FilterMaps) renderMapsBefore(renderBefore uint32) (*mapRenderer, error)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if snapshot := f.lastCanonicalSnapshotBefore(renderBefore); snapshot != nil && snapshot.mapIndex >= nextMap {
|
||||
if snapshot := f.lastCanonicalSnapshotOfMap(nextMap); snapshot != nil {
|
||||
return f.renderMapsFromSnapshot(snapshot)
|
||||
}
|
||||
if nextMap >= renderBefore {
|
||||
|
|
@ -97,14 +97,14 @@ func (f *FilterMaps) renderMapsBefore(renderBefore uint32) (*mapRenderer, error)
|
|||
// snapshot made at a block boundary.
|
||||
func (f *FilterMaps) renderMapsFromSnapshot(cp *renderedMap) (*mapRenderer, error) {
|
||||
f.testSnapshotUsed = true
|
||||
iter, err := f.newLogIteratorFromBlockDelimiter(cp.lastBlock)
|
||||
iter, err := f.newLogIteratorFromBlockDelimiter(cp.lastBlock, cp.headDelimiter)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create log iterator from block delimiter %d: %v", cp.lastBlock, err)
|
||||
}
|
||||
return &mapRenderer{
|
||||
f: f,
|
||||
currentMap: &renderedMap{
|
||||
filterMap: cp.filterMap.copy(),
|
||||
filterMap: cp.filterMap.fullCopy(),
|
||||
mapIndex: cp.mapIndex,
|
||||
lastBlock: cp.lastBlock,
|
||||
blockLvPtrs: cp.blockLvPtrs,
|
||||
|
|
@ -137,14 +137,14 @@ func (f *FilterMaps) renderMapsFromMapBoundary(firstMap, renderBefore uint32, st
|
|||
}, nil
|
||||
}
|
||||
|
||||
// lastCanonicalSnapshotBefore returns the latest cached snapshot that matches
|
||||
// the current targetView.
|
||||
func (f *FilterMaps) lastCanonicalSnapshotBefore(renderBefore uint32) *renderedMap {
|
||||
// lastCanonicalSnapshotOfMap returns the latest cached snapshot of the given map
|
||||
// that is also consistent with the current targetView.
|
||||
func (f *FilterMaps) lastCanonicalSnapshotOfMap(mapIndex uint32) *renderedMap {
|
||||
var best *renderedMap
|
||||
for _, blockNumber := range f.renderSnapshots.Keys() {
|
||||
if cp, _ := f.renderSnapshots.Get(blockNumber); cp != nil && blockNumber < f.indexedRange.blocks.AfterLast() &&
|
||||
blockNumber <= f.targetView.headNumber && f.targetView.getBlockId(blockNumber) == cp.lastBlockId &&
|
||||
cp.mapIndex < renderBefore && (best == nil || blockNumber > best.lastBlock) {
|
||||
cp.mapIndex == mapIndex && (best == nil || blockNumber > best.lastBlock) {
|
||||
best = cp
|
||||
}
|
||||
}
|
||||
|
|
@ -171,10 +171,9 @@ func (f *FilterMaps) lastCanonicalMapBoundaryBefore(renderBefore uint32) (nextMa
|
|||
if err != nil {
|
||||
return 0, 0, 0, fmt.Errorf("failed to retrieve last block of reverse iterated map %d: %v", mapIndex, err)
|
||||
}
|
||||
if lastBlock >= f.indexedView.headNumber || lastBlock >= f.targetView.headNumber ||
|
||||
lastBlockId != f.targetView.getBlockId(lastBlock) {
|
||||
// map is not full or inconsistent with targetView; roll back
|
||||
continue
|
||||
if (f.indexedRange.headIndexed && mapIndex >= f.indexedRange.maps.Last()) ||
|
||||
lastBlock >= f.targetView.headNumber || lastBlockId != f.targetView.getBlockId(lastBlock) {
|
||||
continue // map is not full or inconsistent with targetView; roll back
|
||||
}
|
||||
lvPtr, err := f.getBlockLvPointer(lastBlock)
|
||||
if err != nil {
|
||||
|
|
@ -257,11 +256,14 @@ func (f *FilterMaps) loadHeadSnapshot() error {
|
|||
|
||||
// makeSnapshot creates a snapshot of the current state of the rendered map.
|
||||
func (r *mapRenderer) makeSnapshot() {
|
||||
r.f.renderSnapshots.Add(r.iterator.blockNumber, &renderedMap{
|
||||
filterMap: r.currentMap.filterMap.copy(),
|
||||
if r.iterator.blockNumber != r.currentMap.lastBlock || r.iterator.chainView != r.f.targetView {
|
||||
panic("iterator state inconsistent with current rendered map")
|
||||
}
|
||||
r.f.renderSnapshots.Add(r.currentMap.lastBlock, &renderedMap{
|
||||
filterMap: r.currentMap.filterMap.fastCopy(),
|
||||
mapIndex: r.currentMap.mapIndex,
|
||||
lastBlock: r.iterator.blockNumber,
|
||||
lastBlockId: r.f.targetView.getBlockId(r.currentMap.lastBlock),
|
||||
lastBlock: r.currentMap.lastBlock,
|
||||
lastBlockId: r.iterator.chainView.getBlockId(r.currentMap.lastBlock),
|
||||
blockLvPtrs: r.currentMap.blockLvPtrs,
|
||||
finished: true,
|
||||
headDelimiter: r.iterator.lvIndex,
|
||||
|
|
@ -661,24 +663,13 @@ var errUnindexedRange = errors.New("unindexed range")
|
|||
// newLogIteratorFromBlockDelimiter creates a logIterator starting at the
|
||||
// given block's first log value entry (the block delimiter), according to the
|
||||
// current targetView.
|
||||
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber uint64) (*logIterator, error) {
|
||||
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint64) (*logIterator, error) {
|
||||
if blockNumber > f.targetView.headNumber {
|
||||
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.headNumber)
|
||||
}
|
||||
if !f.indexedRange.blocks.Includes(blockNumber) {
|
||||
return nil, errUnindexedRange
|
||||
}
|
||||
var lvIndex uint64
|
||||
if f.indexedRange.headIndexed && blockNumber+1 == f.indexedRange.blocks.AfterLast() {
|
||||
lvIndex = f.indexedRange.headDelimiter
|
||||
} else {
|
||||
var err error
|
||||
lvIndex, err = f.getBlockLvPointer(blockNumber + 1)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve log value pointer of block %d after delimiter: %v", blockNumber+1, err)
|
||||
}
|
||||
lvIndex--
|
||||
}
|
||||
finished := blockNumber == f.targetView.headNumber
|
||||
l := &logIterator{
|
||||
chainView: f.targetView,
|
||||
|
|
|
|||
|
|
@ -75,6 +75,9 @@ func (fm *FilterMapsMatcherBackend) Close() {
|
|||
// on write.
|
||||
// GetFilterMapRow implements MatcherBackend.
|
||||
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32, baseLayerOnly bool) (FilterRow, error) {
|
||||
fm.f.indexLock.RLock()
|
||||
defer fm.f.indexLock.RUnlock()
|
||||
|
||||
return fm.f.getFilterMapRow(mapIndex, rowIndex, baseLayerOnly)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -434,6 +434,7 @@ func DeleteBlockLvPointers(db ethdb.KeyValueStore, blocks common.Range[uint64],
|
|||
// FilterMapsRange is a storage representation of the block range covered by the
|
||||
// filter maps structure and the corresponting log value index range.
|
||||
type FilterMapsRange struct {
|
||||
Version uint32
|
||||
HeadIndexed bool
|
||||
HeadDelimiter uint64
|
||||
BlocksFirst, BlocksAfterLast uint64
|
||||
|
|
|
|||
Loading…
Reference in a new issue