diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index d74b11da04..db7ab0a426 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -53,7 +53,8 @@ type FilterMaps struct { // This is configured by the --history.logs.disable Geth flag. // We chose to implement disabling this way because it requires less special // case logic in eth/filters. - disabled bool + disabled bool + disabledCh chan struct{} // closed by indexer if disabled closeCh chan struct{} closeWg sync.WaitGroup @@ -196,6 +197,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f blockProcessingCh: make(chan bool, 1), history: config.History, disabled: config.Disabled, + disabledCh: make(chan struct{}), exportFileName: config.ExportFileName, Params: params, indexedRange: filterMapsRange{ @@ -206,6 +208,8 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f maps: common.NewRange(rs.MapsFirst, rs.MapsAfterLast-rs.MapsFirst), tailPartialEpoch: rs.TailPartialEpoch, }, + historyCutoff: historyCutoff, + finalBlock: finalBlock, matcherSyncCh: make(chan *FilterMapsMatcherBackend), matchers: make(map[*FilterMapsMatcherBackend]struct{}), filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps), @@ -278,8 +282,13 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView { } // reset un-initializes the FilterMaps structure and removes all related data from -// the database. The function returns true if everything was successfully removed. -func (f *FilterMaps) reset() bool { +// the database. +// Note that in case of leveldb database the fallback implementation of DeleteRange +// might take a long time to finish and deleting the entire database may be +// interrupted by a shutdown. Deleting the filterMapsRange entry first does +// guarantee though that the next init() will not return successfully until the +// entire database has been cleaned. +func (f *FilterMaps) reset() { f.indexLock.Lock() f.indexedRange = filterMapsRange{} f.indexedView = nil @@ -292,11 +301,16 @@ func (f *FilterMaps) reset() bool { // deleting the range first ensures that resetDb will be called again at next // startup and any leftover data will be removed even if it cannot finish now. rawdb.DeleteFilterMapsRange(f.db) - return f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") + f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") } // init initializes an empty log index according to the current targetView. func (f *FilterMaps) init() error { + // ensure that there is no remaining data in the filter maps key range + if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") { + return errors.New("could not reset log index database") + } + f.indexLock.Lock() defer f.indexLock.Unlock() @@ -317,6 +331,13 @@ func (f *FilterMaps) init() error { bestIdx, bestLen = idx, max } } + var initBlockNumber uint64 + if bestLen > 0 { + initBlockNumber = checkpoints[bestIdx][bestLen-1].BlockNumber + } + if initBlockNumber < f.historyCutoff { + return errors.New("cannot start indexing before history cutoff point") + } batch := f.db.NewBatch() for epoch := range bestLen { cp := checkpoints[bestIdx][epoch] diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 69f42d8b60..026b3b4f38 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -36,20 +36,25 @@ func (f *FilterMaps) indexerLoop() { if f.disabled { f.reset() + close(f.disabledCh) return } log.Info("Started log indexer") for !f.stop { if !f.indexedRange.initialized { - if err := f.init(); err != nil { - log.Error("Error initializing log index", "error", err) - // unexpected error; there is not a lot we can do here, maybe it - // recovers, maybe not. Calling event processing here ensures - // that we can still properly shutdown in case of an infinite loop. + if f.targetView.headNumber == 0 { + // initialize when chain head is available f.processSingleEvent(true) continue } + if err := f.init(); err != nil { + log.Error("Error initializing log index; reverting to unindexed mode", "error", err) + f.reset() + f.disabled = true + close(f.disabledCh) + return + } } if !f.targetHeadIndexed() { if !f.tryIndexHead() { diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index f24e9706cb..01bae7bb22 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -141,10 +141,6 @@ func (fm *FilterMapsMatcherBackend) synced() { // range that has not been changed and has been consistent with all states of the // chain since the previous SyncLogIndex or the creation of the matcher backend. func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) { - if fm.f.disabled { - return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil - } - syncCh := make(chan SyncRange, 1) fm.f.matchersLock.Lock() fm.syncCh = syncCh @@ -154,12 +150,16 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange case fm.f.matcherSyncCh <- fm: case <-ctx.Done(): return SyncRange{}, ctx.Err() + case <-fm.f.disabledCh: + return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil } select { case vr := <-syncCh: return vr, nil case <-ctx.Done(): return SyncRange{}, ctx.Err() + case <-fm.f.disabledCh: + return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil } } diff --git a/eth/backend.go b/eth/backend.go index 6deedab872..909d153a2b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -241,7 +241,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } fmConfig := filtermaps.Config{History: config.LogHistory, Disabled: config.LogNoHistory, ExportFileName: config.LogExportCheckpoints} chainView := eth.newChainView(eth.blockchain.CurrentBlock()) - eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, 0, 0, filtermaps.DefaultParams, fmConfig) + historyCutoff := eth.blockchain.HistoryPruningCutoff() + var finalBlock uint64 + if fb := eth.blockchain.CurrentFinalBlock(); fb != nil { + finalBlock = fb.Number.Uint64() + } + eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig) eth.closeFilterMaps = make(chan chan struct{}) if config.BlobPool.Datadir != "" {