mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 08:49:29 +00:00
Merge 3c1b7bd7ff into 12eabbd76d
This commit is contained in:
commit
07f67a3f45
5 changed files with 168 additions and 12 deletions
|
|
@ -484,8 +484,12 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
|
|||
if begin >= 0 && begin < int64(api.events.backend.HistoryPruningCutoff()) {
|
||||
return nil, &history.PrunedHistoryError{}
|
||||
}
|
||||
// Construct the range filter
|
||||
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics, api.rangeLimit)
|
||||
// Construct the range filter, respecting optional result-count limit
|
||||
if crit.Limit != nil && *crit.Limit > 0 {
|
||||
filter = api.sys.NewRangeFilterWithLimit(begin, end, crit.Addresses, crit.Topics, api.rangeLimit, *crit.Limit)
|
||||
} else {
|
||||
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics, api.rangeLimit)
|
||||
}
|
||||
}
|
||||
|
||||
// Run the filter and return all the logs
|
||||
|
|
@ -493,7 +497,7 @@ func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*type
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return returnLogs(logs), err
|
||||
return returnLogs(logs), nil
|
||||
}
|
||||
|
||||
// UninstallFilter removes the filter with the given filter id.
|
||||
|
|
@ -627,6 +631,7 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
|
|||
ToBlock *rpc.BlockNumber `json:"toBlock"`
|
||||
Addresses interface{} `json:"address"`
|
||||
Topics []interface{} `json:"topics"`
|
||||
Limit *hexutil.Uint64 `json:"limit"`
|
||||
}
|
||||
|
||||
var raw input
|
||||
|
|
@ -725,6 +730,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
|
|||
}
|
||||
}
|
||||
|
||||
if raw.Limit != nil {
|
||||
limitVal := uint64(*raw.Limit)
|
||||
args.Limit = &limitVal
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
|
|||
var (
|
||||
fromBlock rpc.BlockNumber = 0x123435
|
||||
toBlock rpc.BlockNumber = 0xabcdef
|
||||
limit uint64 = 0x2
|
||||
address0 = common.HexToAddress("70c87d191324e6712a591f304b4eedef6ad9bb9d")
|
||||
address1 = common.HexToAddress("9b2055d370f73ec7d8a03e965129118dc8f5bf83")
|
||||
topic0 = common.HexToHash("3ac225168df54212a25c1c01fd35bebfea408fdac2e31ddd6f80a4bbf9a5f1ca")
|
||||
|
|
@ -53,6 +54,9 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
|
|||
if len(test0.Topics) != 0 {
|
||||
t.Fatalf("expected 0 topics, got %d topics", len(test0.Topics))
|
||||
}
|
||||
if test0.Limit != nil {
|
||||
t.Fatalf("expected nil limit, got %d", *test0.Limit)
|
||||
}
|
||||
|
||||
// from, to block number
|
||||
var test1 FilterCriteria
|
||||
|
|
@ -66,6 +70,22 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
|
|||
if test1.ToBlock.Int64() != toBlock.Int64() {
|
||||
t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock)
|
||||
}
|
||||
if test1.Limit != nil {
|
||||
t.Fatalf("expected nil limit, got %d", *test1.Limit)
|
||||
}
|
||||
|
||||
// limit
|
||||
var testLimit FilterCriteria
|
||||
vector = fmt.Sprintf(`{"limit":"0x%x"}`, limit)
|
||||
if err := json.Unmarshal([]byte(vector), &testLimit); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if testLimit.Limit == nil {
|
||||
t.Fatal("expected non-nil limit")
|
||||
}
|
||||
if *testLimit.Limit != limit {
|
||||
t.Fatalf("expected limit %d, got %d", limit, *testLimit.Limit)
|
||||
}
|
||||
|
||||
// single address
|
||||
var test2 FilterCriteria
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ type Filter struct {
|
|||
|
||||
rangeLogsTestHook chan rangeLogsTestEvent
|
||||
rangeLimit uint64
|
||||
limit uint64 // Maximum number of logs to return; 0 means no limit
|
||||
}
|
||||
|
||||
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
|
||||
|
|
@ -59,6 +60,14 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
|
|||
return filter
|
||||
}
|
||||
|
||||
// NewRangeFilterWithLimit creates a new filter with a maximum result count.
|
||||
// When limit is non-zero, the returned result is capped to that many logs.
|
||||
func (sys *FilterSystem) NewRangeFilterWithLimit(begin, end int64, addresses []common.Address, topics [][]common.Hash, rangeLimit, limit uint64) *Filter {
|
||||
filter := sys.NewRangeFilter(begin, end, addresses, topics, rangeLimit)
|
||||
filter.limit = limit
|
||||
return filter
|
||||
}
|
||||
|
||||
// NewBlockFilter creates a new filter which directly inspects the contents of
|
||||
// a block to figure out whether it is interesting or not.
|
||||
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
|
|
@ -148,7 +157,14 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
|||
if f.rangeLimit != 0 && (end-begin) > f.rangeLimit {
|
||||
return nil, invalidParamsErr("exceed maximum block range %d", f.rangeLimit)
|
||||
}
|
||||
return f.rangeLogs(ctx, begin, end)
|
||||
logs, err := f.rangeLogs(ctx, begin, end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if f.limit > 0 && uint64(len(logs)) > f.limit {
|
||||
logs = logs[:f.limit]
|
||||
}
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
@ -450,6 +466,11 @@ func (f *Filter) unindexedLogs(ctx context.Context, chainView *filtermaps.ChainV
|
|||
return matches, err
|
||||
}
|
||||
matches = append(matches, found...)
|
||||
// Early exit once the limit is reached on the unindexed path, avoiding
|
||||
// a full raw-block scan when callers only need the first N matches.
|
||||
if f.limit > 0 && uint64(len(matches)) >= f.limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
log.Debug("Performed unindexed log search", "begin", begin, "end", end, "matches", len(matches), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return matches, nil
|
||||
|
|
|
|||
|
|
@ -48,18 +48,30 @@ func makeReceipt(addr common.Address) *types.Receipt {
|
|||
}
|
||||
|
||||
func BenchmarkFiltersIndexed(b *testing.B) {
|
||||
benchmarkFilters(b, 0, false)
|
||||
benchmarkFilters(b, 0, false, 0, 4)
|
||||
}
|
||||
|
||||
func BenchmarkFiltersHalfIndexed(b *testing.B) {
|
||||
benchmarkFilters(b, 50000, false)
|
||||
benchmarkFilters(b, 50000, false, 0, 4)
|
||||
}
|
||||
|
||||
func BenchmarkFiltersUnindexed(b *testing.B) {
|
||||
benchmarkFilters(b, 0, true)
|
||||
benchmarkFilters(b, 0, true, 0, 4)
|
||||
}
|
||||
|
||||
func benchmarkFilters(b *testing.B, history uint64, noHistory bool) {
|
||||
func BenchmarkFiltersIndexedLimit1(b *testing.B) {
|
||||
benchmarkFilters(b, 0, false, 1, 1)
|
||||
}
|
||||
|
||||
func BenchmarkFiltersHalfIndexedLimit1(b *testing.B) {
|
||||
benchmarkFilters(b, 50000, false, 1, 1)
|
||||
}
|
||||
|
||||
func BenchmarkFiltersUnindexedLimit1(b *testing.B) {
|
||||
benchmarkFilters(b, 0, true, 1, 1)
|
||||
}
|
||||
|
||||
func benchmarkFilters(b *testing.B, history uint64, noHistory bool, limit uint64, expected int) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(db, Config{})
|
||||
|
|
@ -110,12 +122,18 @@ func benchmarkFilters(b *testing.B, history uint64, noHistory bool) {
|
|||
backend.startFilterMaps(history, noHistory, filtermaps.DefaultParams)
|
||||
defer backend.stopFilterMaps()
|
||||
|
||||
filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil, 0)
|
||||
var filter *Filter
|
||||
if limit > 0 {
|
||||
filter = sys.NewRangeFilterWithLimit(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil, 0, limit)
|
||||
} else {
|
||||
filter = sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil, 0)
|
||||
}
|
||||
ctx := context.Background()
|
||||
for b.Loop() {
|
||||
filter.begin = 0
|
||||
logs, _ := filter.Logs(context.Background())
|
||||
if len(logs) != 4 {
|
||||
b.Fatal("expected 4 logs, got", len(logs))
|
||||
logs, _ := filter.Logs(ctx)
|
||||
if len(logs) != expected {
|
||||
b.Fatal("expected", expected, "logs, got", len(logs))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -132,6 +150,88 @@ func TestFiltersUnindexed(t *testing.T) {
|
|||
testFilters(t, 0, true)
|
||||
}
|
||||
|
||||
func TestFiltersIndexedWithLimit(t *testing.T) {
|
||||
testFiltersWithLimit(t, 0, false)
|
||||
}
|
||||
|
||||
func TestFiltersHalfIndexedWithLimit(t *testing.T) {
|
||||
testFiltersWithLimit(t, 500, false)
|
||||
}
|
||||
|
||||
func TestFiltersUnindexedWithLimit(t *testing.T) {
|
||||
testFiltersWithLimit(t, 0, true)
|
||||
}
|
||||
|
||||
func testFiltersWithLimit(t *testing.T, history uint64, noHistory bool) {
|
||||
const (
|
||||
totalBlocks = 1000
|
||||
queryLimit uint64 = 2
|
||||
)
|
||||
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(db, Config{})
|
||||
target = common.BytesToAddress([]byte("target"))
|
||||
other = common.BytesToAddress([]byte("other"))
|
||||
gspec = &core.Genesis{
|
||||
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||
Config: params.TestChainConfig,
|
||||
}
|
||||
)
|
||||
defer db.Close()
|
||||
|
||||
_, err := gspec.Commit(db, triedb.NewDatabase(db, nil), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addReceiptLog := func(gen *core.BlockGen, nonce uint64, addr common.Address) {
|
||||
gen.AddUncheckedReceipt(makeReceipt(addr))
|
||||
gen.AddUncheckedTx(types.NewTransaction(nonce, common.HexToAddress("0x999"), big.NewInt(1), 21000, gen.BaseFee(), nil))
|
||||
}
|
||||
chain, receipts := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, totalBlocks, func(i int, gen *core.BlockGen) {
|
||||
switch i {
|
||||
case 2, 5, 900:
|
||||
addReceiptLog(gen, 999, target)
|
||||
case 990:
|
||||
addReceiptLog(gen, 1000, other)
|
||||
}
|
||||
})
|
||||
for i, block := range chain {
|
||||
rawdb.WriteBlock(db, block)
|
||||
rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64())
|
||||
rawdb.WriteHeadBlockHash(db, block.Hash())
|
||||
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
|
||||
}
|
||||
backend.startFilterMaps(history, noHistory, filtermaps.DefaultParams)
|
||||
defer backend.stopFilterMaps()
|
||||
|
||||
allFilter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{target}, nil, 0)
|
||||
allLogs, err := allFilter.Logs(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(allLogs) != 3 {
|
||||
t.Fatalf("expected 3 total logs, got %d", len(allLogs))
|
||||
}
|
||||
|
||||
limitedFilter := sys.NewRangeFilterWithLimit(0, int64(rpc.LatestBlockNumber), []common.Address{target}, nil, 0, queryLimit)
|
||||
limitedLogs, err := limitedFilter.Logs(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(limitedLogs) != int(queryLimit) {
|
||||
t.Fatalf("expected %d logs with limit, got %d", queryLimit, len(limitedLogs))
|
||||
}
|
||||
if limitedLogs[0].BlockNumber != 3 || limitedLogs[1].BlockNumber != 6 {
|
||||
t.Fatalf("unexpected limited block numbers: got [%d %d], want [3 6]", limitedLogs[0].BlockNumber, limitedLogs[1].BlockNumber)
|
||||
}
|
||||
for i, log := range limitedLogs {
|
||||
if log.Address != target {
|
||||
t.Fatalf("unexpected address in limited log #%d: got %x, want %x", i, log.Address, target)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testFilters(t *testing.T, history uint64, noHistory bool) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
|
|
|
|||
|
|
@ -206,6 +206,11 @@ type FilterQuery struct {
|
|||
// {{A}, {B}} matches topic A in first position AND B in second position
|
||||
// {{A, B}, {C, D}} matches topic (A OR B) in first position AND (C OR D) in second position
|
||||
Topics [][]common.Hash
|
||||
|
||||
// Limit caps the maximum number of logs returned.
|
||||
// When non-nil and non-zero, at most Limit matching logs are returned.
|
||||
// Example: set Limit to 1 to receive a single matching event.
|
||||
Limit *uint64
|
||||
}
|
||||
|
||||
// LogFilterer provides access to contract log events using a one-off query or continuous
|
||||
|
|
|
|||
Loading…
Reference in a new issue