diff --git a/eth/filters/api.go b/eth/filters/api.go index b607bdcef1..2edb064f38 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -164,18 +164,21 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { for { select { case pTx := <-pendingTxs: + var drop bool api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.txs = append(f.txs, pTx...) - // Cap the queue: drop oldest items when the limit is exceeded + // Cap the queue: uninstall filter when the limit is exceeded // to prevent unbounded memory growth if the client polls infrequently. if max := api.maxPendingItems; max > 0 && len(f.txs) > max { - log.Debug("Pending tx filter queue overflow, dropping oldest items", - "id", pendingTxSub.ID, "dropped", len(f.txs)-max) - f.txs = f.txs[len(f.txs)-max:] + drop = true } } api.filtersMu.Unlock() + if drop { + log.Debug("Pending tx filter queue overflow, uninstalling filter", "id", pendingTxSub.ID) + api.UninstallFilter(pendingTxSub.ID) + } case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) @@ -246,17 +249,20 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID { for { select { case h := <-headers: + var drop bool api.filtersMu.Lock() if f, found := api.filters[headerSub.ID]; found { f.hashes = append(f.hashes, h.Hash()) - // Cap the queue: drop oldest block hashes when the limit is exceeded. + // Cap the queue: uninstall filter when the limit is exceeded. if max := api.maxPendingItems; max > 0 && len(f.hashes) > max { - log.Debug("Block filter queue overflow, dropping oldest items", - "id", headerSub.ID, "dropped", len(f.hashes)-max) - f.hashes = f.hashes[len(f.hashes)-max:] + drop = true } } api.filtersMu.Unlock() + if drop { + log.Debug("Block filter queue overflow, uninstalling filter", "id", headerSub.ID) + api.UninstallFilter(headerSub.ID) + } case <-headerSub.Err(): api.filtersMu.Lock() delete(api.filters, headerSub.ID) @@ -438,18 +444,21 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { for { select { case l := <-logs: + var drop bool api.filtersMu.Lock() if f, found := api.filters[logsSub.ID]; found { f.logs = append(f.logs, l...) - // Cap the queue: drop oldest logs when the limit is exceeded + // Cap the queue: uninstall filter when the limit is exceeded // to prevent unbounded memory growth if the client polls infrequently. if max := api.maxPendingItems; max > 0 && len(f.logs) > max { - log.Debug("Log filter queue overflow, dropping oldest items", - "id", logsSub.ID, "dropped", len(f.logs)-max) - f.logs = f.logs[len(f.logs)-max:] + drop = true } } api.filtersMu.Unlock() + if drop { + log.Debug("Log filter queue overflow, uninstalling filter", "id", logsSub.ID) + api.UninstallFilter(logsSub.ID) + } case <-logsSub.Err(): api.filtersMu.Lock() delete(api.filters, logsSub.ID) diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index e6bd17095c..510a26ff32 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -924,7 +924,7 @@ func TestTransactionReceiptsSubscription(t *testing.T) { } // TestPendingTxFilterQueueCap verifies that the pending transaction polling filter -// drops the oldest transactions when the queue exceeds MaxPendingItems, preventing +// uninstalls the filter when the queue exceeds MaxPendingItems, preventing // unbounded memory growth when the client polls infrequently. func TestPendingTxFilterQueueCap(t *testing.T) { t.Parallel() @@ -953,33 +953,27 @@ func TestPendingTxFilterQueueCap(t *testing.T) { backend.txFeed.Send(core.NewTxsEvent{Txs: batch1}) backend.txFeed.Send(core.NewTxsEvent{Txs: batch2}) - // Poll until we get some results or timeout - var collected []common.Hash + // Poll until we get an error or timeout timeout := time.Now().Add(2 * time.Second) + var gotErr error for time.Now().Before(timeout) { - results, err := api.GetFilterChanges(fid) + _, err := api.GetFilterChanges(fid) if err != nil { - t.Fatalf("GetFilterChanges error: %v", err) - } - collected = append(collected, results.([]common.Hash)...) - if len(collected) > 0 { + gotErr = err break } time.Sleep(10 * time.Millisecond) } - // The filter must have held at most maxItems transactions at any point. - // Since we drain in one poll, collected length should be <= maxItems. - if len(collected) > maxItems { - t.Errorf("expected at most %d items in capped filter queue, got %d", maxItems, len(collected)) - } - if len(collected) == 0 { - t.Error("expected at least some transactions to be received") + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) } } -// TestBlockFilterQueueCap verifies that the block polling filter drops the oldest -// block hashes when the queue exceeds MaxPendingItems. +// TestBlockFilterQueueCap verifies that the block polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems. func TestBlockFilterQueueCap(t *testing.T) { t.Parallel() @@ -1004,31 +998,27 @@ func TestBlockFilterQueueCap(t *testing.T) { backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()}) } - // Poll until we get some results or timeout - var collected []common.Hash + // Poll until we get an error or timeout timeout := time.Now().Add(2 * time.Second) + var gotErr error for time.Now().Before(timeout) { - results, err := api.GetFilterChanges(fid) + _, err := api.GetFilterChanges(fid) if err != nil { - t.Fatalf("GetFilterChanges error: %v", err) - } - collected = append(collected, results.([]common.Hash)...) - if len(collected) > 0 { + gotErr = err break } time.Sleep(10 * time.Millisecond) } - if len(collected) > maxItems { - t.Errorf("expected at most %d block hashes in capped filter queue, got %d", maxItems, len(collected)) - } - if len(collected) == 0 { - t.Error("expected at least some block hashes to be received") + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) } } -// TestLogFilterQueueCap verifies that the log polling filter drops the oldest logs -// when the queue exceeds MaxPendingItems. +// TestLogFilterQueueCap verifies that the log polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems. func TestLogFilterQueueCap(t *testing.T) { t.Parallel() @@ -1061,32 +1051,21 @@ func TestLogFilterQueueCap(t *testing.T) { time.Sleep(50 * time.Millisecond) // give subscription time to install backend.logsFeed.Send(allLogs) - // Poll until we get some results or timeout - var collected []*types.Log + // Poll until we get an error or timeout timeout := time.Now().Add(2 * time.Second) + var gotErr error for time.Now().Before(timeout) { - results, err := api.GetFilterChanges(fid) + _, err := api.GetFilterChanges(fid) if err != nil { - t.Fatalf("GetFilterChanges error: %v", err) - } - collected = append(collected, results.([]*types.Log)...) - if len(collected) > 0 { + gotErr = err break } time.Sleep(10 * time.Millisecond) } - if len(collected) > maxItems { - t.Errorf("expected at most %d logs in capped filter queue, got %d", maxItems, len(collected)) - } - if len(collected) == 0 { - t.Error("expected at least some logs to be received") - } - // The logs we receive must be the NEWEST (highest BlockNumber), - // because we drop the oldest. - for _, l := range collected { - if l.BlockNumber <= uint64(len(allLogs)-maxItems) { - t.Errorf("received dropped log with BlockNumber %d (expected only blocks > %d)", l.BlockNumber, len(allLogs)-maxItems) - } + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) } }