From ac818c7460e95bbb4f368d0ebb75bb3597b528cf Mon Sep 17 00:00:00 2001 From: Mark Liu Date: Sat, 7 Mar 2026 07:35:40 +0000 Subject: [PATCH] eth/filters: reduce lock scope in GetFilterChanges --- eth/filters/api.go | 73 ++++++++++--------- eth/filters/api_test.go | 156 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 35 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index f4bed35b26..c3d2c8d71f 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -550,45 +550,48 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo // (pending)Log filters return []Log. func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() - defer api.filtersMu.Unlock() + f, found := api.filters[id] + if !found { + api.filtersMu.Unlock() + return []interface{}{}, errFilterNotFound + } + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(api.timeout) - chainConfig := api.sys.backend.ChainConfig() - latest := api.sys.backend.CurrentHeader() + typ := f.typ + fullTx := f.fullTx + hashes := f.hashes + txs := f.txs + logs := f.logs + f.hashes = nil + f.txs = nil + f.logs = nil + api.filtersMu.Unlock() - if f, found := api.filters[id]; found { - if !f.deadline.Stop() { - // timer expired but filter is not yet removed in timeout loop - // receive timer value and reset timer - <-f.deadline.C - } - f.deadline.Reset(api.timeout) - - switch f.typ { - case BlocksSubscription: - hashes := f.hashes - f.hashes = nil - return returnHashes(hashes), nil - case PendingTransactionsSubscription: - if f.fullTx { - txs := make([]*ethapi.RPCTransaction, 0, len(f.txs)) - for _, tx := range f.txs { - txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)) - } - f.txs = nil - return txs, nil - } else { - hashes := make([]common.Hash, 0, len(f.txs)) - for _, tx := range f.txs { - hashes = append(hashes, tx.Hash()) - } - f.txs = nil - return hashes, nil + switch typ { + case BlocksSubscription: + return returnHashes(hashes), nil + case PendingTransactionsSubscription: + if fullTx { + chainConfig := api.sys.backend.ChainConfig() + latest := api.sys.backend.CurrentHeader() + result := make([]*ethapi.RPCTransaction, 0, len(txs)) + for _, tx := range txs { + result = append(result, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)) } - case LogsSubscription: - logs := f.logs - f.logs = nil - return returnLogs(logs), nil + return result, nil } + hashes := make([]common.Hash, 0, len(txs)) + for _, tx := range txs { + hashes = append(hashes, tx.Hash()) + } + return hashes, nil + case LogsSubscription: + return returnLogs(logs), nil } return []interface{}{}, errFilterNotFound diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 822bc826f6..ff8ae6aff1 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -19,9 +19,17 @@ package filters import ( "encoding/json" "fmt" + "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -183,3 +191,151 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2])) } } + +func TestGetFilterChangesDrainSemanticsLogs(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{}) + api = NewFilterAPI(sys) + ) + id, err := api.NewFilter(FilterCriteria{}) + if err != nil { + t.Fatalf("failed to create filter: %v", err) + } + + want := &types.Log{ + Address: common.HexToAddress("0x1000000000000000000000000000000000000001"), + Topics: []common.Hash{common.HexToHash("0x01")}, + BlockNumber: 7, + } + if nsend := backend.logsFeed.Send([]*types.Log{want}); nsend == 0 { + t.Fatal("logs event not delivered") + } + + var first []*types.Log + timeout := time.Now().Add(time.Second) + for { + changes, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("failed to fetch filter changes: %v", err) + } + first = changes.([]*types.Log) + if len(first) > 0 || time.Now().After(timeout) { + break + } + time.Sleep(10 * time.Millisecond) + } + if len(first) != 1 { + t.Fatalf("expected first poll to return 1 log, got %d", len(first)) + } + if first[0].Address != want.Address || first[0].BlockNumber != want.BlockNumber { + t.Fatalf("unexpected log returned: got address=%s block=%d", first[0].Address.Hex(), first[0].BlockNumber) + } + + changes, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("failed to fetch drained changes: %v", err) + } + second := changes.([]*types.Log) + if len(second) != 0 { + t.Fatalf("expected second poll to be empty, got %d logs", len(second)) + } +} + +func TestGetFilterChangesHeadBoundaryIncludesHeadBlock(t *testing.T) { + t.Parallel() + + genesis := &core.Genesis{ + Config: params.TestChainConfig, + BaseFee: big.NewInt(params.InitialBaseFee), + } + db, chain, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(i int, gen *core.BlockGen) {}) + blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil) + if err != nil { + t.Fatalf("failed to create blockchain: %v", err) + } + if n, err := blockchain.InsertChain(chain[:1]); err != nil { + t.Fatalf("failed to insert block %d: %v", n, err) + } + + backend, sys := newTestFilterSystem(db, Config{}) + api := NewFilterAPI(sys) + id := api.NewBlockFilter() + + head := backend.CurrentHeader() + if head.Number.Uint64() != chain[0].NumberU64() { + t.Fatalf("expected filter creation head %d, got %d", chain[0].NumberU64(), head.Number.Uint64()) + } + if nsend := backend.chainFeed.Send(core.ChainEvent{Header: chain[1].Header()}); nsend == 0 { + t.Fatal("chain event not delivered") + } + + var hashes []common.Hash + timeout := time.Now().Add(time.Second) + for { + changes, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("failed to fetch filter changes: %v", err) + } + hashes = changes.([]common.Hash) + if len(hashes) > 0 || time.Now().After(timeout) { + break + } + time.Sleep(10 * time.Millisecond) + } + if len(hashes) != 1 { + t.Fatalf("expected 1 new head hash, got %d", len(hashes)) + } + if hashes[0] != chain[1].Hash() { + t.Fatalf("expected head hash %x, got %x", chain[1].Hash(), hashes[0]) + } +} + +func TestGetFilterChangesDrainSemanticsPendingTx(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{}) + api = NewFilterAPI(sys) + ) + + fullTx := true + id := api.NewPendingTransactionFilter(&fullTx) + + tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}}) + + // Poll until the tx arrives. + var first []*ethapi.RPCTransaction + timeout := time.Now().Add(time.Second) + for { + changes, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("failed to fetch filter changes: %v", err) + } + first = changes.([]*ethapi.RPCTransaction) + if len(first) > 0 || time.Now().After(timeout) { + break + } + time.Sleep(10 * time.Millisecond) + } + if len(first) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(first)) + } + if first[0].Hash != tx.Hash() { + t.Fatalf("expected tx hash %x, got %x", tx.Hash(), first[0].Hash) + } + + // Second poll must be empty (drain semantics). + changes, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("failed to fetch drained changes: %v", err) + } + second := changes.([]*ethapi.RPCTransaction) + if len(second) != 0 { + t.Fatalf("expected second poll to be empty, got %d txs", len(second)) + } +}