forked from forks/go-ethereum
core/filtermaps: fix log index initialization (#31750)
This PR fixes an initialization bug that in some cases caused the map renderer to leave the last, partially rendered map as is and resume rendering from the next map. At initialization we check whether the existing rendered maps are consistent with the current chain view and revert them if necessary. Until now this happened through an ugly hacky solution, a "limited" chain view that was supposed to trigger a rollback of some maps in the renderer logic if necessary. This whole setup worked under assumptions that just weren't true any more. As a result it always tried to revert the last map but also it did not shorten the indexed range, only set `headIndexed` to false which indicated to the renderer logic that the last map is fully populated (which it wasn't). Now an explicit rollback of any unusable (reorged) maps happens at startup, which also means that no hacky chain view is necessary, as soon as the new `FilterMaps` is returned, the indexed range and view are consistent with each other. In the first commit an extra check is also added to `writeFinishedMaps` so that if there is ever again a bug that would result in a gapped index then it will not break the db with writing the incomplete data. Instead it will return an indexing error which causes the indexer to revert to unindexed mode and print an error log instantly. Hopefully this will not ever happen in the future, but in order to test this safeguard check I manually triggered the bug with only the first commit enabled, which caused an indexing error as expected. With the second commit added (the actual fix) the same operation succeeded without any issues. Note that the database version is also bumped in this PR in order to enforce a full reindexing as any existing database might be potentially broken. Fixes https://github.com/ethereum/go-ethereum/issues/31729
This commit is contained in:
parent
341929ab96
commit
8868ad6d6e
3 changed files with 51 additions and 44 deletions
|
|
@ -135,14 +135,6 @@ func (cv *ChainView) SharedRange(cv2 *ChainView) common.Range[uint64] {
|
|||
return common.NewRange(0, sharedLen)
|
||||
}
|
||||
|
||||
// limitedView returns a new chain view that is a truncated version of the parent view.
|
||||
func (cv *ChainView) limitedView(newHead uint64) *ChainView {
|
||||
if newHead >= cv.headNumber {
|
||||
return cv
|
||||
}
|
||||
return NewChainView(cv.chain, newHead, cv.BlockHash(newHead))
|
||||
}
|
||||
|
||||
// equalViews returns true if the two chain views are equivalent.
|
||||
func equalViews(cv1, cv2 *ChainView) bool {
|
||||
if cv1 == nil || cv2 == nil {
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ var (
|
|||
)
|
||||
|
||||
const (
|
||||
databaseVersion = 1 // reindexed if database version does not match
|
||||
databaseVersion = 2 // reindexed if database version does not match
|
||||
cachedLastBlocks = 1000 // last block of map pointers
|
||||
cachedLvPointers = 1000 // first log value pointer of block pointers
|
||||
cachedBaseRows = 100 // groups of base layer filter row data
|
||||
|
|
@ -244,6 +244,8 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
|
|||
disabledCh: make(chan struct{}),
|
||||
exportFileName: config.ExportFileName,
|
||||
Params: params,
|
||||
targetView: initView,
|
||||
indexedView: initView,
|
||||
indexedRange: filterMapsRange{
|
||||
initialized: initialized,
|
||||
headIndexed: rs.HeadIndexed,
|
||||
|
|
@ -265,16 +267,8 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
|
|||
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
|
||||
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
|
||||
}
|
||||
f.checkRevertRange() // revert maps that are inconsistent with the current chain view
|
||||
|
||||
// Set initial indexer target.
|
||||
f.targetView = initView
|
||||
if f.indexedRange.initialized {
|
||||
f.indexedView = f.initChainView(f.targetView)
|
||||
f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.HeadNumber()+1
|
||||
if !f.indexedRange.headIndexed {
|
||||
f.indexedRange.headDelimiter = 0
|
||||
}
|
||||
}
|
||||
if f.indexedRange.hasIndexedBlocks() {
|
||||
log.Info("Initialized log indexer",
|
||||
"first block", f.indexedRange.blocks.First(), "last block", f.indexedRange.blocks.Last(),
|
||||
|
|
@ -303,29 +297,40 @@ func (f *FilterMaps) Stop() {
|
|||
f.closeWg.Wait()
|
||||
}
|
||||
|
||||
// initChainView returns a chain view consistent with both the current target
|
||||
// view and the current state of the log index as found in the database, based
|
||||
// on the last block of stored maps.
|
||||
// Note that the returned view might be shorter than the existing index if
|
||||
// the latest maps are not consistent with targetView.
|
||||
func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
|
||||
mapIndex := f.indexedRange.maps.AfterLast()
|
||||
for {
|
||||
var ok bool
|
||||
mapIndex, ok = f.lastMapBoundaryBefore(mapIndex)
|
||||
if !ok {
|
||||
break
|
||||
// checkRevertRange checks whether the existing index is consistent with the
|
||||
// current indexed view and reverts inconsistent maps if necessary.
|
||||
func (f *FilterMaps) checkRevertRange() {
|
||||
if f.indexedRange.maps.Count() == 0 {
|
||||
return
|
||||
}
|
||||
lastBlockNumber, lastBlockId, err := f.getLastBlockOfMap(mapIndex)
|
||||
lastMap := f.indexedRange.maps.Last()
|
||||
lastBlockNumber, lastBlockId, err := f.getLastBlockOfMap(lastMap)
|
||||
if err != nil {
|
||||
log.Error("Could not initialize indexed chain view", "error", err)
|
||||
break
|
||||
log.Error("Error initializing log index database; resetting log index", "error", err)
|
||||
f.reset()
|
||||
return
|
||||
}
|
||||
if lastBlockNumber <= chainView.HeadNumber() && chainView.BlockId(lastBlockNumber) == lastBlockId {
|
||||
return chainView.limitedView(lastBlockNumber)
|
||||
for lastBlockNumber > f.indexedView.HeadNumber() || f.indexedView.BlockId(lastBlockNumber) != lastBlockId {
|
||||
// revert last map
|
||||
if f.indexedRange.maps.Count() == 1 {
|
||||
f.reset() // reset database if no rendered maps remained
|
||||
return
|
||||
}
|
||||
lastMap--
|
||||
newRange := f.indexedRange
|
||||
newRange.maps.SetLast(lastMap)
|
||||
lastBlockNumber, lastBlockId, err = f.getLastBlockOfMap(lastMap)
|
||||
if err != nil {
|
||||
log.Error("Error initializing log index database; resetting log index", "error", err)
|
||||
f.reset()
|
||||
return
|
||||
}
|
||||
newRange.blocks.SetAfterLast(lastBlockNumber) // lastBlockNumber is probably partially indexed
|
||||
newRange.headIndexed = false
|
||||
newRange.headDelimiter = 0
|
||||
// only shorten range and leave map data; next head render will overwrite it
|
||||
f.setRange(f.db, f.indexedView, newRange, false)
|
||||
}
|
||||
return chainView.limitedView(0)
|
||||
}
|
||||
|
||||
// reset un-initializes the FilterMaps structure and removes all related data from
|
||||
|
|
|
|||
|
|
@ -468,15 +468,25 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
|
|||
r.f.filterMapCache.Remove(mapIndex)
|
||||
}
|
||||
}
|
||||
var blockNumber uint64
|
||||
if r.finished.First() > 0 {
|
||||
// in order to always ensure continuous block pointers, initialize
|
||||
// blockNumber based on the last block of the previous map, then verify
|
||||
// against the first block associated with each rendered map
|
||||
lastBlock, _, err := r.f.getLastBlockOfMap(r.finished.First() - 1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get last block of previous map %d: %v", r.finished.First()-1, err)
|
||||
}
|
||||
blockNumber = lastBlock + 1
|
||||
}
|
||||
// add or update block pointers
|
||||
blockNumber := r.finishedMaps[r.finished.First()].firstBlock()
|
||||
for mapIndex := range r.finished.Iter() {
|
||||
renderedMap := r.finishedMaps[mapIndex]
|
||||
if blockNumber != renderedMap.firstBlock() {
|
||||
return fmt.Errorf("non-continuous block numbers in rendered map %d (next expected: %d first rendered: %d)", mapIndex, blockNumber, renderedMap.firstBlock())
|
||||
}
|
||||
r.f.storeLastBlockOfMap(batch, mapIndex, renderedMap.lastBlock, renderedMap.lastBlockId)
|
||||
checkWriteCnt()
|
||||
if blockNumber != renderedMap.firstBlock() {
|
||||
panic("non-continuous block numbers")
|
||||
}
|
||||
for _, lvPtr := range renderedMap.blockLvPtrs {
|
||||
r.f.storeBlockLvPointer(batch, blockNumber, lvPtr)
|
||||
checkWriteCnt()
|
||||
|
|
|
|||
Loading…
Reference in a new issue