mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
core/filtermaps: allow log search while head indexing (#31429)
This PR changes the matcher syncing conditions so that it is possible to run a search while head indexing is in progress. Previously it was a requirement to have the head indexed in order to perform matcher sync before and after a search. This was unnecessarily strict as the purpose was just to avoid syncing the valid range with the temporary shortened indexed range applied while updating existing head maps. Now the sync condition explicitly checks whether the indexer has a temporary indexed range with some head maps being partially updated. It also fixes a deadlock that happened when matcher synchronization was attempted in the event handler called from the `writeFinishedMaps` periodical callback.
This commit is contained in:
parent
0a8f41e2cb
commit
9fc2bbe1ce
4 changed files with 22 additions and 12 deletions
|
|
@ -70,6 +70,7 @@ type FilterMaps struct {
|
|||
indexLock sync.RWMutex
|
||||
indexedRange filterMapsRange
|
||||
indexedView *ChainView // always consistent with the log index
|
||||
hasTempRange bool
|
||||
|
||||
// also accessed by indexer and matcher backend but no locking needed.
|
||||
filterMapCache *lru.Cache[uint32, filterMap]
|
||||
|
|
@ -94,7 +95,7 @@ type FilterMaps struct {
|
|||
ptrTailUnindexMap uint32
|
||||
|
||||
targetView *ChainView
|
||||
matcherSyncRequest *FilterMapsMatcherBackend
|
||||
matcherSyncRequests []*FilterMapsMatcherBackend
|
||||
historyCutoff uint64
|
||||
finalBlock, lastFinal uint64
|
||||
lastFinalEpoch uint32
|
||||
|
|
@ -330,7 +331,7 @@ func (f *FilterMaps) init() error {
|
|||
fmr.blocks = common.NewRange(cp.BlockNumber+1, 0)
|
||||
fmr.maps = common.NewRange(uint32(bestLen)<<f.logMapsPerEpoch, 0)
|
||||
}
|
||||
f.setRange(batch, f.targetView, fmr)
|
||||
f.setRange(batch, f.targetView, fmr, false)
|
||||
return batch.Write()
|
||||
}
|
||||
|
||||
|
|
@ -373,9 +374,10 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
|
|||
// setRange updates the indexed chain view and covered range and also adds the
|
||||
// changes to the given batch.
|
||||
// Note that this function assumes that the index write lock is being held.
|
||||
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange) {
|
||||
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange, isTempRange bool) {
|
||||
f.indexedView = newView
|
||||
f.indexedRange = newRange
|
||||
f.hasTempRange = isTempRange
|
||||
f.updateMatchersValidRange()
|
||||
if newRange.initialized {
|
||||
rs := rawdb.FilterMapsRange{
|
||||
|
|
@ -666,7 +668,7 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) error {
|
|||
} else {
|
||||
return errors.New("invalid tail epoch number")
|
||||
}
|
||||
f.setRange(f.db, f.indexedView, fmr)
|
||||
f.setRange(f.db, f.indexedView, fmr, false)
|
||||
first := f.mapRowIndex(firstMap, 0)
|
||||
count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
|
||||
rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count))
|
||||
|
|
|
|||
|
|
@ -136,15 +136,18 @@ func (f *FilterMaps) processEvents() {
|
|||
// processSingleEvent processes a single event either in a blocking or
|
||||
// non-blocking manner.
|
||||
func (f *FilterMaps) processSingleEvent(blocking bool) bool {
|
||||
if f.matcherSyncRequest != nil && f.targetHeadIndexed() {
|
||||
f.matcherSyncRequest.synced()
|
||||
f.matcherSyncRequest = nil
|
||||
if !f.hasTempRange {
|
||||
for _, mb := range f.matcherSyncRequests {
|
||||
mb.synced()
|
||||
}
|
||||
f.matcherSyncRequests = nil
|
||||
}
|
||||
if blocking {
|
||||
select {
|
||||
case target := <-f.targetCh:
|
||||
f.setTarget(target)
|
||||
case f.matcherSyncRequest = <-f.matcherSyncCh:
|
||||
case mb := <-f.matcherSyncCh:
|
||||
f.matcherSyncRequests = append(f.matcherSyncRequests, mb)
|
||||
case f.blockProcessing = <-f.blockProcessingCh:
|
||||
case <-f.closeCh:
|
||||
f.stop = true
|
||||
|
|
@ -160,7 +163,8 @@ func (f *FilterMaps) processSingleEvent(blocking bool) bool {
|
|||
select {
|
||||
case target := <-f.targetCh:
|
||||
f.setTarget(target)
|
||||
case f.matcherSyncRequest = <-f.matcherSyncCh:
|
||||
case mb := <-f.matcherSyncCh:
|
||||
f.matcherSyncRequests = append(f.matcherSyncRequests, mb)
|
||||
case f.blockProcessing = <-f.blockProcessingCh:
|
||||
case <-f.closeCh:
|
||||
f.stop = true
|
||||
|
|
|
|||
|
|
@ -392,12 +392,16 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
|
|||
}
|
||||
// do not exit while in partially written state but do allow processing
|
||||
// events and pausing while block processing is in progress
|
||||
r.f.indexLock.Unlock()
|
||||
pauseCb()
|
||||
r.f.indexLock.Lock()
|
||||
batch = r.f.db.NewBatch()
|
||||
}
|
||||
}
|
||||
|
||||
r.f.setRange(batch, r.f.indexedView, tempRange)
|
||||
if tempRange != r.f.indexedRange {
|
||||
r.f.setRange(batch, r.f.indexedView, tempRange, true)
|
||||
}
|
||||
// add or update filter rows
|
||||
for rowIndex := uint32(0); rowIndex < r.f.mapHeight; rowIndex++ {
|
||||
var (
|
||||
|
|
@ -469,7 +473,7 @@ func (r *mapRenderer) writeFinishedMaps(pauseCb func() bool) error {
|
|||
}
|
||||
r.finishedMaps = make(map[uint32]*renderedMap)
|
||||
r.finished.SetFirst(r.finished.AfterLast())
|
||||
r.f.setRange(batch, renderedView, newRange)
|
||||
r.f.setRange(batch, renderedView, newRange, false)
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Error writing log index update batch", "error", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ func (fm *FilterMapsMatcherBackend) synced() {
|
|||
indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block
|
||||
}
|
||||
fm.syncCh <- SyncRange{
|
||||
HeadNumber: fm.f.indexedView.headNumber,
|
||||
HeadNumber: fm.f.targetView.headNumber,
|
||||
ValidBlocks: fm.validBlocks,
|
||||
IndexedBlocks: indexedBlocks,
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue