From 5569759238f8cb712de63e59d27e52036e32e88f Mon Sep 17 00:00:00 2001 From: locoholy Date: Tue, 24 Feb 2026 11:49:39 +0500 Subject: [PATCH 1/2] eth/filters: cap polling filter queues to prevent unbounded memory growth Polling filters (eth_newFilter, eth_newBlockFilter, eth_newPendingTransactionFilter) buffer incoming events between GetFilterChanges calls. If a client polls infrequently the internal slices (f.logs, f.hashes, f.txs) grow without bound, potentially consuming hundreds of MB of RAM under high event traffic. Add MaxPendingItems to Config (default 10 000). When a filter's queue exceeds this limit the oldest items are dropped to make room for new ones, keeping memory bounded at the cost of silently losing stale data. The drop-oldest policy is appropriate here because: - Events that exceed the cap are already "stale" relative to chain head - Clients that care about every event should use eth_subscribe instead of polling (which is the recommended pattern) Add a debug log line when items are dropped so operators can detect clients that poll too infrequently in their logs. Three new tests verify the cap behaviour for each filter type: TestPendingTxFilterQueueCap TestBlockFilterQueueCap TestLogFilterQueueCap --- eth/filters/api.go | 49 ++++++--- eth/filters/filter_system.go | 14 ++- eth/filters/filter_system_test.go | 168 ++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+), 17 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index f4bed35b26..b607bdcef1 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/history" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -83,24 +84,26 @@ type filter struct { // FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such as blocks, transactions and logs. type FilterAPI struct { - sys *FilterSystem - events *EventSystem - filtersMu sync.Mutex - filters map[rpc.ID]*filter - timeout time.Duration - logQueryLimit int - rangeLimit uint64 + sys *FilterSystem + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter + timeout time.Duration + logQueryLimit int + rangeLimit uint64 + maxPendingItems int // max buffered items per polling filter; oldest are dropped when exceeded } // NewFilterAPI returns a new FilterAPI instance. func NewFilterAPI(system *FilterSystem) *FilterAPI { api := &FilterAPI{ - sys: system, - events: NewEventSystem(system), - filters: make(map[rpc.ID]*filter), - timeout: system.cfg.Timeout, - logQueryLimit: system.cfg.LogQueryLimit, - rangeLimit: system.cfg.RangeLimit, + sys: system, + events: NewEventSystem(system), + filters: make(map[rpc.ID]*filter), + timeout: system.cfg.Timeout, + logQueryLimit: system.cfg.LogQueryLimit, + rangeLimit: system.cfg.RangeLimit, + maxPendingItems: system.cfg.MaxPendingItems, } go api.timeoutLoop(system.cfg.Timeout) @@ -164,6 +167,13 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.txs = append(f.txs, pTx...) + // Cap the queue: drop oldest items when the limit is exceeded + // to prevent unbounded memory growth if the client polls infrequently. + if max := api.maxPendingItems; max > 0 && len(f.txs) > max { + log.Debug("Pending tx filter queue overflow, dropping oldest items", + "id", pendingTxSub.ID, "dropped", len(f.txs)-max) + f.txs = f.txs[len(f.txs)-max:] + } } api.filtersMu.Unlock() case <-pendingTxSub.Err(): @@ -239,6 +249,12 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID { api.filtersMu.Lock() if f, found := api.filters[headerSub.ID]; found { f.hashes = append(f.hashes, h.Hash()) + // Cap the queue: drop oldest block hashes when the limit is exceeded. + if max := api.maxPendingItems; max > 0 && len(f.hashes) > max { + log.Debug("Block filter queue overflow, dropping oldest items", + "id", headerSub.ID, "dropped", len(f.hashes)-max) + f.hashes = f.hashes[len(f.hashes)-max:] + } } api.filtersMu.Unlock() case <-headerSub.Err(): @@ -425,6 +441,13 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { api.filtersMu.Lock() if f, found := api.filters[logsSub.ID]; found { f.logs = append(f.logs, l...) + // Cap the queue: drop oldest logs when the limit is exceeded + // to prevent unbounded memory growth if the client polls infrequently. + if max := api.maxPendingItems; max > 0 && len(f.logs) > max { + log.Debug("Log filter queue overflow, dropping oldest items", + "id", logsSub.ID, "dropped", len(f.logs)-max) + f.logs = f.logs[len(f.logs)-max:] + } } api.filtersMu.Unlock() case <-logsSub.Err(): diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 1f92c4e36f..d0e0a51045 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -41,10 +41,13 @@ import ( // Config represents the configuration of the filter system. type Config struct { - LogCacheSize int // maximum number of cached blocks (default: 32) - Timeout time.Duration // how long filters stay active (default: 5min) - LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000) - RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0) + LogCacheSize int // maximum number of cached blocks (default: 32) + Timeout time.Duration // how long filters stay active (default: 5min) + LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000) + RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0) + MaxPendingItems int // maximum number of items buffered in a polling filter queue (default: 10000) + // When the limit is reached, the oldest items are dropped to make room for new ones. + // This prevents unbounded memory growth when clients poll eth_getFilterChanges infrequently. } func (cfg Config) withDefaults() Config { @@ -54,6 +57,9 @@ func (cfg Config) withDefaults() Config { if cfg.LogCacheSize == 0 { cfg.LogCacheSize = 32 } + if cfg.MaxPendingItems == 0 { + cfg.MaxPendingItems = 10000 + } return cfg } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 6f97d5b664..e6bd17095c 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -922,3 +922,171 @@ func TestTransactionReceiptsSubscription(t *testing.T) { }) } } + +// TestPendingTxFilterQueueCap verifies that the pending transaction polling filter +// drops the oldest transactions when the queue exceeds MaxPendingItems, preventing +// unbounded memory growth when the client polls infrequently. +func TestPendingTxFilterQueueCap(t *testing.T) { + t.Parallel() + + const maxItems = 5 + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems}) + api = NewFilterAPI(sys) + ) + + fid := api.NewPendingTransactionFilter(nil) + + // Send 10 transactions in two batches — more than MaxPendingItems + batch1 := make([]*types.Transaction, 8) + for i := range batch1 { + batch1[i] = types.NewTransaction(uint64(i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + } + batch2 := make([]*types.Transaction, 2) + for i := range batch2 { + batch2[i] = types.NewTransaction(uint64(len(batch1)+i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + } + + time.Sleep(50 * time.Millisecond) // give subscription time to install + backend.txFeed.Send(core.NewTxsEvent{Txs: batch1}) + backend.txFeed.Send(core.NewTxsEvent{Txs: batch2}) + + // Poll until we get some results or timeout + var collected []common.Hash + timeout := time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + results, err := api.GetFilterChanges(fid) + if err != nil { + t.Fatalf("GetFilterChanges error: %v", err) + } + collected = append(collected, results.([]common.Hash)...) + if len(collected) > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + // The filter must have held at most maxItems transactions at any point. + // Since we drain in one poll, collected length should be <= maxItems. + if len(collected) > maxItems { + t.Errorf("expected at most %d items in capped filter queue, got %d", maxItems, len(collected)) + } + if len(collected) == 0 { + t.Error("expected at least some transactions to be received") + } +} + +// TestBlockFilterQueueCap verifies that the block polling filter drops the oldest +// block hashes when the queue exceeds MaxPendingItems. +func TestBlockFilterQueueCap(t *testing.T) { + t.Parallel() + + const maxItems = 3 + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems}) + api = NewFilterAPI(sys) + genesis = &core.Genesis{ + Config: params.TestChainConfig, + BaseFee: big.NewInt(params.InitialBaseFee), + } + _, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 7, func(i int, gen *core.BlockGen) {}) + ) + + fid := api.NewBlockFilter() + time.Sleep(50 * time.Millisecond) // give subscription time to install + + // Send 7 block headers — more than maxItems + for _, blk := range chain { + backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()}) + } + + // Poll until we get some results or timeout + var collected []common.Hash + timeout := time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + results, err := api.GetFilterChanges(fid) + if err != nil { + t.Fatalf("GetFilterChanges error: %v", err) + } + collected = append(collected, results.([]common.Hash)...) + if len(collected) > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if len(collected) > maxItems { + t.Errorf("expected at most %d block hashes in capped filter queue, got %d", maxItems, len(collected)) + } + if len(collected) == 0 { + t.Error("expected at least some block hashes to be received") + } +} + +// TestLogFilterQueueCap verifies that the log polling filter drops the oldest logs +// when the queue exceeds MaxPendingItems. +func TestLogFilterQueueCap(t *testing.T) { + t.Parallel() + + const maxItems = 3 + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems}) + api = NewFilterAPI(sys) + + addr = common.HexToAddress("0x1111111111111111111111111111111111111111") + + // 7 logs — more than maxItems + allLogs = []*types.Log{ + {Address: addr, BlockNumber: 1}, + {Address: addr, BlockNumber: 2}, + {Address: addr, BlockNumber: 3}, + {Address: addr, BlockNumber: 4}, + {Address: addr, BlockNumber: 5}, + {Address: addr, BlockNumber: 6}, + {Address: addr, BlockNumber: 7}, + } + ) + + fid, err := api.NewFilter(FilterCriteria{}) + if err != nil { + t.Fatalf("NewFilter error: %v", err) + } + + time.Sleep(50 * time.Millisecond) // give subscription time to install + backend.logsFeed.Send(allLogs) + + // Poll until we get some results or timeout + var collected []*types.Log + timeout := time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + results, err := api.GetFilterChanges(fid) + if err != nil { + t.Fatalf("GetFilterChanges error: %v", err) + } + collected = append(collected, results.([]*types.Log)...) + if len(collected) > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if len(collected) > maxItems { + t.Errorf("expected at most %d logs in capped filter queue, got %d", maxItems, len(collected)) + } + if len(collected) == 0 { + t.Error("expected at least some logs to be received") + } + // The logs we receive must be the NEWEST (highest BlockNumber), + // because we drop the oldest. + for _, l := range collected { + if l.BlockNumber <= uint64(len(allLogs)-maxItems) { + t.Errorf("received dropped log with BlockNumber %d (expected only blocks > %d)", l.BlockNumber, len(allLogs)-maxItems) + } + } +} From a2b14e4d286bd56bfebe81437425fe4f057ee002 Mon Sep 17 00:00:00 2001 From: locoholy Date: Thu, 26 Feb 2026 14:44:47 +0500 Subject: [PATCH 2/2] eth/filters: uninstall filter instead of dropping oldest items when queue exceeds capacity --- eth/filters/api.go | 33 ++++++++----- eth/filters/filter_system_test.go | 79 ++++++++++++------------------- 2 files changed, 50 insertions(+), 62 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index b607bdcef1..2edb064f38 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -164,18 +164,21 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { for { select { case pTx := <-pendingTxs: + var drop bool api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.txs = append(f.txs, pTx...) - // Cap the queue: drop oldest items when the limit is exceeded + // Cap the queue: uninstall filter when the limit is exceeded // to prevent unbounded memory growth if the client polls infrequently. if max := api.maxPendingItems; max > 0 && len(f.txs) > max { - log.Debug("Pending tx filter queue overflow, dropping oldest items", - "id", pendingTxSub.ID, "dropped", len(f.txs)-max) - f.txs = f.txs[len(f.txs)-max:] + drop = true } } api.filtersMu.Unlock() + if drop { + log.Debug("Pending tx filter queue overflow, uninstalling filter", "id", pendingTxSub.ID) + api.UninstallFilter(pendingTxSub.ID) + } case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) @@ -246,17 +249,20 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID { for { select { case h := <-headers: + var drop bool api.filtersMu.Lock() if f, found := api.filters[headerSub.ID]; found { f.hashes = append(f.hashes, h.Hash()) - // Cap the queue: drop oldest block hashes when the limit is exceeded. + // Cap the queue: uninstall filter when the limit is exceeded. if max := api.maxPendingItems; max > 0 && len(f.hashes) > max { - log.Debug("Block filter queue overflow, dropping oldest items", - "id", headerSub.ID, "dropped", len(f.hashes)-max) - f.hashes = f.hashes[len(f.hashes)-max:] + drop = true } } api.filtersMu.Unlock() + if drop { + log.Debug("Block filter queue overflow, uninstalling filter", "id", headerSub.ID) + api.UninstallFilter(headerSub.ID) + } case <-headerSub.Err(): api.filtersMu.Lock() delete(api.filters, headerSub.ID) @@ -438,18 +444,21 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { for { select { case l := <-logs: + var drop bool api.filtersMu.Lock() if f, found := api.filters[logsSub.ID]; found { f.logs = append(f.logs, l...) - // Cap the queue: drop oldest logs when the limit is exceeded + // Cap the queue: uninstall filter when the limit is exceeded // to prevent unbounded memory growth if the client polls infrequently. if max := api.maxPendingItems; max > 0 && len(f.logs) > max { - log.Debug("Log filter queue overflow, dropping oldest items", - "id", logsSub.ID, "dropped", len(f.logs)-max) - f.logs = f.logs[len(f.logs)-max:] + drop = true } } api.filtersMu.Unlock() + if drop { + log.Debug("Log filter queue overflow, uninstalling filter", "id", logsSub.ID) + api.UninstallFilter(logsSub.ID) + } case <-logsSub.Err(): api.filtersMu.Lock() delete(api.filters, logsSub.ID) diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index e6bd17095c..510a26ff32 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -924,7 +924,7 @@ func TestTransactionReceiptsSubscription(t *testing.T) { } // TestPendingTxFilterQueueCap verifies that the pending transaction polling filter -// drops the oldest transactions when the queue exceeds MaxPendingItems, preventing +// uninstalls the filter when the queue exceeds MaxPendingItems, preventing // unbounded memory growth when the client polls infrequently. func TestPendingTxFilterQueueCap(t *testing.T) { t.Parallel() @@ -953,33 +953,27 @@ func TestPendingTxFilterQueueCap(t *testing.T) { backend.txFeed.Send(core.NewTxsEvent{Txs: batch1}) backend.txFeed.Send(core.NewTxsEvent{Txs: batch2}) - // Poll until we get some results or timeout - var collected []common.Hash + // Poll until we get an error or timeout timeout := time.Now().Add(2 * time.Second) + var gotErr error for time.Now().Before(timeout) { - results, err := api.GetFilterChanges(fid) + _, err := api.GetFilterChanges(fid) if err != nil { - t.Fatalf("GetFilterChanges error: %v", err) - } - collected = append(collected, results.([]common.Hash)...) - if len(collected) > 0 { + gotErr = err break } time.Sleep(10 * time.Millisecond) } - // The filter must have held at most maxItems transactions at any point. - // Since we drain in one poll, collected length should be <= maxItems. - if len(collected) > maxItems { - t.Errorf("expected at most %d items in capped filter queue, got %d", maxItems, len(collected)) - } - if len(collected) == 0 { - t.Error("expected at least some transactions to be received") + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) } } -// TestBlockFilterQueueCap verifies that the block polling filter drops the oldest -// block hashes when the queue exceeds MaxPendingItems. +// TestBlockFilterQueueCap verifies that the block polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems. func TestBlockFilterQueueCap(t *testing.T) { t.Parallel() @@ -1004,31 +998,27 @@ func TestBlockFilterQueueCap(t *testing.T) { backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()}) } - // Poll until we get some results or timeout - var collected []common.Hash + // Poll until we get an error or timeout timeout := time.Now().Add(2 * time.Second) + var gotErr error for time.Now().Before(timeout) { - results, err := api.GetFilterChanges(fid) + _, err := api.GetFilterChanges(fid) if err != nil { - t.Fatalf("GetFilterChanges error: %v", err) - } - collected = append(collected, results.([]common.Hash)...) - if len(collected) > 0 { + gotErr = err break } time.Sleep(10 * time.Millisecond) } - if len(collected) > maxItems { - t.Errorf("expected at most %d block hashes in capped filter queue, got %d", maxItems, len(collected)) - } - if len(collected) == 0 { - t.Error("expected at least some block hashes to be received") + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) } } -// TestLogFilterQueueCap verifies that the log polling filter drops the oldest logs -// when the queue exceeds MaxPendingItems. +// TestLogFilterQueueCap verifies that the log polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems. func TestLogFilterQueueCap(t *testing.T) { t.Parallel() @@ -1061,32 +1051,21 @@ func TestLogFilterQueueCap(t *testing.T) { time.Sleep(50 * time.Millisecond) // give subscription time to install backend.logsFeed.Send(allLogs) - // Poll until we get some results or timeout - var collected []*types.Log + // Poll until we get an error or timeout timeout := time.Now().Add(2 * time.Second) + var gotErr error for time.Now().Before(timeout) { - results, err := api.GetFilterChanges(fid) + _, err := api.GetFilterChanges(fid) if err != nil { - t.Fatalf("GetFilterChanges error: %v", err) - } - collected = append(collected, results.([]*types.Log)...) - if len(collected) > 0 { + gotErr = err break } time.Sleep(10 * time.Millisecond) } - if len(collected) > maxItems { - t.Errorf("expected at most %d logs in capped filter queue, got %d", maxItems, len(collected)) - } - if len(collected) == 0 { - t.Error("expected at least some logs to be received") - } - // The logs we receive must be the NEWEST (highest BlockNumber), - // because we drop the oldest. - for _, l := range collected { - if l.BlockNumber <= uint64(len(allLogs)-maxItems) { - t.Errorf("received dropped log with BlockNumber %d (expected only blocks > %d)", l.BlockNumber, len(allLogs)-maxItems) - } + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) } }