mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 08:49:29 +00:00
Merge bfa97d5579 into 12eabbd76d
This commit is contained in:
commit
b6bfcd99fc
2 changed files with 231 additions and 40 deletions
|
|
@ -557,48 +557,53 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
|
||||||
// (pending)Log filters return []Log.
|
// (pending)Log filters return []Log.
|
||||||
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
|
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
|
||||||
api.filtersMu.Lock()
|
api.filtersMu.Lock()
|
||||||
defer api.filtersMu.Unlock()
|
f, found := api.filters[id]
|
||||||
|
if !found {
|
||||||
chainConfig := api.sys.backend.ChainConfig()
|
api.filtersMu.Unlock()
|
||||||
latest := api.sys.backend.CurrentHeader()
|
return []interface{}{}, errFilterNotFound
|
||||||
|
|
||||||
if f, found := api.filters[id]; found {
|
|
||||||
if !f.deadline.Stop() {
|
|
||||||
// timer expired but filter is not yet removed in timeout loop
|
|
||||||
// receive timer value and reset timer
|
|
||||||
<-f.deadline.C
|
|
||||||
}
|
|
||||||
f.deadline.Reset(api.timeout)
|
|
||||||
|
|
||||||
switch f.typ {
|
|
||||||
case BlocksSubscription:
|
|
||||||
hashes := f.hashes
|
|
||||||
f.hashes = nil
|
|
||||||
return returnHashes(hashes), nil
|
|
||||||
case PendingTransactionsSubscription:
|
|
||||||
if f.fullTx {
|
|
||||||
txs := make([]*ethapi.RPCTransaction, 0, len(f.txs))
|
|
||||||
for _, tx := range f.txs {
|
|
||||||
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
|
|
||||||
}
|
|
||||||
f.txs = nil
|
|
||||||
return txs, nil
|
|
||||||
} else {
|
|
||||||
hashes := make([]common.Hash, 0, len(f.txs))
|
|
||||||
for _, tx := range f.txs {
|
|
||||||
hashes = append(hashes, tx.Hash())
|
|
||||||
}
|
|
||||||
f.txs = nil
|
|
||||||
return hashes, nil
|
|
||||||
}
|
|
||||||
case LogsSubscription:
|
|
||||||
logs := f.logs
|
|
||||||
f.logs = nil
|
|
||||||
return returnLogs(logs), nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if !f.deadline.Stop() {
|
||||||
|
// timer expired but filter is not yet removed in timeout loop
|
||||||
|
// receive timer value and reset timer
|
||||||
|
<-f.deadline.C
|
||||||
|
}
|
||||||
|
f.deadline.Reset(api.timeout)
|
||||||
|
|
||||||
return []interface{}{}, errFilterNotFound
|
// Snapshot the accumulated state and nil the slices so that new
|
||||||
|
// events keep collecting while we format the response outside the lock.
|
||||||
|
typ := f.typ
|
||||||
|
fullTx := f.fullTx
|
||||||
|
hashes := f.hashes
|
||||||
|
txs := f.txs
|
||||||
|
logs := f.logs
|
||||||
|
f.hashes = nil
|
||||||
|
f.txs = nil
|
||||||
|
f.logs = nil
|
||||||
|
api.filtersMu.Unlock()
|
||||||
|
|
||||||
|
switch typ {
|
||||||
|
case BlocksSubscription:
|
||||||
|
return returnHashes(hashes), nil
|
||||||
|
case PendingTransactionsSubscription:
|
||||||
|
if fullTx {
|
||||||
|
chainConfig := api.sys.backend.ChainConfig()
|
||||||
|
latest := api.sys.backend.CurrentHeader()
|
||||||
|
result := make([]*ethapi.RPCTransaction, 0, len(txs))
|
||||||
|
for _, tx := range txs {
|
||||||
|
result = append(result, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig))
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
result := make([]common.Hash, 0, len(txs))
|
||||||
|
for _, tx := range txs {
|
||||||
|
result = append(result, tx.Hash())
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
case LogsSubscription:
|
||||||
|
return returnLogs(logs), nil
|
||||||
|
default:
|
||||||
|
return []interface{}{}, errFilterNotFound
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
|
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,17 @@ package filters
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/big"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/consensus/ethash"
|
||||||
|
"github.com/ethereum/go-ethereum/core"
|
||||||
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/internal/ethapi"
|
||||||
|
"github.com/ethereum/go-ethereum/params"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -183,3 +191,181 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
|
||||||
t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2]))
|
t.Fatalf("expected 0 topics, got %d topics", len(test7.Topics[2]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pollFilterChanges calls GetFilterChanges in a retry loop until a non-empty
|
||||||
|
// result is returned or the timeout expires.
|
||||||
|
func pollFilterChanges(t *testing.T, api *FilterAPI, id rpc.ID, timeout time.Duration) interface{} {
|
||||||
|
t.Helper()
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
|
for {
|
||||||
|
changes, err := api.GetFilterChanges(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetFilterChanges: %v", err)
|
||||||
|
}
|
||||||
|
switch v := changes.(type) {
|
||||||
|
case []*types.Log:
|
||||||
|
if len(v) > 0 {
|
||||||
|
return changes
|
||||||
|
}
|
||||||
|
case []common.Hash:
|
||||||
|
if len(v) > 0 {
|
||||||
|
return changes
|
||||||
|
}
|
||||||
|
case []*ethapi.RPCTransaction:
|
||||||
|
if len(v) > 0 {
|
||||||
|
return changes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
t.Fatal("timed out waiting for filter results")
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFilterChangesDrainLogs(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var (
|
||||||
|
db = rawdb.NewMemoryDatabase()
|
||||||
|
backend, sys = newTestFilterSystem(db, Config{})
|
||||||
|
api = NewFilterAPI(sys)
|
||||||
|
)
|
||||||
|
id, err := api.NewFilter(FilterCriteria{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewFilter: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
want := &types.Log{
|
||||||
|
Address: common.HexToAddress("0x1111111111111111111111111111111111111111"),
|
||||||
|
Topics: []common.Hash{common.HexToHash("0x01")},
|
||||||
|
BlockNumber: 7,
|
||||||
|
}
|
||||||
|
if n := backend.logsFeed.Send([]*types.Log{want}); n == 0 {
|
||||||
|
t.Fatal("log event not delivered")
|
||||||
|
}
|
||||||
|
|
||||||
|
first := pollFilterChanges(t, api, id, time.Second).([]*types.Log)
|
||||||
|
if len(first) != 1 {
|
||||||
|
t.Fatalf("expected 1 log, got %d", len(first))
|
||||||
|
}
|
||||||
|
if first[0].Address != want.Address || first[0].BlockNumber != want.BlockNumber {
|
||||||
|
t.Fatalf("unexpected log: address=%s block=%d", first[0].Address, first[0].BlockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second poll must be empty (drain semantics).
|
||||||
|
second, err := api.GetFilterChanges(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetFilterChanges: %v", err)
|
||||||
|
}
|
||||||
|
if logs := second.([]*types.Log); len(logs) != 0 {
|
||||||
|
t.Fatalf("expected empty second poll, got %d logs", len(logs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFilterChangesDrainBlocks(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
genesis := &core.Genesis{
|
||||||
|
Config: params.TestChainConfig,
|
||||||
|
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||||
|
}
|
||||||
|
db, chain, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 2, func(int, *core.BlockGen) {})
|
||||||
|
blockchain, err := core.NewBlockChain(db, genesis, ethash.NewFaker(), nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewBlockChain: %v", err)
|
||||||
|
}
|
||||||
|
if n, err := blockchain.InsertChain(chain[:1]); err != nil {
|
||||||
|
t.Fatalf("InsertChain block %d: %v", n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
backend, sys := newTestFilterSystem(db, Config{})
|
||||||
|
api := NewFilterAPI(sys)
|
||||||
|
id := api.NewBlockFilter()
|
||||||
|
|
||||||
|
if n := backend.chainFeed.Send(core.ChainEvent{Header: chain[1].Header()}); n == 0 {
|
||||||
|
t.Fatal("chain event not delivered")
|
||||||
|
}
|
||||||
|
|
||||||
|
hashes := pollFilterChanges(t, api, id, time.Second).([]common.Hash)
|
||||||
|
if len(hashes) != 1 {
|
||||||
|
t.Fatalf("expected 1 block hash, got %d", len(hashes))
|
||||||
|
}
|
||||||
|
if hashes[0] != chain[1].Hash() {
|
||||||
|
t.Fatalf("expected hash %x, got %x", chain[1].Hash(), hashes[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second poll must be empty.
|
||||||
|
second, err := api.GetFilterChanges(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetFilterChanges: %v", err)
|
||||||
|
}
|
||||||
|
if h := second.([]common.Hash); len(h) != 0 {
|
||||||
|
t.Fatalf("expected empty second poll, got %d hashes", len(h))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFilterChangesDrainPendingTxFullTx(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var (
|
||||||
|
db = rawdb.NewMemoryDatabase()
|
||||||
|
backend, sys = newTestFilterSystem(db, Config{})
|
||||||
|
api = NewFilterAPI(sys)
|
||||||
|
)
|
||||||
|
|
||||||
|
fullTx := true
|
||||||
|
id := api.NewPendingTransactionFilter(&fullTx)
|
||||||
|
|
||||||
|
tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
|
||||||
|
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
|
||||||
|
|
||||||
|
first := pollFilterChanges(t, api, id, time.Second).([]*ethapi.RPCTransaction)
|
||||||
|
if len(first) != 1 {
|
||||||
|
t.Fatalf("expected 1 pending tx, got %d", len(first))
|
||||||
|
}
|
||||||
|
if first[0].Hash != tx.Hash() {
|
||||||
|
t.Fatalf("expected tx hash %x, got %x", tx.Hash(), first[0].Hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second poll must be empty.
|
||||||
|
second, err := api.GetFilterChanges(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetFilterChanges: %v", err)
|
||||||
|
}
|
||||||
|
if txs := second.([]*ethapi.RPCTransaction); len(txs) != 0 {
|
||||||
|
t.Fatalf("expected empty second poll, got %d txs", len(txs))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFilterChangesDrainPendingTxHashOnly(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
var (
|
||||||
|
db = rawdb.NewMemoryDatabase()
|
||||||
|
backend, sys = newTestFilterSystem(db, Config{})
|
||||||
|
api = NewFilterAPI(sys)
|
||||||
|
)
|
||||||
|
|
||||||
|
id := api.NewPendingTransactionFilter(nil)
|
||||||
|
|
||||||
|
tx := types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil)
|
||||||
|
backend.txFeed.Send(core.NewTxsEvent{Txs: []*types.Transaction{tx}})
|
||||||
|
|
||||||
|
first := pollFilterChanges(t, api, id, time.Second).([]common.Hash)
|
||||||
|
if len(first) != 1 {
|
||||||
|
t.Fatalf("expected 1 tx hash, got %d", len(first))
|
||||||
|
}
|
||||||
|
if first[0] != tx.Hash() {
|
||||||
|
t.Fatalf("expected hash %x, got %x", tx.Hash(), first[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second poll must be empty.
|
||||||
|
second, err := api.GetFilterChanges(id)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetFilterChanges: %v", err)
|
||||||
|
}
|
||||||
|
if h := second.([]common.Hash); len(h) != 0 {
|
||||||
|
t.Fatalf("expected empty second poll, got %d hashes", len(h))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue