eth/filters: cap polling filter queues to prevent unbounded memory growth

Polling filters (eth_newFilter, eth_newBlockFilter,
eth_newPendingTransactionFilter) buffer incoming events between
GetFilterChanges calls. If a client polls infrequently the internal
slices (f.logs, f.hashes, f.txs) grow without bound, potentially
consuming hundreds of MB of RAM under high event traffic.

Add MaxPendingItems to Config (default 10 000). When a filter's queue
exceeds this limit the oldest items are dropped to make room for new
ones, keeping memory bounded at the cost of silently losing stale data.

The drop-oldest policy is appropriate here because:
- Events that exceed the cap are already "stale" relative to chain head
- Clients that care about every event should use eth_subscribe instead
  of polling (which is the recommended pattern)

Add a debug log line when items are dropped so operators can detect
clients that poll too infrequently in their logs.

Three new tests verify the cap behaviour for each filter type:
  TestPendingTxFilterQueueCap
  TestBlockFilterQueueCap
  TestLogFilterQueueCap
This commit is contained in:
locoholy 2026-02-24 11:49:39 +05:00
parent 82fad31540
commit 5569759238
3 changed files with 214 additions and 17 deletions

View file

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
@ -83,24 +84,26 @@ type filter struct {
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such as blocks, transactions and logs.
type FilterAPI struct {
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
logQueryLimit int
rangeLimit uint64
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
logQueryLimit int
rangeLimit uint64
maxPendingItems int // max buffered items per polling filter; oldest are dropped when exceeded
}
// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(system *FilterSystem) *FilterAPI {
api := &FilterAPI{
sys: system,
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
logQueryLimit: system.cfg.LogQueryLimit,
rangeLimit: system.cfg.RangeLimit,
sys: system,
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
logQueryLimit: system.cfg.LogQueryLimit,
rangeLimit: system.cfg.RangeLimit,
maxPendingItems: system.cfg.MaxPendingItems,
}
go api.timeoutLoop(system.cfg.Timeout)
@ -164,6 +167,13 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.txs = append(f.txs, pTx...)
// Cap the queue: drop oldest items when the limit is exceeded
// to prevent unbounded memory growth if the client polls infrequently.
if max := api.maxPendingItems; max > 0 && len(f.txs) > max {
log.Debug("Pending tx filter queue overflow, dropping oldest items",
"id", pendingTxSub.ID, "dropped", len(f.txs)-max)
f.txs = f.txs[len(f.txs)-max:]
}
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
@ -239,6 +249,12 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID {
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
// Cap the queue: drop oldest block hashes when the limit is exceeded.
if max := api.maxPendingItems; max > 0 && len(f.hashes) > max {
log.Debug("Block filter queue overflow, dropping oldest items",
"id", headerSub.ID, "dropped", len(f.hashes)-max)
f.hashes = f.hashes[len(f.hashes)-max:]
}
}
api.filtersMu.Unlock()
case <-headerSub.Err():
@ -425,6 +441,13 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
// Cap the queue: drop oldest logs when the limit is exceeded
// to prevent unbounded memory growth if the client polls infrequently.
if max := api.maxPendingItems; max > 0 && len(f.logs) > max {
log.Debug("Log filter queue overflow, dropping oldest items",
"id", logsSub.ID, "dropped", len(f.logs)-max)
f.logs = f.logs[len(f.logs)-max:]
}
}
api.filtersMu.Unlock()
case <-logsSub.Err():

View file

@ -41,10 +41,13 @@ import (
// Config represents the configuration of the filter system.
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000)
RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0)
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000)
RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0)
MaxPendingItems int // maximum number of items buffered in a polling filter queue (default: 10000)
// When the limit is reached, the oldest items are dropped to make room for new ones.
// This prevents unbounded memory growth when clients poll eth_getFilterChanges infrequently.
}
func (cfg Config) withDefaults() Config {
@ -54,6 +57,9 @@ func (cfg Config) withDefaults() Config {
if cfg.LogCacheSize == 0 {
cfg.LogCacheSize = 32
}
if cfg.MaxPendingItems == 0 {
cfg.MaxPendingItems = 10000
}
return cfg
}

View file

@ -922,3 +922,171 @@ func TestTransactionReceiptsSubscription(t *testing.T) {
})
}
}
// TestPendingTxFilterQueueCap verifies that the pending transaction polling filter
// drops the oldest transactions when the queue exceeds MaxPendingItems, preventing
// unbounded memory growth when the client polls infrequently.
func TestPendingTxFilterQueueCap(t *testing.T) {
t.Parallel()
const maxItems = 5
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems})
api = NewFilterAPI(sys)
)
fid := api.NewPendingTransactionFilter(nil)
// Send 10 transactions in two batches — more than MaxPendingItems
batch1 := make([]*types.Transaction, 8)
for i := range batch1 {
batch1[i] = types.NewTransaction(uint64(i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
}
batch2 := make([]*types.Transaction, 2)
for i := range batch2 {
batch2[i] = types.NewTransaction(uint64(len(batch1)+i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
}
time.Sleep(50 * time.Millisecond) // give subscription time to install
backend.txFeed.Send(core.NewTxsEvent{Txs: batch1})
backend.txFeed.Send(core.NewTxsEvent{Txs: batch2})
// Poll until we get some results or timeout
var collected []common.Hash
timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
results, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("GetFilterChanges error: %v", err)
}
collected = append(collected, results.([]common.Hash)...)
if len(collected) > 0 {
break
}
time.Sleep(10 * time.Millisecond)
}
// The filter must have held at most maxItems transactions at any point.
// Since we drain in one poll, collected length should be <= maxItems.
if len(collected) > maxItems {
t.Errorf("expected at most %d items in capped filter queue, got %d", maxItems, len(collected))
}
if len(collected) == 0 {
t.Error("expected at least some transactions to be received")
}
}
// TestBlockFilterQueueCap verifies that the block polling filter drops the oldest
// block hashes when the queue exceeds MaxPendingItems.
func TestBlockFilterQueueCap(t *testing.T) {
t.Parallel()
const maxItems = 3
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems})
api = NewFilterAPI(sys)
genesis = &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
_, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 7, func(i int, gen *core.BlockGen) {})
)
fid := api.NewBlockFilter()
time.Sleep(50 * time.Millisecond) // give subscription time to install
// Send 7 block headers — more than maxItems
for _, blk := range chain {
backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()})
}
// Poll until we get some results or timeout
var collected []common.Hash
timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
results, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("GetFilterChanges error: %v", err)
}
collected = append(collected, results.([]common.Hash)...)
if len(collected) > 0 {
break
}
time.Sleep(10 * time.Millisecond)
}
if len(collected) > maxItems {
t.Errorf("expected at most %d block hashes in capped filter queue, got %d", maxItems, len(collected))
}
if len(collected) == 0 {
t.Error("expected at least some block hashes to be received")
}
}
// TestLogFilterQueueCap verifies that the log polling filter drops the oldest logs
// when the queue exceeds MaxPendingItems.
func TestLogFilterQueueCap(t *testing.T) {
t.Parallel()
const maxItems = 3
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems})
api = NewFilterAPI(sys)
addr = common.HexToAddress("0x1111111111111111111111111111111111111111")
// 7 logs — more than maxItems
allLogs = []*types.Log{
{Address: addr, BlockNumber: 1},
{Address: addr, BlockNumber: 2},
{Address: addr, BlockNumber: 3},
{Address: addr, BlockNumber: 4},
{Address: addr, BlockNumber: 5},
{Address: addr, BlockNumber: 6},
{Address: addr, BlockNumber: 7},
}
)
fid, err := api.NewFilter(FilterCriteria{})
if err != nil {
t.Fatalf("NewFilter error: %v", err)
}
time.Sleep(50 * time.Millisecond) // give subscription time to install
backend.logsFeed.Send(allLogs)
// Poll until we get some results or timeout
var collected []*types.Log
timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
results, err := api.GetFilterChanges(fid)
if err != nil {
t.Fatalf("GetFilterChanges error: %v", err)
}
collected = append(collected, results.([]*types.Log)...)
if len(collected) > 0 {
break
}
time.Sleep(10 * time.Millisecond)
}
if len(collected) > maxItems {
t.Errorf("expected at most %d logs in capped filter queue, got %d", maxItems, len(collected))
}
if len(collected) == 0 {
t.Error("expected at least some logs to be received")
}
// The logs we receive must be the NEWEST (highest BlockNumber),
// because we drop the oldest.
for _, l := range collected {
if l.BlockNumber <= uint64(len(allLogs)-maxItems) {
t.Errorf("received dropped log with BlockNumber %d (expected only blocks > %d)", l.BlockNumber, len(allLogs)-maxItems)
}
}
}