diff --git a/core/filtermaps/chain_view.go b/core/filtermaps/chain_view.go index bc0de47ded..a8cf53b1c0 100644 --- a/core/filtermaps/chain_view.go +++ b/core/filtermaps/chain_view.go @@ -83,7 +83,11 @@ func (cv *ChainView) getReceipts(number uint64) types.Receipts { if number > cv.headNumber { panic("invalid block number") } - return cv.chain.GetReceiptsByHash(cv.blockHash(number)) + blockHash := cv.blockHash(number) + if blockHash == (common.Hash{}) { + log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber) + } + return cv.chain.GetReceiptsByHash(blockHash) } // limitedView returns a new chain view that is a truncated version of the parent view. diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 026b3b4f38..6732dc85ea 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -17,6 +17,7 @@ package filtermaps import ( + "errors" "math" "time" @@ -49,18 +50,15 @@ func (f *FilterMaps) indexerLoop() { continue } if err := f.init(); err != nil { - log.Error("Error initializing log index; reverting to unindexed mode", "error", err) - f.reset() - f.disabled = true - close(f.disabledCh) + f.disableForError("initialization", err) + f.reset() // remove broken index from DB return } } if !f.targetHeadIndexed() { - if !f.tryIndexHead() { - // either shutdown or unexpected error; in the latter case ensure - // that proper shutdown is still possible. - f.processSingleEvent(true) + if err := f.tryIndexHead(); err != nil && err != errChainUpdate { + f.disableForError("head rendering", err) + return } } else { if f.finalBlock != f.lastFinal { @@ -69,13 +67,36 @@ func (f *FilterMaps) indexerLoop() { } f.lastFinal = f.finalBlock } - if f.tryIndexTail() && f.tryUnindexTail() { - f.waitForNewHead() + if done, err := f.tryIndexTail(); err != nil { + f.disableForError("tail rendering", err) + return + } else if !done { + continue } + if done, err := f.tryUnindexTail(); err != nil { + f.disableForError("tail unindexing", err) + return + } else if !done { + continue + } + // tail indexing/unindexing is done; if head is also indexed then + // wait here until there is a new head + f.waitForNewHead() } } } +// disableForError is called when the indexer encounters a database error, for example a +// missing receipt. We can't continue operating when the database is broken, so the +// indexer goes into disabled state. +// Note that the partial index is left in disk; maybe a client update can fix the +// issue without reindexing. +func (f *FilterMaps) disableForError(op string, err error) { + log.Error("Log index "+op+" failed, reverting to unindexed mode", "error", err) + f.disabled = true + close(f.disabledCh) +} + type targetUpdate struct { targetView *ChainView historyCutoff, finalBlock uint64 @@ -116,14 +137,15 @@ func (f *FilterMaps) SetBlockProcessing(blockProcessing bool) { // WaitIdle blocks until the indexer is in an idle state while synced up to the // latest targetView. func (f *FilterMaps) WaitIdle() { - if f.disabled { - f.closeWg.Wait() - return - } for { ch := make(chan bool) - f.waitIdleCh <- ch - if <-ch { + select { + case f.waitIdleCh <- ch: + if <-ch { + return + } + case <-f.disabledCh: + f.closeWg.Wait() return } } @@ -196,16 +218,16 @@ func (f *FilterMaps) setTarget(target targetUpdate) { f.finalBlock = target.finalBlock } -// tryIndexHead tries to render head maps according to the current targetView -// and returns true if successful. -func (f *FilterMaps) tryIndexHead() bool { +// tryIndexHead tries to render head maps according to the current targetView. +// Should be called when targetHeadIndexed returns false. If this function +// returns no error then either stop is true or head indexing is finished. +func (f *FilterMaps) tryIndexHead() error { headRenderer, err := f.renderMapsBefore(math.MaxUint32) if err != nil { - log.Error("Error creating log index head renderer", "error", err) - return false + return err } if headRenderer == nil { - return true + return errors.New("head indexer has nothing to do") // tryIndexHead should be called when head is not indexed } if !f.startedHeadIndex { f.lastLogHeadIndex = time.Now() @@ -230,8 +252,7 @@ func (f *FilterMaps) tryIndexHead() bool { f.lastLogHeadIndex = time.Now() } }); err != nil { - log.Error("Log index head rendering failed", "error", err) - return false + return err } if f.loggedHeadIndex && f.indexedRange.hasIndexedBlocks() { log.Info("Log index head rendering finished", @@ -240,7 +261,7 @@ func (f *FilterMaps) tryIndexHead() bool { "elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt))) } f.loggedHeadIndex, f.startedHeadIndex = false, false - return true + return nil } // tryIndexTail tries to render tail epochs until the tail target block is @@ -248,7 +269,7 @@ func (f *FilterMaps) tryIndexHead() bool { // Note that tail indexing is only started if the log index head is fully // rendered according to targetView and is suspended as soon as the targetView // is changed. -func (f *FilterMaps) tryIndexTail() bool { +func (f *FilterMaps) tryIndexTail() (bool, error) { for { firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch if firstEpoch == 0 || !f.needTailEpoch(firstEpoch-1) { @@ -256,7 +277,7 @@ func (f *FilterMaps) tryIndexTail() bool { } f.processEvents() if f.stop || !f.targetHeadIndexed() { - return false + return false, nil } // resume process if tail rendering was interrupted because of head rendering tailRenderer := f.tailRenderer @@ -268,8 +289,7 @@ func (f *FilterMaps) tryIndexTail() bool { var err error tailRenderer, err = f.renderMapsBefore(f.indexedRange.maps.First()) if err != nil { - log.Error("Error creating log index tail renderer", "error", err) - return false + return false, err } } if tailRenderer == nil { @@ -302,13 +322,16 @@ func (f *FilterMaps) tryIndexTail() bool { f.lastLogTailIndex = time.Now() } }) - if err != nil && f.needTailEpoch(firstEpoch-1) { + if err != nil && !f.needTailEpoch(firstEpoch-1) { // stop silently if cutoff point has move beyond epoch boundary while rendering - log.Error("Log index tail rendering failed", "error", err) + return true, nil + } + if err != nil { + return false, err } if !done { f.tailRenderer = tailRenderer // only keep tail renderer if interrupted by stopCb - return false + return false, nil } } if f.loggedTailIndex && f.indexedRange.hasIndexedBlocks() { @@ -318,14 +341,14 @@ func (f *FilterMaps) tryIndexTail() bool { "elapsed", common.PrettyDuration(time.Since(f.startedTailIndexAt))) f.loggedTailIndex = false } - return true + return true, nil } // tryUnindexTail removes entire epochs of log index data as long as the first // fully indexed block is at least as old as the tail target. // Note that unindexing is very quick as it only removes continuous ranges of // data from the database and is also called while running head indexing. -func (f *FilterMaps) tryUnindexTail() bool { +func (f *FilterMaps) tryUnindexTail() (bool, error) { for { firstEpoch := (f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch) >> f.logMapsPerEpoch if f.needTailEpoch(firstEpoch) { @@ -333,7 +356,7 @@ func (f *FilterMaps) tryUnindexTail() bool { } f.processEvents() if f.stop { - return false + return false, nil } if !f.startedTailUnindex { f.startedTailUnindexAt = time.Now() @@ -343,7 +366,7 @@ func (f *FilterMaps) tryUnindexTail() bool { } if err := f.deleteTailEpoch(firstEpoch); err != nil { log.Error("Log index tail epoch unindexing failed", "error", err) - return false + return false, err } } if f.startedTailUnindex && f.indexedRange.hasIndexedBlocks() { @@ -354,7 +377,7 @@ func (f *FilterMaps) tryUnindexTail() bool { "elapsed", common.PrettyDuration(time.Since(f.startedTailUnindexAt))) f.startedTailUnindex = false } - return true + return true, nil } // needTailEpoch returns true if the given tail epoch needs to be kept