diff --git a/accounts/abi/abigen/bind_test.go b/accounts/abi/abigen/bind_test.go index 195064fb7a..b3c52e81e5 100644 --- a/accounts/abi/abigen/bind_test.go +++ b/accounts/abi/abigen/bind_test.go @@ -939,6 +939,7 @@ var bindTests = []struct { if _, err := eventer.RaiseSimpleEvent(auth, common.Address{byte(j)}, [32]byte{byte(j)}, true, big.NewInt(int64(10*i+j))); err != nil { t.Fatalf("block %d, event %d: raise failed: %v", i, j, err) } + time.Sleep(time.Millisecond * 200) } sim.Commit() } @@ -1495,7 +1496,7 @@ var bindTests = []struct { if n != 3 { t.Fatalf("Invalid bar0 event") } - case <-time.NewTimer(3 * time.Second).C: + case <-time.NewTimer(10 * time.Second).C: t.Fatalf("Wait bar0 event timeout") } @@ -1506,7 +1507,7 @@ var bindTests = []struct { if n != 1 { t.Fatalf("Invalid bar event") } - case <-time.NewTimer(3 * time.Second).C: + case <-time.NewTimer(10 * time.Second).C: t.Fatalf("Wait bar event timeout") } close(stopCh) diff --git a/core/filtermaps/chain_view.go b/core/filtermaps/chain_view.go index d6f0a727bf..aa74f3901a 100644 --- a/core/filtermaps/chain_view.go +++ b/core/filtermaps/chain_view.go @@ -58,47 +58,75 @@ func NewChainView(chain blockchain, number uint64, hash common.Hash) *ChainView return cv } -// getBlockHash returns the block hash belonging to the given block number. +// HeadNumber returns the head block number of the chain view. +func (cv *ChainView) HeadNumber() uint64 { + return cv.headNumber +} + +// BlockHash returns the block hash belonging to the given block number. // Note that the hash of the head block is not returned because ChainView might // represent a view where the head block is currently being created. -func (cv *ChainView) getBlockHash(number uint64) common.Hash { - if number >= cv.headNumber { +func (cv *ChainView) BlockHash(number uint64) common.Hash { + cv.lock.Lock() + defer cv.lock.Unlock() + + if number > cv.headNumber { panic("invalid block number") } return cv.blockHash(number) } -// getBlockId returns the unique block id belonging to the given block number. +// BlockId returns the unique block id belonging to the given block number. // Note that it is currently equal to the block hash. In the future it might // be a different id for future blocks if the log index root becomes part of // consensus and therefore rendering the index with the new head will happen // before the hash of that new head is available. -func (cv *ChainView) getBlockId(number uint64) common.Hash { +func (cv *ChainView) BlockId(number uint64) common.Hash { + cv.lock.Lock() + defer cv.lock.Unlock() + if number > cv.headNumber { panic("invalid block number") } return cv.blockHash(number) } -// getReceipts returns the set of receipts belonging to the block at the given +// Header returns the block header at the given block number. +func (cv *ChainView) Header(number uint64) *types.Header { + return cv.chain.GetHeader(cv.BlockHash(number), number) +} + +// Receipts returns the set of receipts belonging to the block at the given // block number. -func (cv *ChainView) getReceipts(number uint64) types.Receipts { - if number > cv.headNumber { - panic("invalid block number") - } - blockHash := cv.blockHash(number) +func (cv *ChainView) Receipts(number uint64) types.Receipts { + blockHash := cv.BlockHash(number) if blockHash == (common.Hash{}) { log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber) } return cv.chain.GetReceiptsByHash(blockHash) } +// SharedRange returns the block range shared by two chain views. +func (cv *ChainView) SharedRange(cv2 *ChainView) common.Range[uint64] { + cv.lock.Lock() + defer cv.lock.Unlock() + + if cv == nil || cv2 == nil || !cv.extendNonCanonical() || !cv2.extendNonCanonical() { + return common.Range[uint64]{} + } + var sharedLen uint64 + for n := min(cv.headNumber+1-uint64(len(cv.hashes)), cv2.headNumber+1-uint64(len(cv2.hashes))); n <= cv.headNumber && n <= cv2.headNumber && cv.blockHash(n) == cv2.blockHash(n); n++ { + sharedLen = n + 1 + } + return common.NewRange(0, sharedLen) +} + // limitedView returns a new chain view that is a truncated version of the parent view. func (cv *ChainView) limitedView(newHead uint64) *ChainView { if newHead >= cv.headNumber { return cv } - return NewChainView(cv.chain, newHead, cv.blockHash(newHead)) + return NewChainView(cv.chain, newHead, cv.BlockHash(newHead)) } // equalViews returns true if the two chain views are equivalent. @@ -106,7 +134,7 @@ func equalViews(cv1, cv2 *ChainView) bool { if cv1 == nil || cv2 == nil { return false } - return cv1.headNumber == cv2.headNumber && cv1.getBlockId(cv1.headNumber) == cv2.getBlockId(cv2.headNumber) + return cv1.headNumber == cv2.headNumber && cv1.BlockId(cv1.headNumber) == cv2.BlockId(cv2.headNumber) } // matchViews returns true if the two chain views are equivalent up until the @@ -120,9 +148,9 @@ func matchViews(cv1, cv2 *ChainView, number uint64) bool { return false } if number == cv1.headNumber || number == cv2.headNumber { - return cv1.getBlockId(number) == cv2.getBlockId(number) + return cv1.BlockId(number) == cv2.BlockId(number) } - return cv1.getBlockHash(number) == cv2.getBlockHash(number) + return cv1.BlockHash(number) == cv2.BlockHash(number) } // extendNonCanonical checks whether the previously known reverse list of head @@ -150,9 +178,6 @@ func (cv *ChainView) extendNonCanonical() bool { // blockHash returns the given block hash without doing the head number check. func (cv *ChainView) blockHash(number uint64) common.Hash { - cv.lock.Lock() - defer cv.lock.Unlock() - if number+uint64(len(cv.hashes)) <= cv.headNumber { hash := cv.chain.GetCanonicalHash(number) if !cv.extendNonCanonical() { diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index fa2d6e3ffb..a617de8968 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -262,7 +262,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f f.targetView = initView if f.indexedRange.initialized { f.indexedView = f.initChainView(f.targetView) - f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.headNumber+1 + f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.HeadNumber()+1 if !f.indexedRange.headIndexed { f.indexedRange.headDelimiter = 0 } @@ -313,7 +313,7 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView { log.Error("Could not initialize indexed chain view", "error", err) break } - if lastBlockNumber <= chainView.headNumber && chainView.getBlockId(lastBlockNumber) == lastBlockId { + if lastBlockNumber <= chainView.HeadNumber() && chainView.BlockId(lastBlockNumber) == lastBlockId { return chainView.limitedView(lastBlockNumber) } } @@ -370,7 +370,7 @@ func (f *FilterMaps) init() error { for min < max { mid := (min + max + 1) / 2 cp := checkpointList[mid-1] - if cp.BlockNumber <= f.targetView.headNumber && f.targetView.getBlockId(cp.BlockNumber) == cp.BlockId { + if cp.BlockNumber <= f.targetView.HeadNumber() && f.targetView.BlockId(cp.BlockNumber) == cp.BlockId { min = mid } else { max = mid - 1 @@ -512,7 +512,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { } } // get block receipts - receipts := f.indexedView.getReceipts(firstBlockNumber) + receipts := f.indexedView.Receipts(firstBlockNumber) if receipts == nil { return nil, fmt.Errorf("failed to retrieve receipts for block %d containing searched log value index %d: %v", firstBlockNumber, lvIndex, err) } diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 9a5424da4a..383ec078c9 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -44,7 +44,7 @@ func (f *FilterMaps) indexerLoop() { for !f.stop { if !f.indexedRange.initialized { - if f.targetView.headNumber == 0 { + if f.targetView.HeadNumber() == 0 { // initialize when chain head is available f.processSingleEvent(true) continue @@ -249,7 +249,7 @@ func (f *FilterMaps) tryIndexHead() error { log.Info("Log index head rendering in progress", "first block", f.indexedRange.blocks.First(), "last block", f.indexedRange.blocks.Last(), "processed", f.indexedRange.blocks.AfterLast()-f.ptrHeadIndex, - "remaining", f.indexedView.headNumber-f.indexedRange.blocks.Last(), + "remaining", f.indexedView.HeadNumber()-f.indexedRange.blocks.Last(), "elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt))) f.loggedHeadIndex = true f.lastLogHeadIndex = time.Now() @@ -418,10 +418,10 @@ func (f *FilterMaps) needTailEpoch(epoch uint32) bool { // tailTargetBlock returns the target value for the tail block number according // to the log history parameter and the current index head. func (f *FilterMaps) tailTargetBlock() uint64 { - if f.history == 0 || f.indexedView.headNumber < f.history { + if f.history == 0 || f.indexedView.HeadNumber() < f.history { return 0 } - return f.indexedView.headNumber + 1 - f.history + return f.indexedView.HeadNumber() + 1 - f.history } // tailPartialBlocks returns the number of rendered blocks in the partially diff --git a/core/filtermaps/map_renderer.go b/core/filtermaps/map_renderer.go index 23d84f0eca..7c2aa8dc32 100644 --- a/core/filtermaps/map_renderer.go +++ b/core/filtermaps/map_renderer.go @@ -143,8 +143,8 @@ func (f *FilterMaps) lastCanonicalSnapshotOfMap(mapIndex uint32) *renderedMap { var best *renderedMap for _, blockNumber := range f.renderSnapshots.Keys() { if cp, _ := f.renderSnapshots.Get(blockNumber); cp != nil && blockNumber < f.indexedRange.blocks.AfterLast() && - blockNumber <= f.indexedView.headNumber && f.indexedView.getBlockId(blockNumber) == cp.lastBlockId && - blockNumber <= f.targetView.headNumber && f.targetView.getBlockId(blockNumber) == cp.lastBlockId && + blockNumber <= f.indexedView.HeadNumber() && f.indexedView.BlockId(blockNumber) == cp.lastBlockId && + blockNumber <= f.targetView.HeadNumber() && f.targetView.BlockId(blockNumber) == cp.lastBlockId && cp.mapIndex == mapIndex && (best == nil || blockNumber > best.lastBlock) { best = cp } @@ -173,7 +173,7 @@ func (f *FilterMaps) lastCanonicalMapBoundaryBefore(renderBefore uint32) (nextMa return 0, 0, 0, fmt.Errorf("failed to retrieve last block of reverse iterated map %d: %v", mapIndex, err) } if (f.indexedRange.headIndexed && mapIndex >= f.indexedRange.maps.Last()) || - lastBlock >= f.targetView.headNumber || lastBlockId != f.targetView.getBlockId(lastBlock) { + lastBlock >= f.targetView.HeadNumber() || lastBlockId != f.targetView.BlockId(lastBlock) { continue // map is not full or inconsistent with targetView; roll back } lvPtr, err := f.getBlockLvPointer(lastBlock) @@ -247,7 +247,7 @@ func (f *FilterMaps) loadHeadSnapshot() error { filterMap: fm, mapIndex: f.indexedRange.maps.Last(), lastBlock: f.indexedRange.blocks.Last(), - lastBlockId: f.indexedView.getBlockId(f.indexedRange.blocks.Last()), + lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()), blockLvPtrs: lvPtrs, finished: true, headDelimiter: f.indexedRange.headDelimiter, @@ -264,7 +264,7 @@ func (r *mapRenderer) makeSnapshot() { filterMap: r.currentMap.filterMap.fastCopy(), mapIndex: r.currentMap.mapIndex, lastBlock: r.currentMap.lastBlock, - lastBlockId: r.iterator.chainView.getBlockId(r.currentMap.lastBlock), + lastBlockId: r.iterator.chainView.BlockId(r.currentMap.lastBlock), blockLvPtrs: r.currentMap.blockLvPtrs, finished: true, headDelimiter: r.iterator.lvIndex, @@ -370,7 +370,7 @@ func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) { r.currentMap.finished = true r.currentMap.headDelimiter = r.iterator.lvIndex } - r.currentMap.lastBlockId = r.f.targetView.getBlockId(r.currentMap.lastBlock) + r.currentMap.lastBlockId = r.f.targetView.BlockId(r.currentMap.lastBlock) totalTime += time.Since(start) mapRenderTimer.Update(totalTime) mapLogValueMeter.Mark(logValuesProcessed) @@ -566,8 +566,8 @@ func (r *mapRenderer) getUpdatedRange() (filterMapsRange, error) { lm := r.finishedMaps[r.finished.Last()] newRange.headIndexed = lm.finished if lm.finished { - newRange.blocks.SetLast(r.f.targetView.headNumber) - if lm.lastBlock != r.f.targetView.headNumber { + newRange.blocks.SetLast(r.f.targetView.HeadNumber()) + if lm.lastBlock != r.f.targetView.HeadNumber() { panic("map rendering finished but last block != head block") } newRange.headDelimiter = lm.headDelimiter @@ -665,13 +665,13 @@ var errUnindexedRange = errors.New("unindexed range") // given block's first log value entry (the block delimiter), according to the // current targetView. func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint64) (*logIterator, error) { - if blockNumber > f.targetView.headNumber { - return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.headNumber) + if blockNumber > f.targetView.HeadNumber() { + return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.HeadNumber()) } if !f.indexedRange.blocks.Includes(blockNumber) { return nil, errUnindexedRange } - finished := blockNumber == f.targetView.headNumber + finished := blockNumber == f.targetView.HeadNumber() l := &logIterator{ chainView: f.targetView, params: &f.Params, @@ -687,11 +687,11 @@ func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint6 // newLogIteratorFromMapBoundary creates a logIterator starting at the given // map boundary, according to the current targetView. func (f *FilterMaps) newLogIteratorFromMapBoundary(mapIndex uint32, startBlock, startLvPtr uint64) (*logIterator, error) { - if startBlock > f.targetView.headNumber { - return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", startBlock, f.targetView.headNumber) + if startBlock > f.targetView.HeadNumber() { + return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", startBlock, f.targetView.HeadNumber()) } // get block receipts - receipts := f.targetView.getReceipts(startBlock) + receipts := f.targetView.Receipts(startBlock) if receipts == nil { return nil, fmt.Errorf("receipts not found for start block %d", startBlock) } @@ -758,7 +758,7 @@ func (l *logIterator) next() error { if l.delimiter { l.delimiter = false l.blockNumber++ - l.receipts = l.chainView.getReceipts(l.blockNumber) + l.receipts = l.chainView.Receipts(l.blockNumber) if l.receipts == nil { return fmt.Errorf("receipts not found for block %d", l.blockNumber) } @@ -795,7 +795,7 @@ func (l *logIterator) enforceValidState() { } l.logIndex = 0 } - if l.blockNumber == l.chainView.headNumber { + if l.blockNumber == l.chainView.HeadNumber() { l.finished = true } else { l.delimiter = true diff --git a/core/filtermaps/matcher.go b/core/filtermaps/matcher.go index 5738bf166a..a5eeaaa5f0 100644 --- a/core/filtermaps/matcher.go +++ b/core/filtermaps/matcher.go @@ -57,7 +57,7 @@ type MatcherBackend interface { // all states of the chain since the previous SyncLogIndex or the creation of // the matcher backend. type SyncRange struct { - HeadNumber uint64 + IndexedView *ChainView // block range where the index has not changed since the last matcher sync // and therefore the set of matches found in this region is guaranteed to // be valid and complete. diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index 335ac84551..ee18a0694c 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -128,7 +128,7 @@ func (fm *FilterMapsMatcherBackend) synced() { indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block } fm.syncCh <- SyncRange{ - HeadNumber: fm.f.targetView.headNumber, + IndexedView: fm.f.indexedView, ValidBlocks: fm.validBlocks, IndexedBlocks: indexedBlocks, } @@ -154,7 +154,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange case <-ctx.Done(): return SyncRange{}, ctx.Err() case <-fm.f.disabledCh: - return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil + return SyncRange{IndexedView: fm.f.indexedView}, nil } select { case vr := <-syncCh: @@ -162,7 +162,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange case <-ctx.Done(): return SyncRange{}, ctx.Err() case <-fm.f.disabledCh: - return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil + return SyncRange{IndexedView: fm.f.indexedView}, nil } } diff --git a/eth/api_backend.go b/eth/api_backend.go index 64fb58a1fd..10f7ffcbce 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -443,6 +443,14 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 { return b.eth.config.RPCTxFeeCap } +func (b *EthAPIBackend) CurrentView() *filtermaps.ChainView { + head := b.eth.blockchain.CurrentBlock() + if head == nil { + return nil + } + return filtermaps.NewChainView(b.eth.blockchain, head.Number.Uint64(), head.Hash()) +} + func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend { return b.eth.filterMaps.NewMatcherBackend() } diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 78c80d8f26..dd6643c59e 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -146,25 +146,29 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } const ( - rangeLogsTestSync = iota - rangeLogsTestTrimmed - rangeLogsTestIndexed - rangeLogsTestUnindexed - rangeLogsTestDone + rangeLogsTestDone = iota // zero range + rangeLogsTestSync // before sync; zero range + rangeLogsTestSynced // after sync; valid blocks range + rangeLogsTestIndexed // individual search range + rangeLogsTestUnindexed // individual search range + rangeLogsTestResults // results range after search iteration + rangeLogsTestReorg // results range trimmed by reorg ) type rangeLogsTestEvent struct { - event int - begin, end uint64 + event int + blocks common.Range[uint64] } // searchSession represents a single search session. type searchSession struct { - ctx context.Context - filter *Filter - mb filtermaps.MatcherBackend - syncRange filtermaps.SyncRange // latest synchronized state with the matcher - firstBlock, lastBlock uint64 // specified search range; each can be MaxUint64 + ctx context.Context + filter *Filter + mb filtermaps.MatcherBackend + syncRange filtermaps.SyncRange // latest synchronized state with the matcher + chainView *filtermaps.ChainView // can be more recent than the indexed view in syncRange + // block ranges always refer to the current chainView + firstBlock, lastBlock uint64 // specified search range; MaxUint64 means latest block searchRange common.Range[uint64] // actual search range; end trimmed to latest head matchRange common.Range[uint64] // range in which we have results (subset of searchRange) matches []*types.Log // valid set of matches in matchRange @@ -182,84 +186,99 @@ func newSearchSession(ctx context.Context, filter *Filter, mb filtermaps.Matcher } // enforce a consistent state before starting the search in order to be able // to determine valid range later - if err := s.syncMatcher(0); err != nil { + var err error + s.syncRange, err = s.mb.SyncLogIndex(s.ctx) + if err != nil { + return nil, err + } + if err := s.updateChainView(); err != nil { return nil, err } return s, nil } -// syncMatcher performs a synchronization step with the matcher. The resulting -// syncRange structure holds information about the latest range of indexed blocks -// and the guaranteed valid blocks whose log index have not been changed since -// the previous synchronization. -// The function also performs trimming of the match set in order to always keep -// it consistent with the synced matcher state. -// Tail trimming is only performed if the first block of the valid log index range -// is higher than trimTailThreshold. This is useful because unindexed log search -// is not affected by the valid tail (on the other hand, valid head is taken into -// account in order to provide reorg safety, even though the log index is not used). -// In case of indexed search the tail is only trimmed if the first part of the -// recently obtained results might be invalid. If guaranteed valid new results -// have been added at the head of previously validated results then there is no -// need to discard those even if the index tail have been unindexed since that. -func (s *searchSession) syncMatcher(trimTailThreshold uint64) error { - if s.filter.rangeLogsTestHook != nil && !s.matchRange.IsEmpty() { - s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: s.matchRange.First(), end: s.matchRange.Last()} - } - var err error - s.syncRange, err = s.mb.SyncLogIndex(s.ctx) - if err != nil { - return err +// updateChainView updates to the latest view of the underlying chain and sets +// searchRange by replacing MaxUint64 (meaning latest block) with actual head +// number in the specified search range. +// If the session already had an existing chain view and set of matches then +// it also trims part of the match set that a chain reorg might have invalidated. +func (s *searchSession) updateChainView() error { + // update chain view based on current chain head (might be more recent than + // the indexed view of syncRange as the indexer updates it asynchronously + // with some delay + newChainView := s.filter.sys.backend.CurrentView() + if newChainView == nil { + return errors.New("head block not available") } + head := newChainView.HeadNumber() + // update actual search range based on current head number - first := min(s.firstBlock, s.syncRange.HeadNumber) - last := min(s.lastBlock, s.syncRange.HeadNumber) - s.searchRange = common.NewRange(first, last+1-first) - // discard everything that is not needed or might be invalid - trimRange := s.syncRange.ValidBlocks - if trimRange.First() <= trimTailThreshold { - // everything before this point is already known to be valid; if this is - // valid then keep everything before - trimRange.SetFirst(0) + firstBlock, lastBlock := s.firstBlock, s.lastBlock + if firstBlock == math.MaxUint64 { + firstBlock = head } - trimRange = trimRange.Intersection(s.searchRange) - s.trimMatches(trimRange) - if s.filter.rangeLogsTestHook != nil { - if !s.matchRange.IsEmpty() { - s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: s.matchRange.First(), end: s.matchRange.Last()} - } else { - s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: 0, end: 0} - } + if lastBlock == math.MaxUint64 { + lastBlock = head } + if firstBlock > lastBlock || lastBlock > head { + return errInvalidBlockRange + } + s.searchRange = common.NewRange(firstBlock, lastBlock+1-firstBlock) + + // Trim existing match set in case a reorg may have invalidated some results + if !s.matchRange.IsEmpty() { + trimRange := newChainView.SharedRange(s.chainView).Intersection(s.searchRange) + s.matchRange, s.matches = s.trimMatches(trimRange, s.matchRange, s.matches) + } + s.chainView = newChainView return nil } -// trimMatches removes any entries from the current set of matches that is outside -// the given range. -func (s *searchSession) trimMatches(trimRange common.Range[uint64]) { - s.matchRange = s.matchRange.Intersection(trimRange) - if s.matchRange.IsEmpty() { - s.matches = nil - return +// trimMatches removes any entries from the specified set of matches that is +// outside the given range. +func (s *searchSession) trimMatches(trimRange, matchRange common.Range[uint64], matches []*types.Log) (common.Range[uint64], []*types.Log) { + newRange := matchRange.Intersection(trimRange) + if newRange == matchRange { + return matchRange, matches } - for len(s.matches) > 0 && s.matches[0].BlockNumber < s.matchRange.First() { - s.matches = s.matches[1:] + if newRange.IsEmpty() { + return newRange, nil } - for len(s.matches) > 0 && s.matches[len(s.matches)-1].BlockNumber > s.matchRange.Last() { - s.matches = s.matches[:len(s.matches)-1] + for len(matches) > 0 && matches[0].BlockNumber < newRange.First() { + matches = matches[1:] } + for len(matches) > 0 && matches[len(matches)-1].BlockNumber > newRange.Last() { + matches = matches[:len(matches)-1] + } + return newRange, matches } // searchInRange performs a single range search, either indexed or unindexed. -func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) ([]*types.Log, error) { - first, last := r.First(), r.Last() +func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) (common.Range[uint64], []*types.Log, error) { if indexed { if s.filter.rangeLogsTestHook != nil { - s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, first, last} + s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, r} } - results, err := s.filter.indexedLogs(s.ctx, s.mb, first, last) - if err != filtermaps.ErrMatchAll { - return results, err + results, err := s.filter.indexedLogs(s.ctx, s.mb, r.First(), r.Last()) + if err != nil && !errors.Is(err, filtermaps.ErrMatchAll) { + return common.Range[uint64]{}, nil, err + } + if err == nil { + // sync with filtermaps matcher + if s.filter.rangeLogsTestHook != nil { + s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestSync, common.Range[uint64]{}} + } + var syncErr error + if s.syncRange, syncErr = s.mb.SyncLogIndex(s.ctx); syncErr != nil { + return common.Range[uint64]{}, nil, syncErr + } + if s.filter.rangeLogsTestHook != nil { + s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestSynced, s.syncRange.ValidBlocks} + } + // discard everything that might be invalid + trimRange := s.syncRange.ValidBlocks.Intersection(s.chainView.SharedRange(s.syncRange.IndexedView)) + matchRange, matches := s.trimMatches(trimRange, r, results) + return matchRange, matches, nil } // "match all" filters are not supported by filtermaps; fall back to // unindexed search which is the most efficient in this case @@ -267,79 +286,85 @@ func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) ([]* // fall through to unindexed case } if s.filter.rangeLogsTestHook != nil { - s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, first, last} + s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, r} } - return s.filter.unindexedLogs(s.ctx, first, last) + matches, err := s.filter.unindexedLogs(s.ctx, s.chainView, r.First(), r.Last()) + if err != nil { + return common.Range[uint64]{}, nil, err + } + return r, matches, nil } // doSearchIteration performs a search on a range missing from an incomplete set // of results, adds the new section and removes invalidated entries. func (s *searchSession) doSearchIteration() error { switch { - case s.syncRange.IndexedBlocks.IsEmpty(): - // indexer is not ready; fallback to completely unindexed search, do not check valid range - var err error - s.matchRange = s.searchRange - s.matches, err = s.searchInRange(s.searchRange, false) - return err - case s.matchRange.IsEmpty(): // no results yet; try search in entire range indexedSearchRange := s.searchRange.Intersection(s.syncRange.IndexedBlocks) - var err error if s.forceUnindexed = indexedSearchRange.IsEmpty(); !s.forceUnindexed { // indexed search on the intersection of indexed and searched range - s.matchRange = indexedSearchRange - s.matches, err = s.searchInRange(indexedSearchRange, true) + matchRange, matches, err := s.searchInRange(indexedSearchRange, true) if err != nil { return err } - return s.syncMatcher(0) // trim everything that the matcher considers potentially invalid + s.matchRange = matchRange + s.matches = matches + return nil } else { - // no intersection of indexed and searched range; unindexed search on the whole searched range - s.matchRange = s.searchRange - s.matches, err = s.searchInRange(s.searchRange, false) + // no intersection of indexed and searched range; unindexed search on + // the whole searched range + matchRange, matches, err := s.searchInRange(s.searchRange, false) if err != nil { return err } - return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range + s.matchRange = matchRange + s.matches = matches + return nil } case !s.matchRange.IsEmpty() && s.matchRange.First() > s.searchRange.First(): - // we have results but tail section is missing; do unindexed search for - // the tail part but still allow indexed search for missing head section + // Results are available, but the tail section is missing. Perform an unindexed + // search for the missing tail, while still allowing indexed search for the head. + // + // The unindexed search is necessary because the tail portion of the indexes + // has been pruned. tailRange := common.NewRange(s.searchRange.First(), s.matchRange.First()-s.searchRange.First()) - tailMatches, err := s.searchInRange(tailRange, false) + _, tailMatches, err := s.searchInRange(tailRange, false) if err != nil { return err } s.matches = append(tailMatches, s.matches...) s.matchRange = tailRange.Union(s.matchRange) - return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range + return nil case !s.matchRange.IsEmpty() && s.matchRange.First() == s.searchRange.First() && s.searchRange.AfterLast() > s.matchRange.AfterLast(): - // we have results but head section is missing + // Results are available, but the head section is missing. Try to perform + // the indexed search for the missing head, or fallback to unindexed search + // if the tail portion of indexed range has been pruned. headRange := common.NewRange(s.matchRange.AfterLast(), s.searchRange.AfterLast()-s.matchRange.AfterLast()) if !s.forceUnindexed { indexedHeadRange := headRange.Intersection(s.syncRange.IndexedBlocks) if !indexedHeadRange.IsEmpty() && indexedHeadRange.First() == headRange.First() { - // indexed head range search is possible headRange = indexedHeadRange } else { + // The tail portion of the indexes has been pruned, falling back + // to unindexed search. s.forceUnindexed = true } } - headMatches, err := s.searchInRange(headRange, !s.forceUnindexed) + headMatchRange, headMatches, err := s.searchInRange(headRange, !s.forceUnindexed) if err != nil { return err } - s.matches = append(s.matches, headMatches...) - s.matchRange = s.matchRange.Union(headRange) - if s.forceUnindexed { - return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range - } else { - return s.syncMatcher(headRange.First()) // trim if the tail of latest head search results might be invalid + if headMatchRange.First() != s.matchRange.AfterLast() { + // improbable corner case, first part of new head range invalidated by tail unindexing + s.matches, s.matchRange = headMatches, headMatchRange + return nil } + s.matches = append(s.matches, headMatches...) + s.matchRange = s.matchRange.Union(headMatchRange) + return nil default: panic("invalid search session state") @@ -349,7 +374,7 @@ func (s *searchSession) doSearchIteration() error { func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) { if f.rangeLogsTestHook != nil { defer func() { - f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0} + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, common.Range[uint64]{}} close(f.rangeLogsTestHook) }() } @@ -366,7 +391,17 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([ } for session.searchRange != session.matchRange { if err := session.doSearchIteration(); err != nil { - return session.matches, err + return nil, err + } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestResults, session.matchRange} + } + mr := session.matchRange + if err := session.updateChainView(); err != nil { + return nil, err + } + if f.rangeLogsTestHook != nil && session.matchRange != mr { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestReorg, session.matchRange} } } return session.matches, nil @@ -382,7 +417,7 @@ func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, // unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. -func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) { +func (f *Filter) unindexedLogs(ctx context.Context, chainView *filtermaps.ChainView, begin, end uint64) ([]*types.Log, error) { start := time.Now() log.Debug("Performing unindexed log search", "begin", begin, "end", end) var matches []*types.Log @@ -392,9 +427,14 @@ func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types return matches, ctx.Err() default: } - header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber)) - if header == nil || err != nil { - return matches, err + if blockNumber > chainView.HeadNumber() { + // check here so that we can return matches up until head along with + // the error + return matches, errInvalidBlockRange + } + header := chainView.Header(blockNumber) + if header == nil { + return matches, errors.New("header not found") } found, err := f.blockLogs(ctx, header) if err != nil { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index b787f1067b..10e433f09b 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -71,6 +71,7 @@ type Backend interface { SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + CurrentView() *filtermaps.ChainView NewMatcherBackend() filtermaps.MatcherBackend } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 3bb019d105..fa5d4fe897 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -154,6 +154,11 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } +func (b *testBackend) CurrentView() *filtermaps.ChainView { + head := b.CurrentBlock() + return filtermaps.NewChainView(b, head.Number.Uint64(), head.Hash()) +} + func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend { return b.fm.NewMatcherBackend() } diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 4026c03e89..d6065230f8 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -453,7 +453,8 @@ func TestRangeLogs(t *testing.T) { addresses = []common.Address{{}} ) - expEvent := func(exp rangeLogsTestEvent) { + expEvent := func(expEvent int, expFirst, expAfterLast uint64) { + exp := rangeLogsTestEvent{expEvent, common.NewRange[uint64](expFirst, expAfterLast-expFirst)} event++ ev := <-filter.rangeLogsTestHook if ev != exp { @@ -472,7 +473,6 @@ func TestRangeLogs(t *testing.T) { for range filter.rangeLogsTestHook { } }(filter) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) } updateHead := func() { @@ -483,81 +483,122 @@ func TestRangeLogs(t *testing.T) { // test case #1 newFilter(300, 500) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500}) - expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400}) + expEvent(rangeLogsTestIndexed, 401, 501) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 401, 601) + expEvent(rangeLogsTestResults, 401, 501) + expEvent(rangeLogsTestUnindexed, 300, 401) if _, err := bc.InsertChain(chain[600:700]); err != nil { t.Fatal(err) } updateHead() - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail - expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + expEvent(rangeLogsTestResults, 300, 501) + expEvent(rangeLogsTestDone, 0, 0) // test case #2 newFilter(400, int64(rpc.LatestBlockNumber)) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700}) + expEvent(rangeLogsTestIndexed, 501, 701) if _, err := bc.InsertChain(chain[700:800]); err != nil { t.Fatal(err) } updateHead() - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 698}) - expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 698}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 698}) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 699, 800}) - if err := bc.SetHead(750); err != nil { + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 601, 699) + expEvent(rangeLogsTestResults, 601, 699) + expEvent(rangeLogsTestUnindexed, 400, 601) + expEvent(rangeLogsTestResults, 400, 699) + expEvent(rangeLogsTestIndexed, 699, 801) + if _, err := bc.SetCanonical(chain[749]); err != nil { // set head to block 750 t.Fatal(err) } updateHead() - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 748}) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 749, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 601, 749) + expEvent(rangeLogsTestResults, 400, 749) + expEvent(rangeLogsTestIndexed, 749, 751) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 751) + expEvent(rangeLogsTestResults, 400, 751) + expEvent(rangeLogsTestDone, 0, 0) // test case #3 newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber)) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750}) - if err := bc.SetHead(740); err != nil { + expEvent(rangeLogsTestIndexed, 750, 751) + if _, err := bc.SetCanonical(chain[739]); err != nil { t.Fatal(err) } updateHead() - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740}) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 739) + expEvent(rangeLogsTestResults, 0, 0) + expEvent(rangeLogsTestIndexed, 740, 741) if _, err := bc.InsertChain(chain[740:750]); err != nil { t.Fatal(err) } updateHead() - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 739) + expEvent(rangeLogsTestResults, 0, 0) + expEvent(rangeLogsTestIndexed, 750, 751) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 751) + expEvent(rangeLogsTestResults, 750, 751) + expEvent(rangeLogsTestDone, 0, 0) // test case #4 + if _, err := bc.SetCanonical(chain[499]); err != nil { + t.Fatal(err) + } + updateHead() newFilter(400, int64(rpc.LatestBlockNumber)) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750}) - expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550}) + expEvent(rangeLogsTestIndexed, 400, 501) + if _, err := bc.InsertChain(chain[500:650]); err != nil { + t.Fatal(err) + } + updateHead() + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 451, 499) + expEvent(rangeLogsTestResults, 451, 499) + expEvent(rangeLogsTestUnindexed, 400, 451) + expEvent(rangeLogsTestResults, 400, 499) + // indexed head extension seems possible + expEvent(rangeLogsTestIndexed, 499, 651) + // further head extension causes tail unindexing in searched range + if _, err := bc.InsertChain(chain[650:750]); err != nil { + t.Fatal(err) + } + updateHead() + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 649) + // tail trimmed to 551; cannot merge with existing results + expEvent(rangeLogsTestResults, 551, 649) + expEvent(rangeLogsTestUnindexed, 400, 551) + expEvent(rangeLogsTestResults, 400, 649) + expEvent(rangeLogsTestIndexed, 649, 751) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 751) + expEvent(rangeLogsTestResults, 400, 751) + expEvent(rangeLogsTestDone, 0, 0) + + // test case #5 + newFilter(400, int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestIndexed, 551, 751) + expEvent(rangeLogsTestSync, 0, 0) + expEvent(rangeLogsTestSynced, 551, 751) + expEvent(rangeLogsTestResults, 551, 751) + expEvent(rangeLogsTestUnindexed, 400, 551) if _, err := bc.InsertChain(chain[750:1000]); err != nil { t.Fatal(err) } updateHead() - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750}) - // indexed range affected by tail pruning so we have to discard the entire - // match set - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) - expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000}) - expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800}) - expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000}) - expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000}) + expEvent(rangeLogsTestResults, 400, 751) + // indexed tail already beyond results head; revert to unindexed head search + expEvent(rangeLogsTestUnindexed, 751, 1001) + if _, err := bc.SetCanonical(chain[899]); err != nil { + t.Fatal(err) + } + updateHead() + expEvent(rangeLogsTestResults, 400, 1001) + expEvent(rangeLogsTestReorg, 400, 901) + expEvent(rangeLogsTestDone, 0, 0) } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 3fbf32e22e..aff051937d 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -619,6 +619,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { panic("implement me") } +func (b testBackend) CurrentView() *filtermaps.ChainView { + panic("implement me") +} func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend { panic("implement me") } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index c4bf2e0591..e28cb93296 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -95,6 +95,7 @@ type Backend interface { SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription + CurrentView() *filtermaps.ChainView NewMatcherBackend() filtermaps.MatcherBackend } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index b4d11a3033..9dd6a54729 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -401,6 +401,7 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) func (b *backendMock) Engine() consensus.Engine { return nil } +func (b *backendMock) CurrentView() *filtermaps.ChainView { return nil } func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil } func (b *backendMock) HistoryPruningCutoff() uint64 { return 0 }