eth/filters: uninstall filter instead of dropping oldest items when queue exceeds capacity

This commit is contained in:
locoholy 2026-02-26 14:44:47 +05:00
parent 5569759238
commit a2b14e4d28
2 changed files with 50 additions and 62 deletions

View file

@ -164,18 +164,21 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
for {
select {
case pTx := <-pendingTxs:
var drop bool
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.txs = append(f.txs, pTx...)
// Cap the queue: drop oldest items when the limit is exceeded
// Cap the queue: uninstall filter when the limit is exceeded
// to prevent unbounded memory growth if the client polls infrequently.
if max := api.maxPendingItems; max > 0 && len(f.txs) > max {
log.Debug("Pending tx filter queue overflow, dropping oldest items",
"id", pendingTxSub.ID, "dropped", len(f.txs)-max)
f.txs = f.txs[len(f.txs)-max:]
drop = true
}
}
api.filtersMu.Unlock()
if drop {
log.Debug("Pending tx filter queue overflow, uninstalling filter", "id", pendingTxSub.ID)
api.UninstallFilter(pendingTxSub.ID)
}
case <-pendingTxSub.Err():
api.filtersMu.Lock()
delete(api.filters, pendingTxSub.ID)
@ -246,17 +249,20 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID {
for {
select {
case h := <-headers:
var drop bool
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
// Cap the queue: drop oldest block hashes when the limit is exceeded.
// Cap the queue: uninstall filter when the limit is exceeded.
if max := api.maxPendingItems; max > 0 && len(f.hashes) > max {
log.Debug("Block filter queue overflow, dropping oldest items",
"id", headerSub.ID, "dropped", len(f.hashes)-max)
f.hashes = f.hashes[len(f.hashes)-max:]
drop = true
}
}
api.filtersMu.Unlock()
if drop {
log.Debug("Block filter queue overflow, uninstalling filter", "id", headerSub.ID)
api.UninstallFilter(headerSub.ID)
}
case <-headerSub.Err():
api.filtersMu.Lock()
delete(api.filters, headerSub.ID)
@ -438,18 +444,21 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
for {
select {
case l := <-logs:
var drop bool
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
// Cap the queue: drop oldest logs when the limit is exceeded
// Cap the queue: uninstall filter when the limit is exceeded
// to prevent unbounded memory growth if the client polls infrequently.
if max := api.maxPendingItems; max > 0 && len(f.logs) > max {
log.Debug("Log filter queue overflow, dropping oldest items",
"id", logsSub.ID, "dropped", len(f.logs)-max)
f.logs = f.logs[len(f.logs)-max:]
drop = true
}
}
api.filtersMu.Unlock()
if drop {
log.Debug("Log filter queue overflow, uninstalling filter", "id", logsSub.ID)
api.UninstallFilter(logsSub.ID)
}
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, logsSub.ID)

View file

@ -924,7 +924,7 @@ func TestTransactionReceiptsSubscription(t *testing.T) {
}
// TestPendingTxFilterQueueCap verifies that the pending transaction polling filter
// drops the oldest transactions when the queue exceeds MaxPendingItems, preventing
// uninstalls the filter when the queue exceeds MaxPendingItems, preventing
// unbounded memory growth when the client polls infrequently.
func TestPendingTxFilterQueueCap(t *testing.T) {
t.Parallel()
@ -953,33 +953,27 @@ func TestPendingTxFilterQueueCap(t *testing.T) {
backend.txFeed.Send(core.NewTxsEvent{Txs: batch1})
backend.txFeed.Send(core.NewTxsEvent{Txs: batch2})
// Poll until we get some results or timeout
var collected []common.Hash
// Poll until we get an error or timeout
timeout := time.Now().Add(2 * time.Second)
var gotErr error
for time.Now().Before(timeout) {
results, err := api.GetFilterChanges(fid)
_, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("GetFilterChanges error: %v", err)
}
collected = append(collected, results.([]common.Hash)...)
if len(collected) > 0 {
gotErr = err
break
}
time.Sleep(10 * time.Millisecond)
}
// The filter must have held at most maxItems transactions at any point.
// Since we drain in one poll, collected length should be <= maxItems.
if len(collected) > maxItems {
t.Errorf("expected at most %d items in capped filter queue, got %d", maxItems, len(collected))
}
if len(collected) == 0 {
t.Error("expected at least some transactions to be received")
if gotErr == nil {
t.Error("expected filter to be uninstalled and return an error, but got none")
} else if !errors.Is(gotErr, errFilterNotFound) {
t.Errorf("expected errFilterNotFound, got %v", gotErr)
}
}
// TestBlockFilterQueueCap verifies that the block polling filter drops the oldest
// block hashes when the queue exceeds MaxPendingItems.
// TestBlockFilterQueueCap verifies that the block polling filter
// uninstalls the filter when the queue exceeds MaxPendingItems.
func TestBlockFilterQueueCap(t *testing.T) {
t.Parallel()
@ -1004,31 +998,27 @@ func TestBlockFilterQueueCap(t *testing.T) {
backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()})
}
// Poll until we get some results or timeout
var collected []common.Hash
// Poll until we get an error or timeout
timeout := time.Now().Add(2 * time.Second)
var gotErr error
for time.Now().Before(timeout) {
results, err := api.GetFilterChanges(fid)
_, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("GetFilterChanges error: %v", err)
}
collected = append(collected, results.([]common.Hash)...)
if len(collected) > 0 {
gotErr = err
break
}
time.Sleep(10 * time.Millisecond)
}
if len(collected) > maxItems {
t.Errorf("expected at most %d block hashes in capped filter queue, got %d", maxItems, len(collected))
}
if len(collected) == 0 {
t.Error("expected at least some block hashes to be received")
if gotErr == nil {
t.Error("expected filter to be uninstalled and return an error, but got none")
} else if !errors.Is(gotErr, errFilterNotFound) {
t.Errorf("expected errFilterNotFound, got %v", gotErr)
}
}
// TestLogFilterQueueCap verifies that the log polling filter drops the oldest logs
// when the queue exceeds MaxPendingItems.
// TestLogFilterQueueCap verifies that the log polling filter
// uninstalls the filter when the queue exceeds MaxPendingItems.
func TestLogFilterQueueCap(t *testing.T) {
t.Parallel()
@ -1061,32 +1051,21 @@ func TestLogFilterQueueCap(t *testing.T) {
time.Sleep(50 * time.Millisecond) // give subscription time to install
backend.logsFeed.Send(allLogs)
// Poll until we get some results or timeout
var collected []*types.Log
// Poll until we get an error or timeout
timeout := time.Now().Add(2 * time.Second)
var gotErr error
for time.Now().Before(timeout) {
results, err := api.GetFilterChanges(fid)
_, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("GetFilterChanges error: %v", err)
}
collected = append(collected, results.([]*types.Log)...)
if len(collected) > 0 {
gotErr = err
break
}
time.Sleep(10 * time.Millisecond)
}
if len(collected) > maxItems {
t.Errorf("expected at most %d logs in capped filter queue, got %d", maxItems, len(collected))
}
if len(collected) == 0 {
t.Error("expected at least some logs to be received")
}
// The logs we receive must be the NEWEST (highest BlockNumber),
// because we drop the oldest.
for _, l := range collected {
if l.BlockNumber <= uint64(len(allLogs)-maxItems) {
t.Errorf("received dropped log with BlockNumber %d (expected only blocks > %d)", l.BlockNumber, len(allLogs)-maxItems)
}
if gotErr == nil {
t.Error("expected filter to be uninstalled and return an error, but got none")
} else if !errors.Is(gotErr, errFilterNotFound) {
t.Errorf("expected errFilterNotFound, got %v", gotErr)
}
}