eth/filters: retrieve logs in async #27135 (#1388)

This commit is contained in:
Daniel Liu 2025-08-29 05:29:38 +08:00 committed by GitHub
parent 031ea75eca
commit 59568b167f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 331 additions and 141 deletions

View file

@ -33,6 +33,8 @@ import (
)
var (
errInvalidTopic = errors.New("invalid topic(s)")
errFilterNotFound = errors.New("filter not found")
errExceedMaxTopics = errors.New("exceed max topics")
)
@ -384,7 +386,7 @@ func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Lo
api.filtersMu.Unlock()
if !found || f.typ != LogsSubscription {
return nil, errors.New("filter not found")
return nil, errFilterNotFound
}
var filter *Filter
@ -447,7 +449,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
}
}
return []interface{}{}, errors.New("filter not found")
return []interface{}{}, errFilterNotFound
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
@ -559,11 +561,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
}
args.Topics[i] = append(args.Topics[i], parsed)
} else {
return errors.New("invalid topic(s)")
return errInvalidTopic
}
}
default:
return errors.New("invalid topic(s)")
return errInvalidTopic
}
}
}

View file

@ -106,32 +106,31 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return f.blockLogs(ctx, header)
}
// Short-cut if all we care about is pending logs
if f.begin == rpc.PendingBlockNumber.Int64() {
if f.end != rpc.PendingBlockNumber.Int64() {
return nil, errors.New("invalid block range")
}
return f.pendingLogs()
}
// Figure out the limits of the filter range
header, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if header == nil {
return nil, nil
}
var (
err error
head = header.Number.Int64()
pending = f.end == rpc.PendingBlockNumber.Int64()
beginPending = f.begin == rpc.PendingBlockNumber.Int64()
endPending = f.end == rpc.PendingBlockNumber.Int64()
)
// special case for pending logs
if beginPending && !endPending {
return nil, errors.New("invalid block range")
}
// Short-cut if all we care about is pending logs
if beginPending && endPending {
return f.pendingLogs(), nil
}
resolveSpecial := func(number int64) (int64, error) {
var hdr *types.Header
switch number {
case rpc.LatestBlockNumber.Int64():
return head, nil
case rpc.PendingBlockNumber.Int64():
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
// we should return head here since we've already captured
// that we need to get the pending logs in the pending boolean above
return head, nil
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if hdr == nil {
return 0, errors.New("latest header not found")
}
case rpc.CommittedBlockNumber.Int64():
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.CommittedBlockNumber)
if hdr == nil {
@ -142,57 +141,92 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
}
return hdr.Number.Int64(), nil
}
var err error
// range query need to resolve the special begin/end block number
if f.begin, err = resolveSpecial(f.begin); err != nil {
return nil, err
}
if f.end, err = resolveSpecial(f.end); err != nil {
return nil, err
}
// Gather all indexed logs, and finish with non indexed ones
logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
if err != nil {
// if an error occurs during extraction, we do return the extracted data
return logs, err
}
// Append the pending ones
if endPending {
pendingLogs := f.pendingLogs()
logs = append(logs, pendingLogs...)
}
return logs, nil
}
}
}
// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
// it creates and returns two channels: one for delivering log data, and one for reporting errors.
func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) {
var (
logs []*types.Log
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
logChan = make(chan *types.Log)
errChan = make(chan error)
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
logs, err = f.indexedLogs(ctx, end)
} else {
logs, err = f.indexedLogs(ctx, indexed-1)
go func() {
defer func() {
close(errChan)
close(logChan)
}()
// Gather all indexed logs, and finish with non indexed ones
var (
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
err error
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
indexed = end + 1
}
if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
errChan <- err
return
}
}
if err != nil {
return logs, err
if err := f.unindexedLogs(ctx, end, logChan); err != nil {
errChan <- err
return
}
}
rest, err := f.unindexedLogs(ctx, end)
logs = append(logs, rest...)
if pending {
pendingLogs, err := f.pendingLogs()
if err != nil {
return nil, err
}
logs = append(logs, pendingLogs...)
}
return logs, err
errChan <- nil
}()
return logChan, errChan
}
// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
// Create a matcher session and request servicing from the backend
matches := make(chan uint64, 64)
session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return nil, err
return err
}
defer session.Close()
f.sys.backend.ServiceFilter(ctx, session)
// Iterate over the matches until exhausted or context closed
var logs []*types.Log
for {
select {
case number, ok := <-matches:
@ -202,44 +236,50 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
if err == nil {
f.begin = int64(end) + 1
}
return logs, err
return err
}
f.begin = int64(number) + 1
// Retrieve the suggested block and pull any truly matching logs
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return logs, err
return err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
return err
}
for _, log := range found {
logChan <- log
}
logs = append(logs, found...)
case <-ctx.Done():
return logs, ctx.Err()
return ctx.Err()
}
}
}
// indexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
var logs []*types.Log
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
for ; f.begin <= int64(end); f.begin++ {
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
if header == nil || err != nil {
return logs, err
return err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return logs, err
return err
}
for _, log := range found {
select {
case logChan <- log:
case <-ctx.Done():
return ctx.Err()
}
}
logs = append(logs, found...)
}
return logs, nil
return nil
}
// blockLogs returns the logs matching the filter criteria within a single block.
@ -286,19 +326,19 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
}
// pendingLogs returns the logs matching the filter criteria within the pending block.
func (f *Filter) pendingLogs() ([]*types.Log, error) {
func (f *Filter) pendingLogs() []*types.Log {
block, receipts := f.sys.backend.PendingBlockAndReceipts()
if block == nil {
return nil, errors.New("pending state not available")
return nil
}
if bloomFilter(block.Bloom(), f.addresses, f.topics) {
var unfiltered []*types.Log
for _, r := range receipts {
unfiltered = append(unfiltered, r.Logs...)
}
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics), nil
return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
}
return nil, nil
return nil
}
func includes(addresses []common.Address, a common.Address) bool {

View file

@ -50,6 +50,8 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
pendingBlock *types.Block
pendingReceipts types.Receipts
}
func (b *testBackend) ChainConfig() *params.ChainConfig {
@ -115,7 +117,7 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
}
func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return nil, nil
return b.pendingBlock, b.pendingReceipts
}
func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {

View file

@ -18,15 +18,19 @@ package filters
import (
"context"
"encoding/json"
"math/big"
"reflect"
"strings"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/accounts/abi"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rpc"
@ -92,116 +96,258 @@ func BenchmarkFilters(b *testing.B) {
}
func TestFilters(t *testing.T) {
dir := t.TempDir()
config := *params.TestChainConfig
config.Eip1559Block = big.NewInt(0)
var (
db, _ = rawdb.NewLevelDBDatabase(dir, 0, 0, "", false)
_, sys = newTestFilterSystem(t, db, Config{})
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
// Sender account
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
signer = types.NewLondonSigner(big.NewInt(1))
// Logging contract
contract = common.Address{0xfe}
contract2 = common.Address{0xff}
abiStr = `[{"inputs":[],"name":"log0","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"}],"name":"log1","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"},{"internalType":"uint256","name":"t2","type":"uint256"}],"name":"log2","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"},{"internalType":"uint256","name":"t2","type":"uint256"},{"internalType":"uint256","name":"t3","type":"uint256"}],"name":"log3","outputs":[],"stateMutability":"nonpayable","type":"function"},{"inputs":[{"internalType":"uint256","name":"t1","type":"uint256"},{"internalType":"uint256","name":"t2","type":"uint256"},{"internalType":"uint256","name":"t3","type":"uint256"},{"internalType":"uint256","name":"t4","type":"uint256"}],"name":"log4","outputs":[],"stateMutability":"nonpayable","type":"function"}]`
/*
// SPDX-License-Identifier: GPL-3.0
pragma solidity >=0.7.0 <0.9.0;
contract Logger {
function log0() external {
assembly {
log0(0, 0)
}
}
function log1(uint t1) external {
assembly {
log1(0, 0, t1)
}
}
function log2(uint t1, uint t2) external {
assembly {
log2(0, 0, t1, t2)
}
}
function log3(uint t1, uint t2, uint t3) external {
assembly {
log3(0, 0, t1, t2, t3)
}
}
function log4(uint t1, uint t2, uint t3, uint t4) external {
assembly {
log4(0, 0, t1, t2, t3, t4)
}
}
}
*/
bytecode = common.FromHex("608060405234801561001057600080fd5b50600436106100575760003560e01c80630aa731851461005c5780632a4c08961461006657806378b9a1f314610082578063c670f8641461009e578063c683d6a3146100ba575b600080fd5b6100646100d6565b005b610080600480360381019061007b9190610143565b6100dc565b005b61009c60048036038101906100979190610196565b6100e8565b005b6100b860048036038101906100b391906101d6565b6100f2565b005b6100d460048036038101906100cf9190610203565b6100fa565b005b600080a0565b808284600080a3505050565b8082600080a25050565b80600080a150565b80828486600080a450505050565b600080fd5b6000819050919050565b6101208161010d565b811461012b57600080fd5b50565b60008135905061013d81610117565b92915050565b60008060006060848603121561015c5761015b610108565b5b600061016a8682870161012e565b935050602061017b8682870161012e565b925050604061018c8682870161012e565b9150509250925092565b600080604083850312156101ad576101ac610108565b5b60006101bb8582860161012e565b92505060206101cc8582860161012e565b9150509250929050565b6000602082840312156101ec576101eb610108565b5b60006101fa8482850161012e565b91505092915050565b6000806000806080858703121561021d5761021c610108565b5b600061022b8782880161012e565b945050602061023c8782880161012e565b935050604061024d8782880161012e565b925050606061025e8782880161012e565b9150509295919450925056fea264697066735822122073a4b156f487e59970dc1ef449cc0d51467268f676033a17188edafcee861f9864736f6c63430008110033")
hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))
hash3 = common.BytesToHash([]byte("topic3"))
hash4 = common.BytesToHash([]byte("topic4"))
)
defer db.Close()
hash5 = common.BytesToHash([]byte("topic5"))
genesis := core.GenesisBlockForTesting(db, addr, big.NewInt(1000000))
chain, receipts := core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {
gspec = &core.Genesis{
Config: &config,
Alloc: types.GenesisAlloc{
addr: {Balance: big.NewInt(0).Mul(big.NewInt(100), big.NewInt(params.Ether))},
contract: {Balance: big.NewInt(0), Code: bytecode},
contract2: {Balance: big.NewInt(0), Code: bytecode},
},
BaseFee: big.NewInt(params.InitialBaseFee),
}
)
contractABI, err := abi.JSON(strings.NewReader(abiStr))
if err != nil {
t.Fatal(err)
}
// Hack: GenerateChainWithGenesis creates a new db.
// Commit the genesis manually and use GenerateChain.
genesis := gspec.MustCommit(db)
chain, _ := core.GenerateChain(&config, genesis, ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {
switch i {
case 1:
receipt := types.NewReceipt(nil, false, 0)
receipt.Logs = []*types.Log{
{
Address: addr,
Topics: []common.Hash{hash1},
},
data, err := contractABI.Pack("log1", hash1.Big())
if err != nil {
t.Fatal(err)
}
gen.AddUncheckedReceipt(receipt)
gen.AddUncheckedTx(types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(1), 1, big.NewInt(2100), nil))
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 0,
GasPrice: gen.BaseFee(),
Gas: 30000,
To: &contract,
Data: data,
}), signer, key1)
gen.AddTx(tx)
tx2, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 1,
GasPrice: gen.BaseFee(),
Gas: 30000,
To: &contract2,
Data: data,
}), signer, key1)
gen.AddTx(tx2)
case 2:
receipt := types.NewReceipt(nil, false, 0)
receipt.Logs = []*types.Log{
{
Address: addr,
Topics: []common.Hash{hash2},
},
data, err := contractABI.Pack("log2", hash2.Big(), hash1.Big())
if err != nil {
t.Fatal(err)
}
gen.AddUncheckedReceipt(receipt)
gen.AddUncheckedTx(types.NewTransaction(2, common.HexToAddress("0x2"), big.NewInt(2), 2, big.NewInt(2100), nil))
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 2,
GasPrice: gen.BaseFee(),
Gas: 30000,
To: &contract,
Data: data,
}), signer, key1)
gen.AddTx(tx)
case 998:
receipt := types.NewReceipt(nil, false, 0)
receipt.Logs = []*types.Log{
{
Address: addr,
Topics: []common.Hash{hash3},
},
data, err := contractABI.Pack("log1", hash3.Big())
if err != nil {
t.Fatal(err)
}
gen.AddUncheckedReceipt(receipt)
gen.AddUncheckedTx(types.NewTransaction(998, common.HexToAddress("0x998"), big.NewInt(998), 998, big.NewInt(2100), nil))
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 3,
GasPrice: gen.BaseFee(),
Gas: 30000,
To: &contract2,
Data: data,
}), signer, key1)
gen.AddTx(tx)
case 999:
receipt := types.NewReceipt(nil, false, 0)
receipt.Logs = []*types.Log{
{
Address: addr,
Topics: []common.Hash{hash4},
},
data, err := contractABI.Pack("log1", hash4.Big())
if err != nil {
t.Fatal(err)
}
gen.AddUncheckedReceipt(receipt)
gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, big.NewInt(2100), nil))
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 4,
GasPrice: gen.BaseFee(),
Gas: 30000,
To: &contract,
Data: data,
}), signer, key1)
gen.AddTx(tx)
}
})
for i, block := range chain {
rawdb.WriteBlock(db, block)
rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64())
rawdb.WriteHeadBlockHash(db, block.Hash())
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
bc, err := core.NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{})
if err != nil {
t.Fatal(err)
}
_, err = bc.InsertChain(chain)
if err != nil {
t.Fatal(err)
}
filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr}, [][]common.Hash{{hash1, hash2, hash3, hash4}})
logs, _ := filter.Logs(context.Background())
if len(logs) != 4 {
t.Error("expected 4 log, got", len(logs))
}
// TODO(daniel): ref PR #27135
// Set block 998 as Finalized (-3)
// bc.SetFinalized(chain[998].Header())
// Generate pending block
pchain, preceipts := core.GenerateChain(gspec.Config, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *core.BlockGen) {
data, err := contractABI.Pack("log1", hash5.Big())
if err != nil {
t.Fatal(err)
}
tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{
Nonce: 5,
GasPrice: gen.BaseFee(),
Gas: 30000,
To: &contract,
Data: data,
}), signer, key1)
gen.AddTx(tx)
})
sys.backend.(*testBackend).pendingBlock = pchain[0]
sys.backend.(*testBackend).pendingReceipts = preceipts[0]
for i, tc := range []struct {
f *Filter
wantHashes []common.Hash
f *Filter
want string
err string
}{
{
sys.NewRangeFilter(900, 999, []common.Address{addr}, [][]common.Hash{{hash3}}),
[]common.Hash{hash3},
f: sys.NewBlockFilter(chain[2].Hash(), []common.Address{contract}, nil),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xac8b8343d69a5c46fef5af7158f42a389bc84093a88e97e184cc4263cf85dc54","transactionIndex":"0x0","blockHash":"0x6dca03904b22bf701dae59c7135dc3e0b578bd4c577d1c111d9d97776090ae09","logIndex":"0x0","removed":false}]`,
}, {
sys.NewRangeFilter(990, int64(rpc.LatestBlockNumber), []common.Address{addr}, [][]common.Hash{{hash3}}),
[]common.Hash{hash3},
f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{contract}, [][]common.Hash{{hash1, hash2, hash3, hash4}}),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x2","transactionHash":"0x3ebf3c1ea6e0282cfe845f273b2cc7e97f02145e6d0577641cd1a82d532a19c1","transactionIndex":"0x0","blockHash":"0x0f4dea85fc816b6fd5eb4a0ba86eb00cc7d3397785336870bebd27f181a722da","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xac8b8343d69a5c46fef5af7158f42a389bc84093a88e97e184cc4263cf85dc54","transactionIndex":"0x0","blockHash":"0x6dca03904b22bf701dae59c7135dc3e0b578bd4c577d1c111d9d97776090ae09","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false}]`,
}, {
sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}),
[]common.Hash{hash1, hash2},
f: sys.NewRangeFilter(900, 999, []common.Address{contract}, [][]common.Hash{{hash3}}),
}, {
sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}}),
nil,
f: sys.NewRangeFilter(990, int64(rpc.LatestBlockNumber), []common.Address{contract2}, [][]common.Hash{{hash3}}),
want: `[{"address":"0xff00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696333"],"data":"0x","blockNumber":"0x3e7","transactionHash":"0x7c42465b2bd34fb8e87bab0001ab97c47b6d57c65f17efe43e81461fef2f05a4","transactionIndex":"0x0","blockHash":"0x5112c98f7517100552d30734c46356db10494c90bb3bd0af90ef9ace2e692ad2","logIndex":"0x0","removed":false}]`,
}, {
sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{common.BytesToAddress([]byte("failmenow"))}, nil),
nil,
f: sys.NewRangeFilter(1, 10, []common.Address{contract}, [][]common.Hash{{hash2}, {hash1}}),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xac8b8343d69a5c46fef5af7158f42a389bc84093a88e97e184cc4263cf85dc54","transactionIndex":"0x0","blockHash":"0x6dca03904b22bf701dae59c7135dc3e0b578bd4c577d1c111d9d97776090ae09","logIndex":"0x0","removed":false}]`,
}, {
sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}, {hash1}}),
nil,
f: sys.NewRangeFilter(1, 10, nil, [][]common.Hash{{hash1, hash2}}),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x2","transactionHash":"0x3ebf3c1ea6e0282cfe845f273b2cc7e97f02145e6d0577641cd1a82d532a19c1","transactionIndex":"0x0","blockHash":"0x0f4dea85fc816b6fd5eb4a0ba86eb00cc7d3397785336870bebd27f181a722da","logIndex":"0x0","removed":false},{"address":"0xff00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x2","transactionHash":"0x55b3a22ae885f9441ff8bd98fbfe54cc1b84799606aca159fac8d7a56551e426","transactionIndex":"0x1","blockHash":"0x0f4dea85fc816b6fd5eb4a0ba86eb00cc7d3397785336870bebd27f181a722da","logIndex":"0x1","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696332","0x0000000000000000000000000000000000000000000000000000746f70696331"],"data":"0x","blockNumber":"0x3","transactionHash":"0xac8b8343d69a5c46fef5af7158f42a389bc84093a88e97e184cc4263cf85dc54","transactionIndex":"0x0","blockHash":"0x6dca03904b22bf701dae59c7135dc3e0b578bd4c577d1c111d9d97776090ae09","logIndex":"0x0","removed":false}]`,
}, {
f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}}),
}, {
f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{common.BytesToAddress([]byte("failmenow"))}, nil),
}, {
f: sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), nil, [][]common.Hash{{common.BytesToHash([]byte("fail"))}, {hash1}}),
}, {
f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber), nil, nil),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false}]`,
}, {
f: sys.NewRangeFilter(int64(rpc.CommittedBlockNumber), int64(rpc.LatestBlockNumber), nil, nil),
err: "committed header not found",
}, {
f: sys.NewRangeFilter(int64(rpc.CommittedBlockNumber), int64(rpc.CommittedBlockNumber), nil, nil),
err: "committed header not found",
}, {
f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.CommittedBlockNumber), nil, nil),
err: "committed header not found",
}, {
f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x87d02f2dddb1941ff179ae5d5fbb123afb9c5f71220045bc1c48f3872be24d4a","transactionIndex":"0x0","blockHash":"0x5f2b35a350840476a43aa23c6ea031d6db277aef337775ebb8421df64f17723f","logIndex":"0x0","removed":false}]`,
}, {
f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil),
want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x87d02f2dddb1941ff179ae5d5fbb123afb9c5f71220045bc1c48f3872be24d4a","transactionIndex":"0x0","blockHash":"0x5f2b35a350840476a43aa23c6ea031d6db277aef337775ebb8421df64f17723f","logIndex":"0x0","removed":false}]`,
}, {
f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil),
err: "invalid block range",
},
} {
logs, _ := tc.f.Logs(context.Background())
var haveHashes []common.Hash
for _, l := range logs {
haveHashes = append(haveHashes, l.Topics[0])
logs, err := tc.f.Logs(context.Background())
if err == nil && tc.err != "" {
t.Fatalf("test %d, expected error %q, got nil", i, tc.err)
} else if err != nil && err.Error() != tc.err {
t.Fatalf("test %d, expected error %q, got %q", i, tc.err, err.Error())
}
if have, want := len(haveHashes), len(tc.wantHashes); have != want {
t.Fatalf("test %d, have %d logs, want %d", i, have, want)
}
if len(haveHashes) == 0 {
if tc.want == "" && len(logs) == 0 {
continue
}
if !reflect.DeepEqual(tc.wantHashes, haveHashes) {
t.Fatalf("test %d, have %v want %v", i, haveHashes, tc.wantHashes)
have, err := json.Marshal(logs)
if err != nil {
t.Fatal(err)
}
if string(have) != tc.want {
t.Fatalf("test %d, have:\n%s\nwant:\n%s", i, have, tc.want)
}
}
t.Run("timeout", func(t *testing.T) {
f := sys.NewRangeFilter(0, -1, nil, nil)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
defer cancel()
_, err := f.Logs(ctx)
if err == nil {
t.Fatal("expected error")
}
if err != context.DeadlineExceeded {
t.Fatalf("expected context.DeadlineExceeded, got %v", err)
}
})
}