This commit is contained in:
locoholy 2026-05-21 21:55:09 -07:00 committed by GitHub
commit 3fe7ec6db9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 202 additions and 17 deletions

View file

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)
@ -83,24 +84,26 @@ type filter struct {
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such as blocks, transactions and logs.
type FilterAPI struct {
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
logQueryLimit int
rangeLimit uint64
sys *FilterSystem
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
timeout time.Duration
logQueryLimit int
rangeLimit uint64
maxPendingItems int // max buffered items per polling filter; oldest are dropped when exceeded
}
// NewFilterAPI returns a new FilterAPI instance.
func NewFilterAPI(system *FilterSystem) *FilterAPI {
api := &FilterAPI{
sys: system,
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
logQueryLimit: system.cfg.LogQueryLimit,
rangeLimit: system.cfg.RangeLimit,
sys: system,
events: NewEventSystem(system),
filters: make(map[rpc.ID]*filter),
timeout: system.cfg.Timeout,
logQueryLimit: system.cfg.LogQueryLimit,
rangeLimit: system.cfg.RangeLimit,
maxPendingItems: system.cfg.MaxPendingItems,
}
go api.timeoutLoop(system.cfg.Timeout)
@ -161,11 +164,21 @@ func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID {
for {
select {
case pTx := <-pendingTxs:
var drop bool
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.txs = append(f.txs, pTx...)
// Cap the queue: uninstall filter when the limit is exceeded
// to prevent unbounded memory growth if the client polls infrequently.
if max := api.maxPendingItems; max > 0 && len(f.txs) > max {
drop = true
}
}
api.filtersMu.Unlock()
if drop {
log.Debug("Pending tx filter queue overflow, uninstalling filter", "id", pendingTxSub.ID)
api.UninstallFilter(pendingTxSub.ID)
}
case <-pendingTxSub.Err():
api.filtersMu.Lock()
delete(api.filters, pendingTxSub.ID)
@ -238,11 +251,20 @@ func (api *FilterAPI) NewBlockFilter() rpc.ID {
for {
select {
case h := <-headers:
var drop bool
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
// Cap the queue: uninstall filter when the limit is exceeded.
if max := api.maxPendingItems; max > 0 && len(f.hashes) > max {
drop = true
}
}
api.filtersMu.Unlock()
if drop {
log.Debug("Block filter queue overflow, uninstalling filter", "id", headerSub.ID)
api.UninstallFilter(headerSub.ID)
}
case <-headerSub.Err():
api.filtersMu.Lock()
delete(api.filters, headerSub.ID)
@ -426,11 +448,21 @@ func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
for {
select {
case l := <-logs:
var drop bool
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
// Cap the queue: uninstall filter when the limit is exceeded
// to prevent unbounded memory growth if the client polls infrequently.
if max := api.maxPendingItems; max > 0 && len(f.logs) > max {
drop = true
}
}
api.filtersMu.Unlock()
if drop {
log.Debug("Log filter queue overflow, uninstalling filter", "id", logsSub.ID)
api.UninstallFilter(logsSub.ID)
}
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, logsSub.ID)

View file

@ -41,10 +41,13 @@ import (
// Config represents the configuration of the filter system.
type Config struct {
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000)
RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0)
LogCacheSize int // maximum number of cached blocks (default: 32)
Timeout time.Duration // how long filters stay active (default: 5min)
LogQueryLimit int // maximum number of addresses allowed in filter criteria (default: 1000)
RangeLimit uint64 // maximum block range allowed in filter criteria (default: 0)
MaxPendingItems int // maximum number of items buffered in a polling filter queue (default: 10000)
// When the limit is reached, the oldest items are dropped to make room for new ones.
// This prevents unbounded memory growth when clients poll eth_getFilterChanges infrequently.
}
func (cfg Config) withDefaults() Config {
@ -54,6 +57,9 @@ func (cfg Config) withDefaults() Config {
if cfg.LogCacheSize == 0 {
cfg.LogCacheSize = 32
}
if cfg.MaxPendingItems == 0 {
cfg.MaxPendingItems = 10000
}
return cfg
}

View file

@ -922,3 +922,150 @@ func TestTransactionReceiptsSubscription(t *testing.T) {
})
}
}
// TestPendingTxFilterQueueCap verifies that the pending transaction polling filter
// uninstalls the filter when the queue exceeds MaxPendingItems, preventing
// unbounded memory growth when the client polls infrequently.
func TestPendingTxFilterQueueCap(t *testing.T) {
t.Parallel()
const maxItems = 5
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems})
api = NewFilterAPI(sys)
)
fid := api.NewPendingTransactionFilter(nil)
// Send 10 transactions in two batches — more than MaxPendingItems
batch1 := make([]*types.Transaction, 8)
for i := range batch1 {
batch1[i] = types.NewTransaction(uint64(i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
}
batch2 := make([]*types.Transaction, 2)
for i := range batch2 {
batch2[i] = types.NewTransaction(uint64(len(batch1)+i), common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
}
time.Sleep(50 * time.Millisecond) // give subscription time to install
backend.txFeed.Send(core.NewTxsEvent{Txs: batch1})
backend.txFeed.Send(core.NewTxsEvent{Txs: batch2})
// Poll until we get an error or timeout
timeout := time.Now().Add(2 * time.Second)
var gotErr error
for time.Now().Before(timeout) {
_, err := api.GetFilterChanges(fid)
if err != nil {
gotErr = err
break
}
time.Sleep(10 * time.Millisecond)
}
if gotErr == nil {
t.Error("expected filter to be uninstalled and return an error, but got none")
} else if !errors.Is(gotErr, errFilterNotFound) {
t.Errorf("expected errFilterNotFound, got %v", gotErr)
}
}
// TestBlockFilterQueueCap verifies that the block polling filter
// uninstalls the filter when the queue exceeds MaxPendingItems.
func TestBlockFilterQueueCap(t *testing.T) {
t.Parallel()
const maxItems = 3
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems})
api = NewFilterAPI(sys)
genesis = &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
_, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 7, func(i int, gen *core.BlockGen) {})
)
fid := api.NewBlockFilter()
time.Sleep(50 * time.Millisecond) // give subscription time to install
// Send 7 block headers — more than maxItems
for _, blk := range chain {
backend.chainFeed.Send(core.ChainEvent{Header: blk.Header()})
}
// Poll until we get an error or timeout
timeout := time.Now().Add(2 * time.Second)
var gotErr error
for time.Now().Before(timeout) {
_, err := api.GetFilterChanges(fid)
if err != nil {
gotErr = err
break
}
time.Sleep(10 * time.Millisecond)
}
if gotErr == nil {
t.Error("expected filter to be uninstalled and return an error, but got none")
} else if !errors.Is(gotErr, errFilterNotFound) {
t.Errorf("expected errFilterNotFound, got %v", gotErr)
}
}
// TestLogFilterQueueCap verifies that the log polling filter
// uninstalls the filter when the queue exceeds MaxPendingItems.
func TestLogFilterQueueCap(t *testing.T) {
t.Parallel()
const maxItems = 3
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{MaxPendingItems: maxItems})
api = NewFilterAPI(sys)
addr = common.HexToAddress("0x1111111111111111111111111111111111111111")
// 7 logs — more than maxItems
allLogs = []*types.Log{
{Address: addr, BlockNumber: 1},
{Address: addr, BlockNumber: 2},
{Address: addr, BlockNumber: 3},
{Address: addr, BlockNumber: 4},
{Address: addr, BlockNumber: 5},
{Address: addr, BlockNumber: 6},
{Address: addr, BlockNumber: 7},
}
)
fid, err := api.NewFilter(FilterCriteria{})
if err != nil {
t.Fatalf("NewFilter error: %v", err)
}
time.Sleep(50 * time.Millisecond) // give subscription time to install
backend.logsFeed.Send(allLogs)
// Poll until we get an error or timeout
timeout := time.Now().Add(2 * time.Second)
var gotErr error
for time.Now().Before(timeout) {
_, err := api.GetFilterChanges(fid)
if err != nil {
gotErr = err
break
}
time.Sleep(10 * time.Millisecond)
}
if gotErr == nil {
t.Error("expected filter to be uninstalled and return an error, but got none")
} else if !errors.Is(gotErr, errFilterNotFound) {
t.Errorf("expected errFilterNotFound, got %v", gotErr)
}
}