eth/filters: reduce lock scope in GetFilterChanges

This commit is contained in:
Mark Liu 2026-03-07 07:35:40 +00:00
parent 0d043d071e
commit ac818c7460
2 changed files with 194 additions and 35 deletions

View file

@ -550,45 +550,48 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
// (pending)Log filters return []Log.
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
f, found := api.filters[id]
if !found {
api.filtersMu.Unlock()
return []interface{}{}, errFilterNotFound
}
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(api.timeout)
chainConfig := api.sys.backend.ChainConfig()
latest := api.sys.backend.CurrentHeader()
typ := f.typ
fullTx := f.fullTx
hashes := f.hashes
txs := f.txs
logs := f.logs
f.hashes = nil
f.txs = nil
f.logs = nil
api.filtersMu.Unlock()
if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(api.timeout)
switch f.typ {
case BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
if f.fullTx {
txs := make([]*ethapi.RPCTransaction, 0, len(f.txs))
for _, tx := range f.txs {
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
}
f.txs = nil
return txs, nil
} else {
hashes := make([]common.Hash, 0, len(f.txs))
for _, tx := range f.txs {
hashes = append(hashes, tx.Hash())
}
f.txs = nil
return hashes, nil
switch typ {
case BlocksSubscription:
return returnHashes(hashes), nil
case PendingTransactionsSubscription:
if fullTx {
chainConfig := api.sys.backend.ChainConfig()
latest := api.sys.backend.CurrentHeader()
result := make([]*ethapi.RPCTransaction, 0, len(txs))
for _, tx := range txs {
result = append(result, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
}
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
return result, nil
}
hashes := make([]common.Hash, 0, len(txs))
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}
return hashes, nil
case LogsSubscription:
return returnLogs(logs), nil
}
return []interface{}{}, errFilterNotFound

View file

@ -19,9 +19,17 @@ package filters
import (
"encoding/json"
"fmt"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)
@ -183,3 +191,151 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2]))
}
}
func TestGetFilterChangesDrainSemanticsLogs(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{})
api = NewFilterAPI(sys)
)
id, err := api.NewFilter(FilterCriteria{})
if err != nil {
t.Fatalf("failed to create filter: %v", err)
}
want := &types.Log{
Address: common.HexToAddress("0x1000000000000000000000000000000000000001"),
Topics: []common.Hash{common.HexToHash("0x01")},
BlockNumber: 7,
}
if nsend := backend.logsFeed.Send([]*types.Log{want}); nsend == 0 {
t.Fatal("logs event not delivered")
}
var first []*types.Log
timeout := time.Now().Add(time.Second)
for {
changes, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("failed to fetch filter changes: %v", err)
}
first = changes.([]*types.Log)
if len(first) > 0 || time.Now().After(timeout) {
break
}
time.Sleep(10 * time.Millisecond)
}
if len(first) != 1 {
t.Fatalf("expected first poll to return 1 log, got %d", len(first))
}
if first[0].Address != want.Address || first[0].BlockNumber != want.BlockNumber {
t.Fatalf("unexpected log returned: got address=%s block=%d", first[0].Address.Hex(), first[0].BlockNumber)
}
changes, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("failed to fetch drained changes: %v", err)
}
second := changes.([]*types.Log)
if len(second) != 0 {
t.Fatalf("expected second poll to be empty, got %d logs", len(second))
}
}
func TestGetFilterChangesHeadBoundaryIncludesHeadBlock(t *testing.T) {
t.Parallel()
genesis := &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
db, chain, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(i int, gen *core.BlockGen) {})
blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil)
if err != nil {
t.Fatalf("failed to create blockchain: %v", err)
}
if n, err := blockchain.InsertChain(chain[:1]); err != nil {
t.Fatalf("failed to insert block %d: %v", n, err)
}
backend, sys := newTestFilterSystem(db, Config{})
api := NewFilterAPI(sys)
id := api.NewBlockFilter()
head := backend.CurrentHeader()
if head.Number.Uint64() != chain[0].NumberU64() {
t.Fatalf("expected filter creation head %d, got %d", chain[0].NumberU64(), head.Number.Uint64())
}
if nsend := backend.chainFeed.Send(core.ChainEvent{Header: chain[1].Header()}); nsend == 0 {
t.Fatal("chain event not delivered")
}
var hashes []common.Hash
timeout := time.Now().Add(time.Second)
for {
changes, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("failed to fetch filter changes: %v", err)
}
hashes = changes.([]common.Hash)
if len(hashes) > 0 || time.Now().After(timeout) {
break
}
time.Sleep(10 * time.Millisecond)
}
if len(hashes) != 1 {
t.Fatalf("expected 1 new head hash, got %d", len(hashes))
}
if hashes[0] != chain[1].Hash() {
t.Fatalf("expected head hash %x, got %x", chain[1].Hash(), hashes[0])
}
}
func TestGetFilterChangesDrainSemanticsPendingTx(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(db, Config{})
api = NewFilterAPI(sys)
)
fullTx := true
id := api.NewPendingTransactionFilter(&fullTx)
tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
// Poll until the tx arrives.
var first []*ethapi.RPCTransaction
timeout := time.Now().Add(time.Second)
for {
changes, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("failed to fetch filter changes: %v", err)
}
first = changes.([]*ethapi.RPCTransaction)
if len(first) > 0 || time.Now().After(timeout) {
break
}
time.Sleep(10 * time.Millisecond)
}
if len(first) != 1 {
t.Fatalf("expected 1 pending tx, got %d", len(first))
}
if first[0].Hash != tx.Hash() {
t.Fatalf("expected tx hash %x, got %x", tx.Hash(), first[0].Hash)
}
// Second poll must be empty (drain semantics).
changes, err := api.GetFilterChanges(id)
if err != nil {
t.Fatalf("failed to fetch drained changes: %v", err)
}
second := changes.([]*ethapi.RPCTransaction)
if len(second) != 0 {
t.Fatalf("expected second poll to be empty, got %d txs", len(second))
}
}