diff --git a/eth/filters/api.go b/eth/filters/api.go index 2cb72dc114..eaf5509af3 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -554,48 +554,53 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo // (pending)Log filters return []Log. func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() - defer api.filtersMu.Unlock() - - chainConfig := api.sys.backend.ChainConfig() - latest := api.sys.backend.CurrentHeader() - - if f, found := api.filters[id]; found { - if !f.deadline.Stop() { - // timer expired but filter is not yet removed in timeout loop - // receive timer value and reset timer - <-f.deadline.C - } - f.deadline.Reset(api.timeout) - - switch f.typ { - case BlocksSubscription: - hashes := f.hashes - f.hashes = nil - return returnHashes(hashes), nil - case PendingTransactionsSubscription: - if f.fullTx { - txs := make([]*ethapi.RPCTransaction, 0, len(f.txs)) - for _, tx := range f.txs { - txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)) - } - f.txs = nil - return txs, nil - } else { - hashes := make([]common.Hash, 0, len(f.txs)) - for _, tx := range f.txs { - hashes = append(hashes, tx.Hash()) - } - f.txs = nil - return hashes, nil - } - case LogsSubscription: - logs := f.logs - f.logs = nil - return returnLogs(logs), nil - } + f, found := api.filters[id] + if !found { + api.filtersMu.Unlock() + return []interface{}{}, errFilterNotFound } + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(api.timeout) - return []interface{}{}, errFilterNotFound + // Snapshot the accumulated state and nil the slices so that new + // events keep collecting while we format the response outside the lock. + typ := f.typ + fullTx := f.fullTx + hashes := f.hashes + txs := f.txs + logs := f.logs + f.hashes = nil + f.txs = nil + f.logs = nil + api.filtersMu.Unlock() + + switch typ { + case BlocksSubscription: + return returnHashes(hashes), nil + case PendingTransactionsSubscription: + if fullTx { + chainConfig := api.sys.backend.ChainConfig() + latest := api.sys.backend.CurrentHeader() + result := make([]*ethapi.RPCTransaction, 0, len(txs)) + for _, tx := range txs { + result = append(result, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)) + } + return result, nil + } + result := make([]common.Hash, 0, len(txs)) + for _, tx := range txs { + result = append(result, tx.Hash()) + } + return result, nil + case LogsSubscription: + return returnLogs(logs), nil + default: + return []interface{}{}, errFilterNotFound + } } // returnHashes is a helper that will return an empty hash array case the given hash array is nil, diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 822bc826f6..735538c872 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -19,9 +19,17 @@ package filters import ( "encoding/json" "fmt" + "math/big" "testing" + "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -183,3 +191,181 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2])) } } + +// pollFilterChanges calls GetFilterChanges in a retry loop until a non-empty +// result is returned or the timeout expires. +func pollFilterChanges(t *testing.T, api *FilterAPI, id rpc.ID, timeout time.Duration) interface{} { + t.Helper() + deadline := time.Now().Add(timeout) + for { + changes, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("GetFilterChanges: %v", err) + } + switch v := changes.(type) { + case []*types.Log: + if len(v) > 0 { + return changes + } + case []common.Hash: + if len(v) > 0 { + return changes + } + case []*ethapi.RPCTransaction: + if len(v) > 0 { + return changes + } + } + if time.Now().After(deadline) { + t.Fatal("timed out waiting for filter results") + } + time.Sleep(10 * time.Millisecond) + } +} + +func TestGetFilterChangesDrainLogs(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{}) + api = NewFilterAPI(sys) + ) + id, err := api.NewFilter(FilterCriteria{}) + if err != nil { + t.Fatalf("NewFilter: %v", err) + } + + want := &types.Log{ + Address: common.HexToAddress("0x1111111111111111111111111111111111111111"), + Topics: []common.Hash{common.HexToHash("0x01")}, + BlockNumber: 7, + } + if n := backend.logsFeed.Send([]*types.Log{want}); n == 0 { + t.Fatal("log event not delivered") + } + + first := pollFilterChanges(t, api, id, time.Second).([]*types.Log) + if len(first) != 1 { + t.Fatalf("expected 1 log, got %d", len(first)) + } + if first[0].Address != want.Address || first[0].BlockNumber != want.BlockNumber { + t.Fatalf("unexpected log: address=%s block=%d", first[0].Address, first[0].BlockNumber) + } + + // Second poll must be empty (drain semantics). + second, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("GetFilterChanges: %v", err) + } + if logs := second.([]*types.Log); len(logs) != 0 { + t.Fatalf("expected empty second poll, got %d logs", len(logs)) + } +} + +func TestGetFilterChangesDrainBlocks(t *testing.T) { + t.Parallel() + + genesis := &core.Genesis{ + Config: params.TestChainConfig, + BaseFee: big.NewInt(params.InitialBaseFee), + } + db, chain, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(int, *core.BlockGen) {}) + blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil) + if err != nil { + t.Fatalf("NewBlockChain: %v", err) + } + if n, err := blockchain.InsertChain(chain[:1]); err != nil { + t.Fatalf("InsertChain block %d: %v", n, err) + } + + backend, sys := newTestFilterSystem(db, Config{}) + api := NewFilterAPI(sys) + id := api.NewBlockFilter() + + if n := backend.chainFeed.Send(core.ChainEvent{Header: chain[1].Header()}); n == 0 { + t.Fatal("chain event not delivered") + } + + hashes := pollFilterChanges(t, api, id, time.Second).([]common.Hash) + if len(hashes) != 1 { + t.Fatalf("expected 1 block hash, got %d", len(hashes)) + } + if hashes[0] != chain[1].Hash() { + t.Fatalf("expected hash %x, got %x", chain[1].Hash(), hashes[0]) + } + + // Second poll must be empty. + second, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("GetFilterChanges: %v", err) + } + if h := second.([]common.Hash); len(h) != 0 { + t.Fatalf("expected empty second poll, got %d hashes", len(h)) + } +} + +func TestGetFilterChangesDrainPendingTxFullTx(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{}) + api = NewFilterAPI(sys) + ) + + fullTx := true + id := api.NewPendingTransactionFilter(&fullTx) + + tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}}) + + first := pollFilterChanges(t, api, id, time.Second).([]*ethapi.RPCTransaction) + if len(first) != 1 { + t.Fatalf("expected 1 pending tx, got %d", len(first)) + } + if first[0].Hash != tx.Hash() { + t.Fatalf("expected tx hash %x, got %x", tx.Hash(), first[0].Hash) + } + + // Second poll must be empty. + second, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("GetFilterChanges: %v", err) + } + if txs := second.([]*ethapi.RPCTransaction); len(txs) != 0 { + t.Fatalf("expected empty second poll, got %d txs", len(txs)) + } +} + +func TestGetFilterChangesDrainPendingTxHashOnly(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(db, Config{}) + api = NewFilterAPI(sys) + ) + + id := api.NewPendingTransactionFilter(nil) + + tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil) + backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}}) + + first := pollFilterChanges(t, api, id, time.Second).([]common.Hash) + if len(first) != 1 { + t.Fatalf("expected 1 tx hash, got %d", len(first)) + } + if first[0] != tx.Hash() { + t.Fatalf("expected hash %x, got %x", tx.Hash(), first[0]) + } + + // Second poll must be empty. + second, err := api.GetFilterChanges(id) + if err != nil { + t.Fatalf("GetFilterChanges: %v", err) + } + if h := second.([]common.Hash); len(h) != 0 { + t.Fatalf("expected empty second poll, got %d hashes", len(h)) + } +}