1
0
Fork 0
forked from forks/go-ethereum

eth/filters, core/filtermaps: safe chain view update (#31590)

This PR changes the chain view update mechanism of the log filter.
Previously the head updates were all wired through the indexer, even in
unindexed mode. This was both a bit weird and also unsafe as the
indexer's chain view was updates asynchronously with some delay, making
some log related tests flaky. Also, the reorg safety of the indexed
search was integrated with unindexed search in a weird way, relying on
`syncRange.ValidBlocks` in the unindexed case too, with a special
condition added to only consider the head of the valid range but not the
tail in the unindexed case.

In this PR the current chain view is directly accessible through the
filter backend and unindexed search is also chain view based, making it
inherently safe. The matcher sync mechanism is now only used for indexed
search as originally intended, removing a few ugly special conditions.

The PR is currently based on top of
https://github.com/ethereum/go-ethereum/pull/31642
Together they fix https://github.com/ethereum/go-ethereum/issues/31518
and replace https://github.com/ethereum/go-ethereum/pull/31542

---------

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
Felföldi Zsolt 2025-04-20 09:48:49 +02:00 committed by GitHub
parent bf6da20012
commit 7f574372d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 327 additions and 201 deletions

View file

@ -939,6 +939,7 @@ var bindTests = []struct {
if _, err := eventer.RaiseSimpleEvent(auth, common.Address{byte(j)}, [32]byte{byte(j)}, true, big.NewInt(int64(10*i+j))); err != nil {
t.Fatalf("block %d, event %d: raise failed: %v", i, j, err)
}
time.Sleep(time.Millisecond * 200)
}
sim.Commit()
}
@ -1495,7 +1496,7 @@ var bindTests = []struct {
if n != 3 {
t.Fatalf("Invalid bar0 event")
}
case <-time.NewTimer(3 * time.Second).C:
case <-time.NewTimer(10 * time.Second).C:
t.Fatalf("Wait bar0 event timeout")
}
@ -1506,7 +1507,7 @@ var bindTests = []struct {
if n != 1 {
t.Fatalf("Invalid bar event")
}
case <-time.NewTimer(3 * time.Second).C:
case <-time.NewTimer(10 * time.Second).C:
t.Fatalf("Wait bar event timeout")
}
close(stopCh)

View file

@ -58,47 +58,75 @@ func NewChainView(chain blockchain, number uint64, hash common.Hash) *ChainView
return cv
}
// getBlockHash returns the block hash belonging to the given block number.
// HeadNumber returns the head block number of the chain view.
func (cv *ChainView) HeadNumber() uint64 {
return cv.headNumber
}
// BlockHash returns the block hash belonging to the given block number.
// Note that the hash of the head block is not returned because ChainView might
// represent a view where the head block is currently being created.
func (cv *ChainView) getBlockHash(number uint64) common.Hash {
if number >= cv.headNumber {
func (cv *ChainView) BlockHash(number uint64) common.Hash {
cv.lock.Lock()
defer cv.lock.Unlock()
if number > cv.headNumber {
panic("invalid block number")
}
return cv.blockHash(number)
}
// getBlockId returns the unique block id belonging to the given block number.
// BlockId returns the unique block id belonging to the given block number.
// Note that it is currently equal to the block hash. In the future it might
// be a different id for future blocks if the log index root becomes part of
// consensus and therefore rendering the index with the new head will happen
// before the hash of that new head is available.
func (cv *ChainView) getBlockId(number uint64) common.Hash {
func (cv *ChainView) BlockId(number uint64) common.Hash {
cv.lock.Lock()
defer cv.lock.Unlock()
if number > cv.headNumber {
panic("invalid block number")
}
return cv.blockHash(number)
}
// getReceipts returns the set of receipts belonging to the block at the given
// Header returns the block header at the given block number.
func (cv *ChainView) Header(number uint64) *types.Header {
return cv.chain.GetHeader(cv.BlockHash(number), number)
}
// Receipts returns the set of receipts belonging to the block at the given
// block number.
func (cv *ChainView) getReceipts(number uint64) types.Receipts {
if number > cv.headNumber {
panic("invalid block number")
}
blockHash := cv.blockHash(number)
func (cv *ChainView) Receipts(number uint64) types.Receipts {
blockHash := cv.BlockHash(number)
if blockHash == (common.Hash{}) {
log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber)
}
return cv.chain.GetReceiptsByHash(blockHash)
}
// SharedRange returns the block range shared by two chain views.
func (cv *ChainView) SharedRange(cv2 *ChainView) common.Range[uint64] {
cv.lock.Lock()
defer cv.lock.Unlock()
if cv == nil || cv2 == nil || !cv.extendNonCanonical() || !cv2.extendNonCanonical() {
return common.Range[uint64]{}
}
var sharedLen uint64
for n := min(cv.headNumber+1-uint64(len(cv.hashes)), cv2.headNumber+1-uint64(len(cv2.hashes))); n <= cv.headNumber && n <= cv2.headNumber && cv.blockHash(n) == cv2.blockHash(n); n++ {
sharedLen = n + 1
}
return common.NewRange(0, sharedLen)
}
// limitedView returns a new chain view that is a truncated version of the parent view.
func (cv *ChainView) limitedView(newHead uint64) *ChainView {
if newHead >= cv.headNumber {
return cv
}
return NewChainView(cv.chain, newHead, cv.blockHash(newHead))
return NewChainView(cv.chain, newHead, cv.BlockHash(newHead))
}
// equalViews returns true if the two chain views are equivalent.
@ -106,7 +134,7 @@ func equalViews(cv1, cv2 *ChainView) bool {
if cv1 == nil || cv2 == nil {
return false
}
return cv1.headNumber == cv2.headNumber && cv1.getBlockId(cv1.headNumber) == cv2.getBlockId(cv2.headNumber)
return cv1.headNumber == cv2.headNumber && cv1.BlockId(cv1.headNumber) == cv2.BlockId(cv2.headNumber)
}
// matchViews returns true if the two chain views are equivalent up until the
@ -120,9 +148,9 @@ func matchViews(cv1, cv2 *ChainView, number uint64) bool {
return false
}
if number == cv1.headNumber || number == cv2.headNumber {
return cv1.getBlockId(number) == cv2.getBlockId(number)
return cv1.BlockId(number) == cv2.BlockId(number)
}
return cv1.getBlockHash(number) == cv2.getBlockHash(number)
return cv1.BlockHash(number) == cv2.BlockHash(number)
}
// extendNonCanonical checks whether the previously known reverse list of head
@ -150,9 +178,6 @@ func (cv *ChainView) extendNonCanonical() bool {
// blockHash returns the given block hash without doing the head number check.
func (cv *ChainView) blockHash(number uint64) common.Hash {
cv.lock.Lock()
defer cv.lock.Unlock()
if number+uint64(len(cv.hashes)) <= cv.headNumber {
hash := cv.chain.GetCanonicalHash(number)
if !cv.extendNonCanonical() {

View file

@ -262,7 +262,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
f.targetView = initView
if f.indexedRange.initialized {
f.indexedView = f.initChainView(f.targetView)
f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.headNumber+1
f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.HeadNumber()+1
if !f.indexedRange.headIndexed {
f.indexedRange.headDelimiter = 0
}
@ -313,7 +313,7 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
log.Error("Could not initialize indexed chain view", "error", err)
break
}
if lastBlockNumber <= chainView.headNumber && chainView.getBlockId(lastBlockNumber) == lastBlockId {
if lastBlockNumber <= chainView.HeadNumber() && chainView.BlockId(lastBlockNumber) == lastBlockId {
return chainView.limitedView(lastBlockNumber)
}
}
@ -370,7 +370,7 @@ func (f *FilterMaps) init() error {
for min < max {
mid := (min + max + 1) / 2
cp := checkpointList[mid-1]
if cp.BlockNumber <= f.targetView.headNumber && f.targetView.getBlockId(cp.BlockNumber) == cp.BlockId {
if cp.BlockNumber <= f.targetView.HeadNumber() && f.targetView.BlockId(cp.BlockNumber) == cp.BlockId {
min = mid
} else {
max = mid - 1
@ -512,7 +512,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
}
}
// get block receipts
receipts := f.indexedView.getReceipts(firstBlockNumber)
receipts := f.indexedView.Receipts(firstBlockNumber)
if receipts == nil {
return nil, fmt.Errorf("failed to retrieve receipts for block %d containing searched log value index %d: %v", firstBlockNumber, lvIndex, err)
}

View file

@ -44,7 +44,7 @@ func (f *FilterMaps) indexerLoop() {
for !f.stop {
if !f.indexedRange.initialized {
if f.targetView.headNumber == 0 {
if f.targetView.HeadNumber() == 0 {
// initialize when chain head is available
f.processSingleEvent(true)
continue
@ -249,7 +249,7 @@ func (f *FilterMaps) tryIndexHead() error {
log.Info("Log index head rendering in progress",
"first block", f.indexedRange.blocks.First(), "last block", f.indexedRange.blocks.Last(),
"processed", f.indexedRange.blocks.AfterLast()-f.ptrHeadIndex,
"remaining", f.indexedView.headNumber-f.indexedRange.blocks.Last(),
"remaining", f.indexedView.HeadNumber()-f.indexedRange.blocks.Last(),
"elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt)))
f.loggedHeadIndex = true
f.lastLogHeadIndex = time.Now()
@ -418,10 +418,10 @@ func (f *FilterMaps) needTailEpoch(epoch uint32) bool {
// tailTargetBlock returns the target value for the tail block number according
// to the log history parameter and the current index head.
func (f *FilterMaps) tailTargetBlock() uint64 {
if f.history == 0 || f.indexedView.headNumber < f.history {
if f.history == 0 || f.indexedView.HeadNumber() < f.history {
return 0
}
return f.indexedView.headNumber + 1 - f.history
return f.indexedView.HeadNumber() + 1 - f.history
}
// tailPartialBlocks returns the number of rendered blocks in the partially

View file

@ -143,8 +143,8 @@ func (f *FilterMaps) lastCanonicalSnapshotOfMap(mapIndex uint32) *renderedMap {
var best *renderedMap
for _, blockNumber := range f.renderSnapshots.Keys() {
if cp, _ := f.renderSnapshots.Get(blockNumber); cp != nil && blockNumber < f.indexedRange.blocks.AfterLast() &&
blockNumber <= f.indexedView.headNumber && f.indexedView.getBlockId(blockNumber) == cp.lastBlockId &&
blockNumber <= f.targetView.headNumber && f.targetView.getBlockId(blockNumber) == cp.lastBlockId &&
blockNumber <= f.indexedView.HeadNumber() && f.indexedView.BlockId(blockNumber) == cp.lastBlockId &&
blockNumber <= f.targetView.HeadNumber() && f.targetView.BlockId(blockNumber) == cp.lastBlockId &&
cp.mapIndex == mapIndex && (best == nil || blockNumber > best.lastBlock) {
best = cp
}
@ -173,7 +173,7 @@ func (f *FilterMaps) lastCanonicalMapBoundaryBefore(renderBefore uint32) (nextMa
return 0, 0, 0, fmt.Errorf("failed to retrieve last block of reverse iterated map %d: %v", mapIndex, err)
}
if (f.indexedRange.headIndexed && mapIndex >= f.indexedRange.maps.Last()) ||
lastBlock >= f.targetView.headNumber || lastBlockId != f.targetView.getBlockId(lastBlock) {
lastBlock >= f.targetView.HeadNumber() || lastBlockId != f.targetView.BlockId(lastBlock) {
continue // map is not full or inconsistent with targetView; roll back
}
lvPtr, err := f.getBlockLvPointer(lastBlock)
@ -247,7 +247,7 @@ func (f *FilterMaps) loadHeadSnapshot() error {
filterMap: fm,
mapIndex: f.indexedRange.maps.Last(),
lastBlock: f.indexedRange.blocks.Last(),
lastBlockId: f.indexedView.getBlockId(f.indexedRange.blocks.Last()),
lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()),
blockLvPtrs: lvPtrs,
finished: true,
headDelimiter: f.indexedRange.headDelimiter,
@ -264,7 +264,7 @@ func (r *mapRenderer) makeSnapshot() {
filterMap: r.currentMap.filterMap.fastCopy(),
mapIndex: r.currentMap.mapIndex,
lastBlock: r.currentMap.lastBlock,
lastBlockId: r.iterator.chainView.getBlockId(r.currentMap.lastBlock),
lastBlockId: r.iterator.chainView.BlockId(r.currentMap.lastBlock),
blockLvPtrs: r.currentMap.blockLvPtrs,
finished: true,
headDelimiter: r.iterator.lvIndex,
@ -370,7 +370,7 @@ func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) {
r.currentMap.finished = true
r.currentMap.headDelimiter = r.iterator.lvIndex
}
r.currentMap.lastBlockId = r.f.targetView.getBlockId(r.currentMap.lastBlock)
r.currentMap.lastBlockId = r.f.targetView.BlockId(r.currentMap.lastBlock)
totalTime += time.Since(start)
mapRenderTimer.Update(totalTime)
mapLogValueMeter.Mark(logValuesProcessed)
@ -566,8 +566,8 @@ func (r *mapRenderer) getUpdatedRange() (filterMapsRange, error) {
lm := r.finishedMaps[r.finished.Last()]
newRange.headIndexed = lm.finished
if lm.finished {
newRange.blocks.SetLast(r.f.targetView.headNumber)
if lm.lastBlock != r.f.targetView.headNumber {
newRange.blocks.SetLast(r.f.targetView.HeadNumber())
if lm.lastBlock != r.f.targetView.HeadNumber() {
panic("map rendering finished but last block != head block")
}
newRange.headDelimiter = lm.headDelimiter
@ -665,13 +665,13 @@ var errUnindexedRange = errors.New("unindexed range")
// given block's first log value entry (the block delimiter), according to the
// current targetView.
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint64) (*logIterator, error) {
if blockNumber > f.targetView.headNumber {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.headNumber)
if blockNumber > f.targetView.HeadNumber() {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.HeadNumber())
}
if !f.indexedRange.blocks.Includes(blockNumber) {
return nil, errUnindexedRange
}
finished := blockNumber == f.targetView.headNumber
finished := blockNumber == f.targetView.HeadNumber()
l := &logIterator{
chainView: f.targetView,
params: &f.Params,
@ -687,11 +687,11 @@ func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint6
// newLogIteratorFromMapBoundary creates a logIterator starting at the given
// map boundary, according to the current targetView.
func (f *FilterMaps) newLogIteratorFromMapBoundary(mapIndex uint32, startBlock, startLvPtr uint64) (*logIterator, error) {
if startBlock > f.targetView.headNumber {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", startBlock, f.targetView.headNumber)
if startBlock > f.targetView.HeadNumber() {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", startBlock, f.targetView.HeadNumber())
}
// get block receipts
receipts := f.targetView.getReceipts(startBlock)
receipts := f.targetView.Receipts(startBlock)
if receipts == nil {
return nil, fmt.Errorf("receipts not found for start block %d", startBlock)
}
@ -758,7 +758,7 @@ func (l *logIterator) next() error {
if l.delimiter {
l.delimiter = false
l.blockNumber++
l.receipts = l.chainView.getReceipts(l.blockNumber)
l.receipts = l.chainView.Receipts(l.blockNumber)
if l.receipts == nil {
return fmt.Errorf("receipts not found for block %d", l.blockNumber)
}
@ -795,7 +795,7 @@ func (l *logIterator) enforceValidState() {
}
l.logIndex = 0
}
if l.blockNumber == l.chainView.headNumber {
if l.blockNumber == l.chainView.HeadNumber() {
l.finished = true
} else {
l.delimiter = true

View file

@ -57,7 +57,7 @@ type MatcherBackend interface {
// all states of the chain since the previous SyncLogIndex or the creation of
// the matcher backend.
type SyncRange struct {
HeadNumber uint64
IndexedView *ChainView
// block range where the index has not changed since the last matcher sync
// and therefore the set of matches found in this region is guaranteed to
// be valid and complete.

View file

@ -128,7 +128,7 @@ func (fm *FilterMapsMatcherBackend) synced() {
indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block
}
fm.syncCh <- SyncRange{
HeadNumber: fm.f.targetView.headNumber,
IndexedView: fm.f.indexedView,
ValidBlocks: fm.validBlocks,
IndexedBlocks: indexedBlocks,
}
@ -154,7 +154,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
case <-ctx.Done():
return SyncRange{}, ctx.Err()
case <-fm.f.disabledCh:
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
return SyncRange{IndexedView: fm.f.indexedView}, nil
}
select {
case vr := <-syncCh:
@ -162,7 +162,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
case <-ctx.Done():
return SyncRange{}, ctx.Err()
case <-fm.f.disabledCh:
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
return SyncRange{IndexedView: fm.f.indexedView}, nil
}
}

View file

@ -443,6 +443,14 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 {
return b.eth.config.RPCTxFeeCap
}
func (b *EthAPIBackend) CurrentView() *filtermaps.ChainView {
head := b.eth.blockchain.CurrentBlock()
if head == nil {
return nil
}
return filtermaps.NewChainView(b.eth.blockchain, head.Number.Uint64(), head.Hash())
}
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend()
}

View file

@ -146,25 +146,29 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
const (
rangeLogsTestSync = iota
rangeLogsTestTrimmed
rangeLogsTestIndexed
rangeLogsTestUnindexed
rangeLogsTestDone
rangeLogsTestDone = iota // zero range
rangeLogsTestSync // before sync; zero range
rangeLogsTestSynced // after sync; valid blocks range
rangeLogsTestIndexed // individual search range
rangeLogsTestUnindexed // individual search range
rangeLogsTestResults // results range after search iteration
rangeLogsTestReorg // results range trimmed by reorg
)
type rangeLogsTestEvent struct {
event int
begin, end uint64
event int
blocks common.Range[uint64]
}
// searchSession represents a single search session.
type searchSession struct {
ctx context.Context
filter *Filter
mb filtermaps.MatcherBackend
syncRange filtermaps.SyncRange // latest synchronized state with the matcher
firstBlock, lastBlock uint64 // specified search range; each can be MaxUint64
ctx context.Context
filter *Filter
mb filtermaps.MatcherBackend
syncRange filtermaps.SyncRange // latest synchronized state with the matcher
chainView *filtermaps.ChainView // can be more recent than the indexed view in syncRange
// block ranges always refer to the current chainView
firstBlock, lastBlock uint64 // specified search range; MaxUint64 means latest block
searchRange common.Range[uint64] // actual search range; end trimmed to latest head
matchRange common.Range[uint64] // range in which we have results (subset of searchRange)
matches []*types.Log // valid set of matches in matchRange
@ -182,84 +186,99 @@ func newSearchSession(ctx context.Context, filter *Filter, mb filtermaps.Matcher
}
// enforce a consistent state before starting the search in order to be able
// to determine valid range later
if err := s.syncMatcher(0); err != nil {
var err error
s.syncRange, err = s.mb.SyncLogIndex(s.ctx)
if err != nil {
return nil, err
}
if err := s.updateChainView(); err != nil {
return nil, err
}
return s, nil
}
// syncMatcher performs a synchronization step with the matcher. The resulting
// syncRange structure holds information about the latest range of indexed blocks
// and the guaranteed valid blocks whose log index have not been changed since
// the previous synchronization.
// The function also performs trimming of the match set in order to always keep
// it consistent with the synced matcher state.
// Tail trimming is only performed if the first block of the valid log index range
// is higher than trimTailThreshold. This is useful because unindexed log search
// is not affected by the valid tail (on the other hand, valid head is taken into
// account in order to provide reorg safety, even though the log index is not used).
// In case of indexed search the tail is only trimmed if the first part of the
// recently obtained results might be invalid. If guaranteed valid new results
// have been added at the head of previously validated results then there is no
// need to discard those even if the index tail have been unindexed since that.
func (s *searchSession) syncMatcher(trimTailThreshold uint64) error {
if s.filter.rangeLogsTestHook != nil && !s.matchRange.IsEmpty() {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: s.matchRange.First(), end: s.matchRange.Last()}
}
var err error
s.syncRange, err = s.mb.SyncLogIndex(s.ctx)
if err != nil {
return err
// updateChainView updates to the latest view of the underlying chain and sets
// searchRange by replacing MaxUint64 (meaning latest block) with actual head
// number in the specified search range.
// If the session already had an existing chain view and set of matches then
// it also trims part of the match set that a chain reorg might have invalidated.
func (s *searchSession) updateChainView() error {
// update chain view based on current chain head (might be more recent than
// the indexed view of syncRange as the indexer updates it asynchronously
// with some delay
newChainView := s.filter.sys.backend.CurrentView()
if newChainView == nil {
return errors.New("head block not available")
}
head := newChainView.HeadNumber()
// update actual search range based on current head number
first := min(s.firstBlock, s.syncRange.HeadNumber)
last := min(s.lastBlock, s.syncRange.HeadNumber)
s.searchRange = common.NewRange(first, last+1-first)
// discard everything that is not needed or might be invalid
trimRange := s.syncRange.ValidBlocks
if trimRange.First() <= trimTailThreshold {
// everything before this point is already known to be valid; if this is
// valid then keep everything before
trimRange.SetFirst(0)
firstBlock, lastBlock := s.firstBlock, s.lastBlock
if firstBlock == math.MaxUint64 {
firstBlock = head
}
trimRange = trimRange.Intersection(s.searchRange)
s.trimMatches(trimRange)
if s.filter.rangeLogsTestHook != nil {
if !s.matchRange.IsEmpty() {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: s.matchRange.First(), end: s.matchRange.Last()}
} else {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: 0, end: 0}
}
if lastBlock == math.MaxUint64 {
lastBlock = head
}
if firstBlock > lastBlock || lastBlock > head {
return errInvalidBlockRange
}
s.searchRange = common.NewRange(firstBlock, lastBlock+1-firstBlock)
// Trim existing match set in case a reorg may have invalidated some results
if !s.matchRange.IsEmpty() {
trimRange := newChainView.SharedRange(s.chainView).Intersection(s.searchRange)
s.matchRange, s.matches = s.trimMatches(trimRange, s.matchRange, s.matches)
}
s.chainView = newChainView
return nil
}
// trimMatches removes any entries from the current set of matches that is outside
// the given range.
func (s *searchSession) trimMatches(trimRange common.Range[uint64]) {
s.matchRange = s.matchRange.Intersection(trimRange)
if s.matchRange.IsEmpty() {
s.matches = nil
return
// trimMatches removes any entries from the specified set of matches that is
// outside the given range.
func (s *searchSession) trimMatches(trimRange, matchRange common.Range[uint64], matches []*types.Log) (common.Range[uint64], []*types.Log) {
newRange := matchRange.Intersection(trimRange)
if newRange == matchRange {
return matchRange, matches
}
for len(s.matches) > 0 && s.matches[0].BlockNumber < s.matchRange.First() {
s.matches = s.matches[1:]
if newRange.IsEmpty() {
return newRange, nil
}
for len(s.matches) > 0 && s.matches[len(s.matches)-1].BlockNumber > s.matchRange.Last() {
s.matches = s.matches[:len(s.matches)-1]
for len(matches) > 0 && matches[0].BlockNumber < newRange.First() {
matches = matches[1:]
}
for len(matches) > 0 && matches[len(matches)-1].BlockNumber > newRange.Last() {
matches = matches[:len(matches)-1]
}
return newRange, matches
}
// searchInRange performs a single range search, either indexed or unindexed.
func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) ([]*types.Log, error) {
first, last := r.First(), r.Last()
func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) (common.Range[uint64], []*types.Log, error) {
if indexed {
if s.filter.rangeLogsTestHook != nil {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, first, last}
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, r}
}
results, err := s.filter.indexedLogs(s.ctx, s.mb, first, last)
if err != filtermaps.ErrMatchAll {
return results, err
results, err := s.filter.indexedLogs(s.ctx, s.mb, r.First(), r.Last())
if err != nil && !errors.Is(err, filtermaps.ErrMatchAll) {
return common.Range[uint64]{}, nil, err
}
if err == nil {
// sync with filtermaps matcher
if s.filter.rangeLogsTestHook != nil {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestSync, common.Range[uint64]{}}
}
var syncErr error
if s.syncRange, syncErr = s.mb.SyncLogIndex(s.ctx); syncErr != nil {
return common.Range[uint64]{}, nil, syncErr
}
if s.filter.rangeLogsTestHook != nil {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestSynced, s.syncRange.ValidBlocks}
}
// discard everything that might be invalid
trimRange := s.syncRange.ValidBlocks.Intersection(s.chainView.SharedRange(s.syncRange.IndexedView))
matchRange, matches := s.trimMatches(trimRange, r, results)
return matchRange, matches, nil
}
// "match all" filters are not supported by filtermaps; fall back to
// unindexed search which is the most efficient in this case
@ -267,79 +286,85 @@ func (s *searchSession) searchInRange(r common.Range[uint64], indexed bool) ([]*
// fall through to unindexed case
}
if s.filter.rangeLogsTestHook != nil {
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, first, last}
s.filter.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, r}
}
return s.filter.unindexedLogs(s.ctx, first, last)
matches, err := s.filter.unindexedLogs(s.ctx, s.chainView, r.First(), r.Last())
if err != nil {
return common.Range[uint64]{}, nil, err
}
return r, matches, nil
}
// doSearchIteration performs a search on a range missing from an incomplete set
// of results, adds the new section and removes invalidated entries.
func (s *searchSession) doSearchIteration() error {
switch {
case s.syncRange.IndexedBlocks.IsEmpty():
// indexer is not ready; fallback to completely unindexed search, do not check valid range
var err error
s.matchRange = s.searchRange
s.matches, err = s.searchInRange(s.searchRange, false)
return err
case s.matchRange.IsEmpty():
// no results yet; try search in entire range
indexedSearchRange := s.searchRange.Intersection(s.syncRange.IndexedBlocks)
var err error
if s.forceUnindexed = indexedSearchRange.IsEmpty(); !s.forceUnindexed {
// indexed search on the intersection of indexed and searched range
s.matchRange = indexedSearchRange
s.matches, err = s.searchInRange(indexedSearchRange, true)
matchRange, matches, err := s.searchInRange(indexedSearchRange, true)
if err != nil {
return err
}
return s.syncMatcher(0) // trim everything that the matcher considers potentially invalid
s.matchRange = matchRange
s.matches = matches
return nil
} else {
// no intersection of indexed and searched range; unindexed search on the whole searched range
s.matchRange = s.searchRange
s.matches, err = s.searchInRange(s.searchRange, false)
// no intersection of indexed and searched range; unindexed search on
// the whole searched range
matchRange, matches, err := s.searchInRange(s.searchRange, false)
if err != nil {
return err
}
return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range
s.matchRange = matchRange
s.matches = matches
return nil
}
case !s.matchRange.IsEmpty() && s.matchRange.First() > s.searchRange.First():
// we have results but tail section is missing; do unindexed search for
// the tail part but still allow indexed search for missing head section
// Results are available, but the tail section is missing. Perform an unindexed
// search for the missing tail, while still allowing indexed search for the head.
//
// The unindexed search is necessary because the tail portion of the indexes
// has been pruned.
tailRange := common.NewRange(s.searchRange.First(), s.matchRange.First()-s.searchRange.First())
tailMatches, err := s.searchInRange(tailRange, false)
_, tailMatches, err := s.searchInRange(tailRange, false)
if err != nil {
return err
}
s.matches = append(tailMatches, s.matches...)
s.matchRange = tailRange.Union(s.matchRange)
return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range
return nil
case !s.matchRange.IsEmpty() && s.matchRange.First() == s.searchRange.First() && s.searchRange.AfterLast() > s.matchRange.AfterLast():
// we have results but head section is missing
// Results are available, but the head section is missing. Try to perform
// the indexed search for the missing head, or fallback to unindexed search
// if the tail portion of indexed range has been pruned.
headRange := common.NewRange(s.matchRange.AfterLast(), s.searchRange.AfterLast()-s.matchRange.AfterLast())
if !s.forceUnindexed {
indexedHeadRange := headRange.Intersection(s.syncRange.IndexedBlocks)
if !indexedHeadRange.IsEmpty() && indexedHeadRange.First() == headRange.First() {
// indexed head range search is possible
headRange = indexedHeadRange
} else {
// The tail portion of the indexes has been pruned, falling back
// to unindexed search.
s.forceUnindexed = true
}
}
headMatches, err := s.searchInRange(headRange, !s.forceUnindexed)
headMatchRange, headMatches, err := s.searchInRange(headRange, !s.forceUnindexed)
if err != nil {
return err
}
s.matches = append(s.matches, headMatches...)
s.matchRange = s.matchRange.Union(headRange)
if s.forceUnindexed {
return s.syncMatcher(math.MaxUint64) // unindexed search is not affected by the tail of valid range
} else {
return s.syncMatcher(headRange.First()) // trim if the tail of latest head search results might be invalid
if headMatchRange.First() != s.matchRange.AfterLast() {
// improbable corner case, first part of new head range invalidated by tail unindexing
s.matches, s.matchRange = headMatches, headMatchRange
return nil
}
s.matches = append(s.matches, headMatches...)
s.matchRange = s.matchRange.Union(headMatchRange)
return nil
default:
panic("invalid search session state")
@ -349,7 +374,7 @@ func (s *searchSession) doSearchIteration() error {
func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) {
if f.rangeLogsTestHook != nil {
defer func() {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0}
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, common.Range[uint64]{}}
close(f.rangeLogsTestHook)
}()
}
@ -366,7 +391,17 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
}
for session.searchRange != session.matchRange {
if err := session.doSearchIteration(); err != nil {
return session.matches, err
return nil, err
}
if f.rangeLogsTestHook != nil {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestResults, session.matchRange}
}
mr := session.matchRange
if err := session.updateChainView(); err != nil {
return nil, err
}
if f.rangeLogsTestHook != nil && session.matchRange != mr {
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestReorg, session.matchRange}
}
}
return session.matches, nil
@ -382,7 +417,7 @@ func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend,
// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
func (f *Filter) unindexedLogs(ctx context.Context, chainView *filtermaps.ChainView, begin, end uint64) ([]*types.Log, error) {
start := time.Now()
log.Debug("Performing unindexed log search", "begin", begin, "end", end)
var matches []*types.Log
@ -392,9 +427,14 @@ func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types
return matches, ctx.Err()
default:
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber))
if header == nil || err != nil {
return matches, err
if blockNumber > chainView.HeadNumber() {
// check here so that we can return matches up until head along with
// the error
return matches, errInvalidBlockRange
}
header := chainView.Header(blockNumber)
if header == nil {
return matches, errors.New("header not found")
}
found, err := f.blockLogs(ctx, header)
if err != nil {

View file

@ -71,6 +71,7 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
CurrentView() *filtermaps.ChainView
NewMatcherBackend() filtermaps.MatcherBackend
}

View file

@ -154,6 +154,11 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
func (b *testBackend) CurrentView() *filtermaps.ChainView {
head := b.CurrentBlock()
return filtermaps.NewChainView(b, head.Number.Uint64(), head.Hash())
}
func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.fm.NewMatcherBackend()
}

View file

@ -453,7 +453,8 @@ func TestRangeLogs(t *testing.T) {
addresses = []common.Address{{}}
)
expEvent := func(exp rangeLogsTestEvent) {
expEvent := func(expEvent int, expFirst, expAfterLast uint64) {
exp := rangeLogsTestEvent{expEvent, common.NewRange[uint64](expFirst, expAfterLast-expFirst)}
event++
ev := <-filter.rangeLogsTestHook
if ev != exp {
@ -472,7 +473,6 @@ func TestRangeLogs(t *testing.T) {
for range filter.rangeLogsTestHook {
}
}(filter)
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
}
updateHead := func() {
@ -483,81 +483,122 @@ func TestRangeLogs(t *testing.T) {
// test case #1
newFilter(300, 500)
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400})
expEvent(rangeLogsTestIndexed, 401, 501)
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 401, 601)
expEvent(rangeLogsTestResults, 401, 501)
expEvent(rangeLogsTestUnindexed, 300, 401)
if _, err := bc.InsertChain(chain[600:700]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
expEvent(rangeLogsTestResults, 300, 501)
expEvent(rangeLogsTestDone, 0, 0)
// test case #2
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700})
expEvent(rangeLogsTestIndexed, 501, 701)
if _, err := bc.InsertChain(chain[700:800]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 698})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 698})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 698})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 699, 800})
if err := bc.SetHead(750); err != nil {
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 601, 699)
expEvent(rangeLogsTestResults, 601, 699)
expEvent(rangeLogsTestUnindexed, 400, 601)
expEvent(rangeLogsTestResults, 400, 699)
expEvent(rangeLogsTestIndexed, 699, 801)
if _, err := bc.SetCanonical(chain[749]); err != nil { // set head to block 750
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 748})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 749, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 601, 749)
expEvent(rangeLogsTestResults, 400, 749)
expEvent(rangeLogsTestIndexed, 749, 751)
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 751)
expEvent(rangeLogsTestResults, 400, 751)
expEvent(rangeLogsTestDone, 0, 0)
// test case #3
newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
if err := bc.SetHead(740); err != nil {
expEvent(rangeLogsTestIndexed, 750, 751)
if _, err := bc.SetCanonical(chain[739]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740})
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 739)
expEvent(rangeLogsTestResults, 0, 0)
expEvent(rangeLogsTestIndexed, 740, 741)
if _, err := bc.InsertChain(chain[740:750]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 739)
expEvent(rangeLogsTestResults, 0, 0)
expEvent(rangeLogsTestIndexed, 750, 751)
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 751)
expEvent(rangeLogsTestResults, 750, 751)
expEvent(rangeLogsTestDone, 0, 0)
// test case #4
if _, err := bc.SetCanonical(chain[499]); err != nil {
t.Fatal(err)
}
updateHead()
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550})
expEvent(rangeLogsTestIndexed, 400, 501)
if _, err := bc.InsertChain(chain[500:650]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 451, 499)
expEvent(rangeLogsTestResults, 451, 499)
expEvent(rangeLogsTestUnindexed, 400, 451)
expEvent(rangeLogsTestResults, 400, 499)
// indexed head extension seems possible
expEvent(rangeLogsTestIndexed, 499, 651)
// further head extension causes tail unindexing in searched range
if _, err := bc.InsertChain(chain[650:750]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 649)
// tail trimmed to 551; cannot merge with existing results
expEvent(rangeLogsTestResults, 551, 649)
expEvent(rangeLogsTestUnindexed, 400, 551)
expEvent(rangeLogsTestResults, 400, 649)
expEvent(rangeLogsTestIndexed, 649, 751)
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 751)
expEvent(rangeLogsTestResults, 400, 751)
expEvent(rangeLogsTestDone, 0, 0)
// test case #5
newFilter(400, int64(rpc.LatestBlockNumber))
expEvent(rangeLogsTestIndexed, 551, 751)
expEvent(rangeLogsTestSync, 0, 0)
expEvent(rangeLogsTestSynced, 551, 751)
expEvent(rangeLogsTestResults, 551, 751)
expEvent(rangeLogsTestUnindexed, 400, 551)
if _, err := bc.InsertChain(chain[750:1000]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750})
// indexed range affected by tail pruning so we have to discard the entire
// match set
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800})
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000})
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000})
expEvent(rangeLogsTestResults, 400, 751)
// indexed tail already beyond results head; revert to unindexed head search
expEvent(rangeLogsTestUnindexed, 751, 1001)
if _, err := bc.SetCanonical(chain[899]); err != nil {
t.Fatal(err)
}
updateHead()
expEvent(rangeLogsTestResults, 400, 1001)
expEvent(rangeLogsTestReorg, 400, 901)
expEvent(rangeLogsTestDone, 0, 0)
}

View file

@ -619,6 +619,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
panic("implement me")
}
func (b testBackend) CurrentView() *filtermaps.ChainView {
panic("implement me")
}
func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
panic("implement me")
}

View file

@ -95,6 +95,7 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
CurrentView() *filtermaps.ChainView
NewMatcherBackend() filtermaps.MatcherBackend
}

View file

@ -401,6 +401,7 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b *backendMock) Engine() consensus.Engine { return nil }
func (b *backendMock) CurrentView() *filtermaps.ChainView { return nil }
func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil }
func (b *backendMock) HistoryPruningCutoff() uint64 { return 0 }