eth/filters: reduce lock scope in GetFilterChanges

GetFilterChanges held filtersMu for the entire call, including
pending-tx RPC formatting via NewRPCPendingTransaction. That blocks the
goroutines that append events to filters, and with unbuffered channels
the backpressure stalls event.Feed.Send — so polling clients see empty
results during high-throughput periods.

Copy the accumulated slices and nil the filter state under the lock,
then release it before doing any formatting. Same drain-on-read
semantics, just a shorter critical section.

- Move ChainConfig()/CurrentHeader() calls outside the lock — only the
  PendingTransactionsSubscription fullTx branch needs them
- Tests: drain semantics for logs, blocks, pending txs (fullTx and
  hash-only paths)

Related: #28838
Signed-off-by: Mark Liu <mark@prove.com.au>
This commit is contained in:
Mark Liu 2026-03-12 17:07:38 +11:00
parent 95b9a2ed77
commit bfa97d5579
2 changed files with 231 additions and 40 deletions

View file

@ -554,48 +554,53 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
// (pending)Log filters return []Log. // (pending)Log filters return []Log.
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() f, found := api.filters[id]
if !found {
chainConfig := api.sys.backend.ChainConfig() api.filtersMu.Unlock()
latest := api.sys.backend.CurrentHeader() return []interface{}{}, errFilterNotFound
if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(api.timeout)
switch f.typ {
case BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
if f.fullTx {
txs := make([]*ethapi.RPCTransaction, 0, len(f.txs))
for _, tx := range f.txs {
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
}
f.txs = nil
return txs, nil
} else {
hashes := make([]common.Hash, 0, len(f.txs))
for _, tx := range f.txs {
hashes = append(hashes, tx.Hash())
}
f.txs = nil
return hashes, nil
}
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
}
} }
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(api.timeout)
return []interface{}{}, errFilterNotFound // Snapshot the accumulated state and nil the slices so that new
// events keep collecting while we format the response outside the lock.
typ := f.typ
fullTx := f.fullTx
hashes := f.hashes
txs := f.txs
logs := f.logs
f.hashes = nil
f.txs = nil
f.logs = nil
api.filtersMu.Unlock()
switch typ {
case BlocksSubscription:
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
if fullTx {
chainConfig := api.sys.backend.ChainConfig()
latest := api.sys.backend.CurrentHeader()
result := make([]*ethapi.RPCTransaction, 0, len(txs))
for _, tx := range txs {
result = append(result, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
}
return result, nil
}
result := make([]common.Hash, 0, len(txs))
for _, tx := range txs {
result = append(result, tx.Hash())
}
return result, nil
case LogsSubscription:
return returnLogs(logs), nil
default:
return []interface{}{}, errFilterNotFound
}
} }
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, // returnHashes is a helper that will return an empty hash array case the given hash array is nil,

View file

@ -19,9 +19,17 @@ package filters
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"math/big"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
@ -183,3 +191,181 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2])) t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2]))
} }
} }
// pollFilterChanges calls GetFilterChanges in a retry loop until a non-empty
// result is returned or the timeout expires.
func pollFilterChanges(t *testing.T, api *FilterAPI, id rpc.ID, timeout time.Duration) interface{} {
t.Helper()
deadline := time.Now().Add(timeout)
for {
changes, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("GetFilterChanges: %v", err)
}
switch v := changes.(type) {
case []*types.Log:
if len(v) > 0 {
return changes
}
case []common.Hash:
if len(v) > 0 {
return changes
}
case []*ethapi.RPCTransaction:
if len(v) > 0 {
return changes
}
}
if time.Now().After(deadline) {
t.Fatal("timed out waiting for filter results")
}
time.Sleep(10 * time.Millisecond)
}
}
func TestGetFilterChangesDrainLogs(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{})
api = NewFilterAPI(sys)
)
id, err := api.NewFilter(FilterCriteria{})
if err != nil {
t.Fatalf("NewFilter: %v", err)
}
want := &types.Log{
Address: common.HexToAddress("0x1111111111111111111111111111111111111111"),
Topics: []common.Hash{common.HexToHash("0x01")},
BlockNumber: 7,
}
if n := backend.logsFeed.Send([]*types.Log{want}); n == 0 {
t.Fatal("log event not delivered")
}
first := pollFilterChanges(t, api, id, time.Second).([]*types.Log)
if len(first) != 1 {
t.Fatalf("expected 1 log, got %d", len(first))
}
if first[0].Address != want.Address || first[0].BlockNumber != want.BlockNumber {
t.Fatalf("unexpected log: address=%s block=%d", first[0].Address, first[0].BlockNumber)
}
// Second poll must be empty (drain semantics).
second, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("GetFilterChanges: %v", err)
}
if logs := second.([]*types.Log); len(logs) != 0 {
t.Fatalf("expected empty second poll, got %d logs", len(logs))
}
}
func TestGetFilterChangesDrainBlocks(t *testing.T) {
t.Parallel()
genesis := &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
db, chain, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(int, *core.BlockGen) {})
blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil)
if err != nil {
t.Fatalf("NewBlockChain: %v", err)
}
if n, err := blockchain.InsertChain(chain[:1]); err != nil {
t.Fatalf("InsertChain block %d: %v", n, err)
}
backend, sys := newTestFilterSystem(db, Config{})
api := NewFilterAPI(sys)
id := api.NewBlockFilter()
if n := backend.chainFeed.Send(core.ChainEvent{Header: chain[1].Header()}); n == 0 {
t.Fatal("chain event not delivered")
}
hashes := pollFilterChanges(t, api, id, time.Second).([]common.Hash)
if len(hashes) != 1 {
t.Fatalf("expected 1 block hash, got %d", len(hashes))
}
if hashes[0] != chain[1].Hash() {
t.Fatalf("expected hash %x, got %x", chain[1].Hash(), hashes[0])
}
// Second poll must be empty.
second, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("GetFilterChanges: %v", err)
}
if h := second.([]common.Hash); len(h) != 0 {
t.Fatalf("expected empty second poll, got %d hashes", len(h))
}
}
func TestGetFilterChangesDrainPendingTxFullTx(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{})
api = NewFilterAPI(sys)
)
fullTx := true
id := api.NewPendingTransactionFilter(&fullTx)
tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
first := pollFilterChanges(t, api, id, time.Second).([]*ethapi.RPCTransaction)
if len(first) != 1 {
t.Fatalf("expected 1 pending tx, got %d", len(first))
}
if first[0].Hash != tx.Hash() {
t.Fatalf("expected tx hash %x, got %x", tx.Hash(), first[0].Hash)
}
// Second poll must be empty.
second, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("GetFilterChanges: %v", err)
}
if txs := second.([]*ethapi.RPCTransaction); len(txs) != 0 {
t.Fatalf("expected empty second poll, got %d txs", len(txs))
}
}
func TestGetFilterChangesDrainPendingTxHashOnly(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{})
api = NewFilterAPI(sys)
)
id := api.NewPendingTransactionFilter(nil)
tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
first := pollFilterChanges(t, api, id, time.Second).([]common.Hash)
if len(first) != 1 {
t.Fatalf("expected 1 tx hash, got %d", len(first))
}
if first[0] != tx.Hash() {
t.Fatalf("expected hash %x, got %x", tx.Hash(), first[0])
}
// Second poll must be empty.
second, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("GetFilterChanges: %v", err)
}
if h := second.([]common.Hash); len(h) != 0 {
t.Fatalf("expected empty second poll, got %d hashes", len(h))
}
}