From c8a9a9c0917dd57d077a79044e65dbbdd421458b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Fri, 28 Mar 2025 16:17:28 +0100 Subject: [PATCH] core/filtermaps: revert to unindexed mode in case of indexing error (#31500) This PR changes log indexer error handling so that if an indexing error happens then it disables the indexer and reverts to unindexed more without resetting the database (except in case of a failed database init). Resetting the database on the first error would probably be overkill as a client update might fix this without having to reindex the entire history. It would also make debugging very hard. On the other hand, these errors do not resolve themselves automatically so constantly retrying makes no sense either. With these changes a new attempt to resume indexing is made every time the client is restarted. The PR also fixes https://github.com/ethereum/go-ethereum/issues/31491 which originated from the tail indexer trying to resume processing a failed map renderer. --------- Co-authored-by: Felix Lange --- core/filtermaps/chain_view.go | 6 ++- core/filtermaps/indexer.go | 97 ++++++++++++++++++++++------------- 2 files changed, 65 insertions(+), 38 deletions(-) diff --git a/core/filtermaps/chain_view.go b/core/filtermaps/chain_view.go index bc0de47ded..a8cf53b1c0 100644 --- a/core/filtermaps/chain_view.go +++ b/core/filtermaps/chain_view.go @@ -83,7 +83,11 @@ func (cv *ChainView) getReceipts(number uint64) types.Receipts { if number > cv.headNumber { panic("invalid block number") } - return cv.chain.GetReceiptsByHash(cv.blockHash(number)) + blockHash := cv.blockHash(number) + if blockHash == (common.Hash{}) { + log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber) + } + return cv.chain.GetReceiptsByHash(blockHash) } // limitedView returns a new chain view that is a truncated version of the parent view. diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 026b3b4f38..6732dc85ea 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -17,6 +17,7 @@ package filtermaps import ( + "errors" "math" "time" @@ -49,18 +50,15 @@ func (f *FilterMaps) indexerLoop() { continue } if err := f.init(); err != nil { - log.Error("Error initializing log index; reverting to unindexed mode", "error", err) - f.reset() - f.disabled = true - close(f.disabledCh) + f.disableForError("initialization", err) + f.reset() // remove broken index from DB return } } if !f.targetHeadIndexed() { - if !f.tryIndexHead() { - // either shutdown or unexpected error; in the latter case ensure - // that proper shutdown is still possible. - f.processSingleEvent(true) + if err := f.tryIndexHead(); err != nil && err != errChainUpdate { + f.disableForError("head rendering", err) + return } } else { if f.finalBlock != f.lastFinal { @@ -69,13 +67,36 @@ func (f *FilterMaps) indexerLoop() { } f.lastFinal = f.finalBlock } - if f.tryIndexTail() && f.tryUnindexTail() { - f.waitForNewHead() + if done, err := f.tryIndexTail(); err != nil { + f.disableForError("tail rendering", err) + return + } else if !done { + continue } + if done, err := f.tryUnindexTail(); err != nil { + f.disableForError("tail unindexing", err) + return + } else if !done { + continue + } + // tail indexing/unindexing is done; if head is also indexed then + // wait here until there is a new head + f.waitForNewHead() } } } +// disableForError is called when the indexer encounters a database error, for example a +// missing receipt. We can't continue operating when the database is broken, so the +// indexer goes into disabled state. +// Note that the partial index is left in disk; maybe a client update can fix the +// issue without reindexing. +func (f *FilterMaps) disableForError(op string, err error) { + log.Error("Log index "+op+" failed, reverting to unindexed mode", "error", err) + f.disabled = true + close(f.disabledCh) +} + type targetUpdate struct { targetView *ChainView historyCutoff, finalBlock uint64 @@ -116,14 +137,15 @@ func (f *FilterMaps) SetBlockProcessing(blockProcessing bool) { // WaitIdle blocks until the indexer is in an idle state while synced up to the // latest targetView. func (f *FilterMaps) WaitIdle() { - if f.disabled { - f.closeWg.Wait() - return - } for { ch := make(chan bool) - f.waitIdleCh <- ch - if <-ch { + select { + case f.waitIdleCh <- ch: + if <-ch { + return + } + case <-f.disabledCh: + f.closeWg.Wait() return } } @@ -196,16 +218,16 @@ func (f *FilterMaps) setTarget(target targetUpdate) { f.finalBlock = target.finalBlock } -// tryIndexHead tries to render head maps according to the current targetView -// and returns true if successful. -func (f *FilterMaps) tryIndexHead() bool { +// tryIndexHead tries to render head maps according to the current targetView. +// Should be called when targetHeadIndexed returns false. If this function +// returns no error then either stop is true or head indexing is finished. +func (f *FilterMaps) tryIndexHead() error { headRenderer, err := f.renderMapsBefore(math.MaxUint32) if err != nil { - log.Error("Error creating log index head renderer", "error", err) - return false + return err } if headRenderer == nil { - return true + return errors.New("head indexer has nothing to do") // tryIndexHead should be called when head is not indexed } if !f.startedHeadIndex { f.lastLogHeadIndex = time.Now() @@ -230,8 +252,7 @@ func (f *FilterMaps) tryIndexHead() bool { f.lastLogHeadIndex = time.Now() } }); err != nil { - log.Error("Log index head rendering failed", "error", err) - return false + return err } if f.loggedHeadIndex && f.indexedRange.hasIndexedBlocks() { log.Info("Log index head rendering finished", @@ -240,7 +261,7 @@ func (f *FilterMaps) tryIndexHead() bool { "elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt))) } f.loggedHeadIndex, f.startedHeadIndex = false, false - return true + return nil } // tryIndexTail tries to render tail epochs until the tail target block is @@ -248,7 +269,7 @@ func (f *FilterMaps) tryIndexHead() bool { // Note that tail indexing is only started if the log index head is fully // rendered according to targetView and is suspended as soon as the targetView // is changed. -func (f *FilterMaps) tryIndexTail() bool { +func (f *FilterMaps) tryIndexTail() (bool, error) { for { firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch if firstEpoch == 0 || !f.needTailEpoch(firstEpoch-1) { @@ -256,7 +277,7 @@ func (f *FilterMaps) tryIndexTail() bool { } f.processEvents() if f.stop || !f.targetHeadIndexed() { - return false + return false, nil } // resume process if tail rendering was interrupted because of head rendering tailRenderer := f.tailRenderer @@ -268,8 +289,7 @@ func (f *FilterMaps) tryIndexTail() bool { var err error tailRenderer, err = f.renderMapsBefore(f.indexedRange.maps.First()) if err != nil { - log.Error("Error creating log index tail renderer", "error", err) - return false + return false, err } } if tailRenderer == nil { @@ -302,13 +322,16 @@ func (f *FilterMaps) tryIndexTail() bool { f.lastLogTailIndex = time.Now() } }) - if err != nil && f.needTailEpoch(firstEpoch-1) { + if err != nil && !f.needTailEpoch(firstEpoch-1) { // stop silently if cutoff point has move beyond epoch boundary while rendering - log.Error("Log index tail rendering failed", "error", err) + return true, nil + } + if err != nil { + return false, err } if !done { f.tailRenderer = tailRenderer // only keep tail renderer if interrupted by stopCb - return false + return false, nil } } if f.loggedTailIndex && f.indexedRange.hasIndexedBlocks() { @@ -318,14 +341,14 @@ func (f *FilterMaps) tryIndexTail() bool { "elapsed", common.PrettyDuration(time.Since(f.startedTailIndexAt))) f.loggedTailIndex = false } - return true + return true, nil } // tryUnindexTail removes entire epochs of log index data as long as the first // fully indexed block is at least as old as the tail target. // Note that unindexing is very quick as it only removes continuous ranges of // data from the database and is also called while running head indexing. -func (f *FilterMaps) tryUnindexTail() bool { +func (f *FilterMaps) tryUnindexTail() (bool, error) { for { firstEpoch := (f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch) >> f.logMapsPerEpoch if f.needTailEpoch(firstEpoch) { @@ -333,7 +356,7 @@ func (f *FilterMaps) tryUnindexTail() bool { } f.processEvents() if f.stop { - return false + return false, nil } if !f.startedTailUnindex { f.startedTailUnindexAt = time.Now() @@ -343,7 +366,7 @@ func (f *FilterMaps) tryUnindexTail() bool { } if err := f.deleteTailEpoch(firstEpoch); err != nil { log.Error("Log index tail epoch unindexing failed", "error", err) - return false + return false, err } } if f.startedTailUnindex && f.indexedRange.hasIndexedBlocks() { @@ -354,7 +377,7 @@ func (f *FilterMaps) tryUnindexTail() bool { "elapsed", common.PrettyDuration(time.Since(f.startedTailUnindexAt))) f.startedTailUnindex = false } - return true + return true, nil } // needTailEpoch returns true if the given tail epoch needs to be kept