mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
core/filtermaps: fix log indexer init conditions (#31455)
This PR adds an extra condition to the log indexer initialization in order to avoid initializing with block 0 as target head. Previously this caused the indexer to initialize without a checkpoint. Later, when the real chain head was set, it indexed the entire history, then unindexed most of it if only the recent history was supposed to be indexed. Now the init only happens when there is an actual synced chain head and therefore the index is initialized at the most recent checkpoint and only the last year is indexed according to the default parameters. During checkpoint initialization the best available checkpoint is also checked against the history cutoff point and fails if the indexing would have to start from a block older than the cutoff. If initialization fails then the indexer reverts to unindexed mode instead of retrying because the the failure conditions cannot be expected to recover later.
This commit is contained in:
parent
fd4049dc1e
commit
cbe902d5da
4 changed files with 45 additions and 14 deletions
|
|
@ -53,7 +53,8 @@ type FilterMaps struct {
|
|||
// This is configured by the --history.logs.disable Geth flag.
|
||||
// We chose to implement disabling this way because it requires less special
|
||||
// case logic in eth/filters.
|
||||
disabled bool
|
||||
disabled bool
|
||||
disabledCh chan struct{} // closed by indexer if disabled
|
||||
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
|
|
@ -196,6 +197,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
|
|||
blockProcessingCh: make(chan bool, 1),
|
||||
history: config.History,
|
||||
disabled: config.Disabled,
|
||||
disabledCh: make(chan struct{}),
|
||||
exportFileName: config.ExportFileName,
|
||||
Params: params,
|
||||
indexedRange: filterMapsRange{
|
||||
|
|
@ -206,6 +208,8 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
|
|||
maps: common.NewRange(rs.MapsFirst, rs.MapsAfterLast-rs.MapsFirst),
|
||||
tailPartialEpoch: rs.TailPartialEpoch,
|
||||
},
|
||||
historyCutoff: historyCutoff,
|
||||
finalBlock: finalBlock,
|
||||
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
|
||||
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
|
||||
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
|
||||
|
|
@ -278,8 +282,13 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
|
|||
}
|
||||
|
||||
// reset un-initializes the FilterMaps structure and removes all related data from
|
||||
// the database. The function returns true if everything was successfully removed.
|
||||
func (f *FilterMaps) reset() bool {
|
||||
// the database.
|
||||
// Note that in case of leveldb database the fallback implementation of DeleteRange
|
||||
// might take a long time to finish and deleting the entire database may be
|
||||
// interrupted by a shutdown. Deleting the filterMapsRange entry first does
|
||||
// guarantee though that the next init() will not return successfully until the
|
||||
// entire database has been cleaned.
|
||||
func (f *FilterMaps) reset() {
|
||||
f.indexLock.Lock()
|
||||
f.indexedRange = filterMapsRange{}
|
||||
f.indexedView = nil
|
||||
|
|
@ -292,11 +301,16 @@ func (f *FilterMaps) reset() bool {
|
|||
// deleting the range first ensures that resetDb will be called again at next
|
||||
// startup and any leftover data will be removed even if it cannot finish now.
|
||||
rawdb.DeleteFilterMapsRange(f.db)
|
||||
return f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
|
||||
f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database")
|
||||
}
|
||||
|
||||
// init initializes an empty log index according to the current targetView.
|
||||
func (f *FilterMaps) init() error {
|
||||
// ensure that there is no remaining data in the filter maps key range
|
||||
if !f.safeDeleteRange(rawdb.DeleteFilterMapsDb, "Resetting log index database") {
|
||||
return errors.New("could not reset log index database")
|
||||
}
|
||||
|
||||
f.indexLock.Lock()
|
||||
defer f.indexLock.Unlock()
|
||||
|
||||
|
|
@ -317,6 +331,13 @@ func (f *FilterMaps) init() error {
|
|||
bestIdx, bestLen = idx, max
|
||||
}
|
||||
}
|
||||
var initBlockNumber uint64
|
||||
if bestLen > 0 {
|
||||
initBlockNumber = checkpoints[bestIdx][bestLen-1].BlockNumber
|
||||
}
|
||||
if initBlockNumber < f.historyCutoff {
|
||||
return errors.New("cannot start indexing before history cutoff point")
|
||||
}
|
||||
batch := f.db.NewBatch()
|
||||
for epoch := range bestLen {
|
||||
cp := checkpoints[bestIdx][epoch]
|
||||
|
|
|
|||
|
|
@ -36,20 +36,25 @@ func (f *FilterMaps) indexerLoop() {
|
|||
|
||||
if f.disabled {
|
||||
f.reset()
|
||||
close(f.disabledCh)
|
||||
return
|
||||
}
|
||||
log.Info("Started log indexer")
|
||||
|
||||
for !f.stop {
|
||||
if !f.indexedRange.initialized {
|
||||
if err := f.init(); err != nil {
|
||||
log.Error("Error initializing log index", "error", err)
|
||||
// unexpected error; there is not a lot we can do here, maybe it
|
||||
// recovers, maybe not. Calling event processing here ensures
|
||||
// that we can still properly shutdown in case of an infinite loop.
|
||||
if f.targetView.headNumber == 0 {
|
||||
// initialize when chain head is available
|
||||
f.processSingleEvent(true)
|
||||
continue
|
||||
}
|
||||
if err := f.init(); err != nil {
|
||||
log.Error("Error initializing log index; reverting to unindexed mode", "error", err)
|
||||
f.reset()
|
||||
f.disabled = true
|
||||
close(f.disabledCh)
|
||||
return
|
||||
}
|
||||
}
|
||||
if !f.targetHeadIndexed() {
|
||||
if !f.tryIndexHead() {
|
||||
|
|
|
|||
|
|
@ -141,10 +141,6 @@ func (fm *FilterMapsMatcherBackend) synced() {
|
|||
// range that has not been changed and has been consistent with all states of the
|
||||
// chain since the previous SyncLogIndex or the creation of the matcher backend.
|
||||
func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) {
|
||||
if fm.f.disabled {
|
||||
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
|
||||
}
|
||||
|
||||
syncCh := make(chan SyncRange, 1)
|
||||
fm.f.matchersLock.Lock()
|
||||
fm.syncCh = syncCh
|
||||
|
|
@ -154,12 +150,16 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
|
|||
case fm.f.matcherSyncCh <- fm:
|
||||
case <-ctx.Done():
|
||||
return SyncRange{}, ctx.Err()
|
||||
case <-fm.f.disabledCh:
|
||||
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
|
||||
}
|
||||
select {
|
||||
case vr := <-syncCh:
|
||||
return vr, nil
|
||||
case <-ctx.Done():
|
||||
return SyncRange{}, ctx.Err()
|
||||
case <-fm.f.disabledCh:
|
||||
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -241,7 +241,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
|||
}
|
||||
fmConfig := filtermaps.Config{History: config.LogHistory, Disabled: config.LogNoHistory, ExportFileName: config.LogExportCheckpoints}
|
||||
chainView := eth.newChainView(eth.blockchain.CurrentBlock())
|
||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, 0, 0, filtermaps.DefaultParams, fmConfig)
|
||||
historyCutoff := eth.blockchain.HistoryPruningCutoff()
|
||||
var finalBlock uint64
|
||||
if fb := eth.blockchain.CurrentFinalBlock(); fb != nil {
|
||||
finalBlock = fb.Number.Uint64()
|
||||
}
|
||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig)
|
||||
eth.closeFilterMaps = make(chan chan struct{})
|
||||
|
||||
if config.BlobPool.Datadir != "" {
|
||||
|
|
|
|||
Loading…
Reference in a new issue