mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-03-04 10:25:04 +00:00
core/filtermaps: fix deadlock in filtermap callback (#31708)
This PR fixes a deadlock situation is deleteTailEpoch that might arise when range delete is running in iterator based fallback mode (either using leveldb database or the hashdb state storage scheme). In this case a stopCb callback is called periodically that does check events, including matcher sync requests, in which case it tries to acquire indexLock for read access, while deleteTailEpoch already held it for write access. This pull request removes the indexLock acquiring in `FilterMapsMatcherBackend.synced` as this function is only called in the indexLoop. Fixes https://github.com/ethereum/go-ethereum/issues/31700
This commit is contained in:
parent
b62756d1a3
commit
b6bdd698a0
3 changed files with 50 additions and 24 deletions
|
|
@ -85,11 +85,17 @@ type FilterMaps struct {
|
|||
// fields written by the indexer and read by matcher backend. Indexer can
|
||||
// read them without a lock and write them under indexLock write lock.
|
||||
// Matcher backend can read them under indexLock read lock.
|
||||
indexLock sync.RWMutex
|
||||
indexedRange filterMapsRange
|
||||
cleanedEpochsBefore uint32 // all unindexed data cleaned before this point
|
||||
indexedView *ChainView // always consistent with the log index
|
||||
hasTempRange bool
|
||||
indexLock sync.RWMutex
|
||||
indexedRange filterMapsRange
|
||||
indexedView *ChainView // always consistent with the log index
|
||||
hasTempRange bool
|
||||
|
||||
// cleanedEpochsBefore indicates that all unindexed data before this point
|
||||
// has been cleaned.
|
||||
//
|
||||
// This field is only accessed and modified within tryUnindexTail, so no
|
||||
// explicit locking is required.
|
||||
cleanedEpochsBefore uint32
|
||||
|
||||
// also accessed by indexer and matcher backend but no locking needed.
|
||||
filterMapCache *lru.Cache[uint32, filterMap]
|
||||
|
|
@ -248,15 +254,16 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
|
|||
},
|
||||
// deleting last unindexed epoch might have been interrupted by shutdown
|
||||
cleanedEpochsBefore: max(rs.MapsFirst>>params.logMapsPerEpoch, 1) - 1,
|
||||
historyCutoff: historyCutoff,
|
||||
finalBlock: finalBlock,
|
||||
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
|
||||
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
|
||||
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
|
||||
lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
|
||||
lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers),
|
||||
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
|
||||
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
|
||||
|
||||
historyCutoff: historyCutoff,
|
||||
finalBlock: finalBlock,
|
||||
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
|
||||
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
|
||||
filterMapCache: lru.NewCache[uint32, filterMap](cachedFilterMaps),
|
||||
lastBlockCache: lru.NewCache[uint32, lastBlockOfMap](cachedLastBlocks),
|
||||
lvPointerCache: lru.NewCache[uint64, uint64](cachedLvPointers),
|
||||
baseRowsCache: lru.NewCache[uint64, [][]uint32](cachedBaseRows),
|
||||
renderSnapshots: lru.NewCache[uint64, *renderedMap](cachedRenderSnapshots),
|
||||
}
|
||||
|
||||
// Set initial indexer target.
|
||||
|
|
@ -444,6 +451,7 @@ func (f *FilterMaps) safeDeleteWithLogs(deleteFn func(db ethdb.KeyValueStore, ha
|
|||
|
||||
// setRange updates the indexed chain view and covered range and also adds the
|
||||
// changes to the given batch.
|
||||
//
|
||||
// Note that this function assumes that the index write lock is being held.
|
||||
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, newRange filterMapsRange, isTempRange bool) {
|
||||
f.indexedView = newView
|
||||
|
|
@ -477,6 +485,7 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newView *ChainView, ne
|
|||
// Note that this function assumes that the log index structure is consistent
|
||||
// with the canonical chain at the point where the given log value index points.
|
||||
// If this is not the case then an invalid result or an error may be returned.
|
||||
//
|
||||
// Note that this function assumes that the indexer read lock is being held when
|
||||
// called from outside the indexerLoop goroutine.
|
||||
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
||||
|
|
@ -655,6 +664,7 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
|
|||
// getBlockLvPointer returns the starting log value index where the log values
|
||||
// generated by the given block are located. If blockNumber is beyond the current
|
||||
// head then the first unoccupied log value index is returned.
|
||||
//
|
||||
// Note that this function assumes that the indexer read lock is being held when
|
||||
// called from outside the indexerLoop goroutine.
|
||||
func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
|
||||
|
|
@ -762,7 +772,7 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
|
|||
return false, errors.New("invalid tail epoch number")
|
||||
}
|
||||
// remove index data
|
||||
if err := f.safeDeleteWithLogs(func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error {
|
||||
deleteFn := func(db ethdb.KeyValueStore, hashScheme bool, stopCb func(bool) bool) error {
|
||||
first := f.mapRowIndex(firstMap, 0)
|
||||
count := f.mapRowIndex(firstMap+f.mapsPerEpoch, 0) - first
|
||||
if err := rawdb.DeleteFilterMapRows(f.db, common.NewRange(first, count), hashScheme, stopCb); err != nil {
|
||||
|
|
@ -786,10 +796,13 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
|
|||
f.lvPointerCache.Remove(blockNumber)
|
||||
}
|
||||
return nil
|
||||
}, fmt.Sprintf("Deleting tail epoch #%d", epoch), func() bool {
|
||||
}
|
||||
action := fmt.Sprintf("Deleting tail epoch #%d", epoch)
|
||||
stopFn := func() bool {
|
||||
f.processEvents()
|
||||
return f.stop || !f.targetHeadIndexed()
|
||||
}); err == nil {
|
||||
}
|
||||
if err := f.safeDeleteWithLogs(deleteFn, action, stopFn); err == nil {
|
||||
// everything removed; mark as cleaned and report success
|
||||
if f.cleanedEpochsBefore == epoch {
|
||||
f.cleanedEpochsBefore = epoch + 1
|
||||
|
|
@ -808,6 +821,9 @@ func (f *FilterMaps) deleteTailEpoch(epoch uint32) (bool, error) {
|
|||
}
|
||||
|
||||
// exportCheckpoints exports epoch checkpoints in the format used by checkpoints.go.
|
||||
//
|
||||
// Note: acquiring the indexLock read lock is unnecessary here, as this function
|
||||
// is always called within the indexLoop.
|
||||
func (f *FilterMaps) exportCheckpoints() {
|
||||
finalLvPtr, err := f.getBlockLvPointer(f.finalBlock + 1)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -43,6 +43,8 @@ func (f *FilterMaps) indexerLoop() {
|
|||
log.Info("Started log indexer")
|
||||
|
||||
for !f.stop {
|
||||
// Note: acquiring the indexLock read lock is unnecessary here,
|
||||
// as the `indexedRange` is accessed within the indexerLoop.
|
||||
if !f.indexedRange.initialized {
|
||||
if f.targetView.HeadNumber() == 0 {
|
||||
// initialize when chain head is available
|
||||
|
|
@ -105,7 +107,7 @@ type targetUpdate struct {
|
|||
historyCutoff, finalBlock uint64
|
||||
}
|
||||
|
||||
// SetTargetView sets a new target chain view for the indexer to render.
|
||||
// SetTarget sets a new target chain view for the indexer to render.
|
||||
// Note that SetTargetView never blocks.
|
||||
func (f *FilterMaps) SetTarget(targetView *ChainView, historyCutoff, finalBlock uint64) {
|
||||
if targetView == nil {
|
||||
|
|
@ -178,6 +180,8 @@ func (f *FilterMaps) processSingleEvent(blocking bool) bool {
|
|||
if f.stop {
|
||||
return false
|
||||
}
|
||||
// Note: acquiring the indexLock read lock is unnecessary here,
|
||||
// as this function is always called within the indexLoop.
|
||||
if !f.hasTempRange {
|
||||
for _, mb := range f.matcherSyncRequests {
|
||||
mb.synced()
|
||||
|
|
|
|||
|
|
@ -111,17 +111,17 @@ func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex
|
|||
// synced signals to the matcher that has triggered a synchronisation that it
|
||||
// has been finished and the log index is consistent with the chain head passed
|
||||
// as a parameter.
|
||||
//
|
||||
// Note that if the log index head was far behind the chain head then it might not
|
||||
// be synced up to the given head in a single step. Still, the latest chain head
|
||||
// should be passed as a parameter and the existing log index should be consistent
|
||||
// with that chain.
|
||||
//
|
||||
// Note: acquiring the indexLock read lock is unnecessary here, as this function
|
||||
// is always called within the indexLoop.
|
||||
func (fm *FilterMapsMatcherBackend) synced() {
|
||||
fm.f.indexLock.RLock()
|
||||
fm.f.matchersLock.Lock()
|
||||
defer func() {
|
||||
fm.f.matchersLock.Unlock()
|
||||
fm.f.indexLock.RUnlock()
|
||||
}()
|
||||
defer fm.f.matchersLock.Unlock()
|
||||
|
||||
indexedBlocks := fm.f.indexedRange.blocks
|
||||
if !fm.f.indexedRange.headIndexed && !indexedBlocks.IsEmpty() {
|
||||
|
|
@ -154,6 +154,8 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
|
|||
case <-ctx.Done():
|
||||
return SyncRange{}, ctx.Err()
|
||||
case <-fm.f.disabledCh:
|
||||
// Note: acquiring the indexLock read lock is unnecessary here,
|
||||
// as the indexer has already been terminated.
|
||||
return SyncRange{IndexedView: fm.f.indexedView}, nil
|
||||
}
|
||||
select {
|
||||
|
|
@ -162,6 +164,8 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
|
|||
case <-ctx.Done():
|
||||
return SyncRange{}, ctx.Err()
|
||||
case <-fm.f.disabledCh:
|
||||
// Note: acquiring the indexLock read lock is unnecessary here,
|
||||
// as the indexer has already been terminated.
|
||||
return SyncRange{IndexedView: fm.f.indexedView}, nil
|
||||
}
|
||||
}
|
||||
|
|
@ -170,7 +174,9 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
|
|||
// valid range with the current indexed range. This function should be called
|
||||
// whenever a part of the log index has been removed, before adding new blocks
|
||||
// to it.
|
||||
// Note that this function assumes that the index read lock is being held.
|
||||
//
|
||||
// Note: acquiring the indexLock read lock is unnecessary here, as this function
|
||||
// is always called within the indexLoop.
|
||||
func (f *FilterMaps) updateMatchersValidRange() {
|
||||
f.matchersLock.Lock()
|
||||
defer f.matchersLock.Unlock()
|
||||
|
|
|
|||
Loading…
Reference in a new issue