diff --git a/eth/filters/api.go b/eth/filters/api.go index e4ade96598..9c3d14b2f5 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/history" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" ) @@ -83,24 +84,26 @@ type filter struct { // FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such as blocks, transactions and logs. type FilterAPI struct { - sys *FilterSystem - events *EventSystem - filtersMu sync.Mutex - filters map[rpc.ID]*filter - timeout time.Duration - logQueryLimit int - rangeLimit uint64 + sys *FilterSystem + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter + timeout time.Duration + logQueryLimit int + rangeLimit uint64 + maxPendingItems int // max buffered items per polling filter; oldest are dropped when exceeded } // NewFilterAPI returns a new FilterAPI instance. func NewFilterAPI(system *FilterSystem) *FilterAPI { api := &FilterAPI{ - sys: system, - events: NewEventSystem(system), - filters: make(map[rpc.ID]*filter), - timeout: system.cfg.Timeout, - logQueryLimit: system.cfg.LogQueryLimit, - rangeLimit: system.cfg.RangeLimit, + sys: system, + events: NewEventSystem(system), + filters: make(map[rpc.ID]*filter), + timeout: system.cfg.Timeout, + logQueryLimit: system.cfg.LogQueryLimit, + rangeLimit: system.cfg.RangeLimit, + maxPendingItems: system.cfg.MaxPendingItems, } go api.timeoutLoop(system.cfg.Timeout) @@ -161,11 +164,21 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { for { select { case pTx := <-pendingTxs: + var drop bool api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.txs = append(f.txs, pTx...) + // Cap the queue: uninstall filter when the limit is exceeded + // to prevent unbounded memory growth if the client polls infrequently. + if max := api.maxPendingItems; max > 0 && len(f.txs) > max { + drop = true + } } api.filtersMu.Unlock() + if drop { + log.Debug("Pending tx filter queue overflow, uninstalling filter", "id", pendingTxSub.ID) + api.UninstallFilter(pendingTxSub.ID) + } case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) @@ -238,11 +251,20 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID { for { select { case h := <-headers: + var drop bool api.filtersMu.Lock() if f, found := api.filters[headerSub.ID]; found { f.hashes = append(f.hashes, h.Hash()) + // Cap the queue: uninstall filter when the limit is exceeded. + if max := api.maxPendingItems; max > 0 && len(f.hashes) > max { + drop = true + } } api.filtersMu.Unlock() + if drop { + log.Debug("Block filter queue overflow, uninstalling filter", "id", headerSub.ID) + api.UninstallFilter(headerSub.ID) + } case <-headerSub.Err(): api.filtersMu.Lock() delete(api.filters, headerSub.ID) @@ -426,11 +448,21 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { for { select { case l := <-logs: + var drop bool api.filtersMu.Lock() if f, found := api.filters[logsSub.ID]; found { f.logs = append(f.logs, l...) + // Cap the queue: uninstall filter when the limit is exceeded + // to prevent unbounded memory growth if the client polls infrequently. + if max := api.maxPendingItems; max > 0 && len(f.logs) > max { + drop = true + } } api.filtersMu.Unlock() + if drop { + log.Debug("Log filter queue overflow, uninstalling filter", "id", logsSub.ID) + api.UninstallFilter(logsSub.ID) + } case <-logsSub.Err(): api.filtersMu.Lock() delete(api.filters, logsSub.ID) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 1f92c4e36f..d0e0a51045 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -41,10 +41,13 @@ import ( // Config represents the configuration of the filter system. type Config struct { - LogCacheSize int // maximum number of cached blocks (default: 32) - Timeout time.Duration // how long filters stay active (default: 5min) - LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000) - RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0) + LogCacheSize int // maximum number of cached blocks (default: 32) + Timeout time.Duration // how long filters stay active (default: 5min) + LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000) + RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0) + MaxPendingItems int // maximum number of items buffered in a polling filter queue (default: 10000) + // When the limit is reached, the oldest items are dropped to make room for new ones. + // This prevents unbounded memory growth when clients poll eth_getFilterChanges infrequently. } func (cfg Config) withDefaults() Config { @@ -54,6 +57,9 @@ func (cfg Config) withDefaults() Config { if cfg.LogCacheSize == 0 { cfg.LogCacheSize = 32 } + if cfg.MaxPendingItems == 0 { + cfg.MaxPendingItems = 10000 + } return cfg } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 6f97d5b664..510a26ff32 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -922,3 +922,150 @@ func TestTransactionReceiptsSubscription(t *testing.T) { }) } } + +// TestPendingTxFilterQueueCap verifies that the pending transaction polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems, preventing +// unbounded memory growth when the client polls infrequently. +func TestPendingTxFilterQueueCap(t *testing.T) { + t.Parallel() + + const maxItems = 5 + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems}) + api = NewFilterAPI(sys) + ) + + fid := api.NewPendingTransactionFilter(nil) + + // Send 10 transactions in two batches — more than MaxPendingItems + batch1 := make([]*types.Transaction, 8) + for i := range batch1 { + batch1[i] = types.NewTransaction(uint64(i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + } + batch2 := make([]*types.Transaction, 2) + for i := range batch2 { + batch2[i] = types.NewTransaction(uint64(len(batch1)+i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + } + + time.Sleep(50 * time.Millisecond) // give subscription time to install + backend.txFeed.Send(core.NewTxsEvent{Txs: batch1}) + backend.txFeed.Send(core.NewTxsEvent{Txs: batch2}) + + // Poll until we get an error or timeout + timeout := time.Now().Add(2 * time.Second) + var gotErr error + for time.Now().Before(timeout) { + _, err := api.GetFilterChanges(fid) + if err != nil { + gotErr = err + break + } + time.Sleep(10 * time.Millisecond) + } + + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) + } +} + +// TestBlockFilterQueueCap verifies that the block polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems. +func TestBlockFilterQueueCap(t *testing.T) { + t.Parallel() + + const maxItems = 3 + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems}) + api = NewFilterAPI(sys) + genesis = &core.Genesis{ + Config: params.TestChainConfig, + BaseFee: big.NewInt(params.InitialBaseFee), + } + _, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 7, func(i int, gen *core.BlockGen) {}) + ) + + fid := api.NewBlockFilter() + time.Sleep(50 * time.Millisecond) // give subscription time to install + + // Send 7 block headers — more than maxItems + for _, blk := range chain { + backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()}) + } + + // Poll until we get an error or timeout + timeout := time.Now().Add(2 * time.Second) + var gotErr error + for time.Now().Before(timeout) { + _, err := api.GetFilterChanges(fid) + if err != nil { + gotErr = err + break + } + time.Sleep(10 * time.Millisecond) + } + + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) + } +} + +// TestLogFilterQueueCap verifies that the log polling filter +// uninstalls the filter when the queue exceeds MaxPendingItems. +func TestLogFilterQueueCap(t *testing.T) { + t.Parallel() + + const maxItems = 3 + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems}) + api = NewFilterAPI(sys) + + addr = common.HexToAddress("0x1111111111111111111111111111111111111111") + + // 7 logs — more than maxItems + allLogs = []*types.Log{ + {Address: addr, BlockNumber: 1}, + {Address: addr, BlockNumber: 2}, + {Address: addr, BlockNumber: 3}, + {Address: addr, BlockNumber: 4}, + {Address: addr, BlockNumber: 5}, + {Address: addr, BlockNumber: 6}, + {Address: addr, BlockNumber: 7}, + } + ) + + fid, err := api.NewFilter(FilterCriteria{}) + if err != nil { + t.Fatalf("NewFilter error: %v", err) + } + + time.Sleep(50 * time.Millisecond) // give subscription time to install + backend.logsFeed.Send(allLogs) + + // Poll until we get an error or timeout + timeout := time.Now().Add(2 * time.Second) + var gotErr error + for time.Now().Before(timeout) { + _, err := api.GetFilterChanges(fid) + if err != nil { + gotErr = err + break + } + time.Sleep(10 * time.Millisecond) + } + + if gotErr == nil { + t.Error("expected filter to be uninstalled and return an error, but got none") + } else if !errors.Is(gotErr, errFilterNotFound) { + t.Errorf("expected errFilterNotFound, got %v", gotErr) + } +}