diff --git a/cmd/workload/filtertest.go b/cmd/workload/filtertest.go index 9f0b6cab44..d77cbc5768 100644 --- a/cmd/workload/filtertest.go +++ b/cmd/workload/filtertest.go @@ -182,13 +182,14 @@ func (s *filterTestSuite) loadQueries() error { // filterQuery is a single query for testing. type filterQuery struct { - FromBlock int64 `json:"fromBlock"` - ToBlock int64 `json:"toBlock"` - Address []common.Address `json:"address"` - Topics [][]common.Hash `json:"topics"` - ResultHash *common.Hash `json:"resultHash,omitempty"` - results []types.Log - Err error `json:"error,omitempty"` + FromBlock int64 `json:"fromBlock"` + ToBlock int64 `json:"toBlock"` + lastBlockHash common.Hash + Address []common.Address `json:"address"` + Topics [][]common.Hash `json:"topics"` + ResultHash *common.Hash `json:"resultHash,omitempty"` + results []types.Log + Err error `json:"error,omitempty"` } func (fq *filterQuery) isWildcard() bool { diff --git a/cmd/workload/filtertestfuzz.go b/cmd/workload/filtertestfuzz.go new file mode 100644 index 0000000000..3549f4db56 --- /dev/null +++ b/cmd/workload/filtertestfuzz.go @@ -0,0 +1,337 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of go-ethereum. +// +// go-ethereum is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// go-ethereum is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with go-ethereum. If not, see . + +package main + +import ( + "context" + "fmt" + "math/big" + "reflect" + "slices" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" + "github.com/urfave/cli/v2" +) + +const maxFilterRangeForTestFuzz = 300 + +var ( + filterFuzzCommand = &cli.Command{ + Name: "filterfuzz", + Usage: "Generates queries and compares results against matches derived from receipts", + ArgsUsage: "", + Action: filterFuzzCmd, + Flags: []cli.Flag{}, + } +) + +// filterFuzzCmd is the main function of the filter fuzzer. +func filterFuzzCmd(ctx *cli.Context) error { + f := newFilterTestGen(ctx, maxFilterRangeForTestFuzz) + var lastHead *types.Header + headerCache := lru.NewCache[common.Hash, *types.Header](200) + + commonAncestor := func(oldPtr, newPtr *types.Header) *types.Header { + if oldPtr == nil || newPtr == nil { + return nil + } + if newPtr.Number.Uint64() > oldPtr.Number.Uint64()+100 || oldPtr.Number.Uint64() > newPtr.Number.Uint64()+100 { + return nil + } + for oldPtr.Hash() != newPtr.Hash() { + if newPtr.Number.Uint64() >= oldPtr.Number.Uint64() { + if parent, _ := headerCache.Get(newPtr.ParentHash); parent != nil { + newPtr = parent + } else { + newPtr, _ = getHeaderByHash(f.client, newPtr.ParentHash) + if newPtr == nil { + return nil + } + headerCache.Add(newPtr.Hash(), newPtr) + } + } + if oldPtr.Number.Uint64() > newPtr.Number.Uint64() { + oldPtr, _ = headerCache.Get(oldPtr.ParentHash) + if oldPtr == nil { + return nil + } + } + } + return newPtr + } + + fetchHead := func() (*types.Header, bool) { + currentHead, err := getLatestHeader(f.client) + if err != nil { + fmt.Println("Could not fetch head block", err) + return nil, false + } + headerCache.Add(currentHead.Hash(), currentHead) + if lastHead != nil && currentHead.Hash() == lastHead.Hash() { + return currentHead, false + } + f.blockLimit = currentHead.Number.Int64() + ca := commonAncestor(lastHead, currentHead) + fmt.Print("*** New head ", f.blockLimit) + if ca == nil { + fmt.Println(" ") + } else { + if reorged := lastHead.Number.Uint64() - ca.Number.Uint64(); reorged > 0 { + fmt.Print(" reorged ", reorged) + } + if missed := currentHead.Number.Uint64() - ca.Number.Uint64() - 1; missed > 0 { + fmt.Print(" missed ", missed) + } + fmt.Println() + } + lastHead = currentHead + return currentHead, true + } + + tryExtendQuery := func(query *filterQuery) *filterQuery { + for { + extQuery := f.extendRange(query) + if extQuery == nil { + return query + } + extQuery.checkLastBlockHash(f.client) + extQuery.run(f.client, nil) + if extQuery.Err == nil && len(extQuery.results) == 0 { + // query is useless now due to major reorg; abandon and continue + fmt.Println("Zero length results") + return nil + } + if extQuery.Err != nil { + extQuery.printError() + return nil + } + if len(extQuery.results) > maxFilterResultSize { + return query + } + query = extQuery + } + } + + var ( + mmQuery *filterQuery + mmRetry, mmNextRetry int + ) + +mainLoop: + for { + select { + case <-ctx.Done(): + return nil + default: + } + var query *filterQuery + if mmQuery != nil { + if mmRetry == 0 { + query = mmQuery + mmRetry = mmNextRetry + mmNextRetry *= 2 + query.checkLastBlockHash(f.client) + query.run(f.client, nil) + if query.Err != nil { + query.printError() + continue + } + fmt.Println("Retrying query from:", query.FromBlock, "to:", query.ToBlock, "results:", len(query.results)) + } else { + mmRetry-- + } + } + if query == nil { + currentHead, isNewHead := fetchHead() + if currentHead == nil { + select { + case <-ctx.Done(): + return nil + case <-time.After(time.Second): + } + continue mainLoop + } + if isNewHead { + query = f.newHeadSeedQuery(currentHead.Number.Int64()) + } else { + query = f.newQuery() + } + query.checkLastBlockHash(f.client) + query.run(f.client, nil) + if query.Err != nil { + query.printError() + continue + } + fmt.Println("New query from:", query.FromBlock, "to:", query.ToBlock, "results:", len(query.results)) + if len(query.results) == 0 || len(query.results) > maxFilterResultSize { + continue mainLoop + } + if query = tryExtendQuery(query); query == nil { + continue mainLoop + } + } + if !query.checkLastBlockHash(f.client) { + fmt.Println("Reorg during search") + continue mainLoop + } + // now we have a new query; check results + results, err := query.getResultsFromReceipts(f.client) + if err != nil { + fmt.Println("Could not fetch results from receipts", err) + continue mainLoop + } + if !query.checkLastBlockHash(f.client) { + fmt.Println("Reorg during search") + continue mainLoop + } + if !reflect.DeepEqual(query.results, results) { + fmt.Println("Results mismatch from:", query.FromBlock, "to:", query.ToBlock, "addresses:", query.Address, "topics:", query.Topics) + resShared, resGetLogs, resReceipts := compareResults(query.results, results) + fmt.Println(" shared:", len(resShared)) + fmt.Println(" only from getLogs:", len(resGetLogs), resGetLogs) + fmt.Println(" only from receipts:", len(resReceipts), resReceipts) + if mmQuery != query { + mmQuery = query + mmRetry = 0 + mmNextRetry = 1 + } + continue mainLoop + } + fmt.Println("Successful query from:", query.FromBlock, "to:", query.ToBlock, "results:", len(query.results)) + f.storeQuery(query) + } +} + +func compareResults(a, b []types.Log) (shared, onlya, onlyb []types.Log) { + for len(a) > 0 && len(b) > 0 { + if reflect.DeepEqual(a[0], b[0]) { + shared = append(shared, a[0]) + a = a[1:] + b = b[1:] + } else { + for i := 1; ; i++ { + if i >= len(a) { // b[0] not found in a + onlyb = append(onlyb, b[0]) + b = b[1:] + break + } + if i >= len(b) { // a[0] not found in b + onlya = append(onlya, a[0]) + a = a[1:] + break + } + if reflect.DeepEqual(b[0], a[i]) { // a[:i] not found in b + onlya = append(onlya, a[:i]...) + a = a[i:] + break + } + if reflect.DeepEqual(a[0], b[i]) { // b[:i] not found in a + onlyb = append(onlyb, b[:i]...) + b = b[i:] + break + } + } + } + } + onlya = append(onlya, a...) + onlyb = append(onlyb, b...) + return +} + +func getLatestHeader(client *client) (*types.Header, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + return client.Eth.HeaderByNumber(ctx, big.NewInt(int64(rpc.LatestBlockNumber))) +} + +func getHeaderByHash(client *client, hash common.Hash) (*types.Header, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + return client.Eth.HeaderByHash(ctx, hash) +} + +// newHeadSeedQuery creates a query that gets all logs from the latest head. +func (s *filterTestGen) newHeadSeedQuery(head int64) *filterQuery { + return &filterQuery{ + FromBlock: head, + ToBlock: head, + } +} + +func (fq *filterQuery) checkLastBlockHash(client *client) bool { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + header, err := client.Eth.HeaderByNumber(ctx, big.NewInt(fq.ToBlock)) + if err != nil { + fmt.Println("Cound not fetch last block hash of query number:", fq.ToBlock, "error:", err) + fq.lastBlockHash = common.Hash{} + return false + } + hash := header.Hash() + if fq.lastBlockHash == hash { + return true + } + fq.lastBlockHash = hash + return false +} + +func (fq *filterQuery) filterLog(log *types.Log) bool { + if len(fq.Address) > 0 && !slices.Contains(fq.Address, log.Address) { + return false + } + // If the to filtered topics is greater than the amount of topics in logs, skip. + if len(fq.Topics) > len(log.Topics) { + return false + } + for i, sub := range fq.Topics { + if len(sub) == 0 { + continue // empty rule set == wildcard + } + if !slices.Contains(sub, log.Topics[i]) { + return false + } + } + return true +} + +func (fq *filterQuery) getResultsFromReceipts(client *client) ([]types.Log, error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + var results []types.Log + for blockNumber := fq.FromBlock; blockNumber <= fq.ToBlock; blockNumber++ { + receipts, err := client.Eth.BlockReceipts(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockNumber))) + if err != nil { + return nil, err + } + for _, receipt := range receipts { + for _, log := range receipt.Logs { + if fq.filterLog(log) { + results = append(results, *log) + } + } + } + } + return results, nil +} diff --git a/cmd/workload/filtertestgen.go b/cmd/workload/filtertestgen.go index 6d1f639819..603e3dea67 100644 --- a/cmd/workload/filtertestgen.go +++ b/cmd/workload/filtertestgen.go @@ -32,6 +32,17 @@ import ( "github.com/urfave/cli/v2" ) +const ( + // Parameter of the random filter query generator. + maxFilterRangeForTestGen = 100000000000 + maxFilterResultSize = 1000 + filterBuckets = 10 + maxFilterBucketSize = 100 + filterSeedChance = 10 + filterMergeChance = 45 + filterExtendChance = 50 +) + var ( filterGenerateCommand = &cli.Command{ Name: "filtergen", @@ -58,7 +69,7 @@ var ( // filterGenCmd is the main function of the filter tests generator. func filterGenCmd(ctx *cli.Context) error { - f := newFilterTestGen(ctx) + f := newFilterTestGen(ctx, maxFilterRangeForTestGen) lastWrite := time.Now() for { select { @@ -67,7 +78,7 @@ func filterGenCmd(ctx *cli.Context) error { default: } - f.updateFinalizedBlock() + f.setLimitToFinalizedBlock() query := f.newQuery() query.run(f.client, nil) if query.Err != nil { @@ -75,7 +86,7 @@ func filterGenCmd(ctx *cli.Context) error { exit("filter query failed") } if len(query.results) > 0 && len(query.results) <= maxFilterResultSize { - for { + for rand.Intn(100) < filterExtendChance { extQuery := f.extendRange(query) if extQuery == nil { break @@ -108,39 +119,32 @@ func filterGenCmd(ctx *cli.Context) error { // filterTestGen is the filter query test generator. type filterTestGen struct { - client *client - queryFile string + client *client + queryFile string + maxFilterRange int64 - finalizedBlock int64 - queries [filterBuckets][]*filterQuery + blockLimit int64 + queries [filterBuckets][]*filterQuery } -func newFilterTestGen(ctx *cli.Context) *filterTestGen { +func newFilterTestGen(ctx *cli.Context, maxFilterRange int64) *filterTestGen { return &filterTestGen{ - client: makeClient(ctx), - queryFile: ctx.String(filterQueryFileFlag.Name), + client: makeClient(ctx), + queryFile: ctx.String(filterQueryFileFlag.Name), + maxFilterRange: maxFilterRange, } } -func (s *filterTestGen) updateFinalizedBlock() { - s.finalizedBlock = mustGetFinalizedBlock(s.client) +func (s *filterTestGen) setLimitToFinalizedBlock() { + s.blockLimit = mustGetFinalizedBlock(s.client) } -const ( - // Parameter of the random filter query generator. - maxFilterRange = 10000000 - maxFilterResultSize = 300 - filterBuckets = 10 - maxFilterBucketSize = 100 - filterSeedChance = 10 - filterMergeChance = 45 -) - // storeQuery adds a filter query to the output file. func (s *filterTestGen) storeQuery(query *filterQuery) { query.ResultHash = new(common.Hash) *query.ResultHash = query.calculateHash() - logRatio := math.Log(float64(len(query.results))*float64(s.finalizedBlock)/float64(query.ToBlock+1-query.FromBlock)) / math.Log(float64(s.finalizedBlock)*maxFilterResultSize) + maxFilterRange := min(s.maxFilterRange, s.blockLimit) + logRatio := math.Log(float64(len(query.results))*float64(maxFilterRange)/float64(query.ToBlock+1-query.FromBlock)) / math.Log(float64(maxFilterRange)*maxFilterResultSize) bucket := int(math.Floor(logRatio * filterBuckets)) if bucket >= filterBuckets { bucket = filterBuckets - 1 @@ -160,13 +164,13 @@ func (s *filterTestGen) storeQuery(query *filterQuery) { func (s *filterTestGen) extendRange(q *filterQuery) *filterQuery { rangeLen := q.ToBlock + 1 - q.FromBlock extLen := rand.Int63n(rangeLen) + 1 - if rangeLen+extLen > s.finalizedBlock { + if rangeLen+extLen > min(s.maxFilterRange, s.blockLimit) { return nil } extBefore := min(rand.Int63n(extLen+1), q.FromBlock) extAfter := extLen - extBefore - if q.ToBlock+extAfter > s.finalizedBlock { - d := q.ToBlock + extAfter - s.finalizedBlock + if q.ToBlock+extAfter > s.blockLimit { + d := q.ToBlock + extAfter - s.blockLimit extAfter -= d if extBefore+d <= q.FromBlock { extBefore += d @@ -203,7 +207,7 @@ func (s *filterTestGen) newQuery() *filterQuery { // newSeedQuery creates a query that gets all logs in a random non-finalized block. func (s *filterTestGen) newSeedQuery() *filterQuery { - block := rand.Int63n(s.finalizedBlock + 1) + block := rand.Int63n(s.blockLimit + 1) return &filterQuery{ FromBlock: block, ToBlock: block, @@ -358,6 +362,7 @@ func (s *filterTestGen) writeQueries() { func mustGetFinalizedBlock(client *client) int64 { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() + header, err := client.Eth.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber))) if err != nil { exit(fmt.Errorf("could not fetch finalized header (error: %v)", err)) diff --git a/cmd/workload/main.go b/cmd/workload/main.go index 32618d6a79..8ac0e5b6cb 100644 --- a/cmd/workload/main.go +++ b/cmd/workload/main.go @@ -49,6 +49,7 @@ func init() { filterGenerateCommand, traceGenerateCommand, filterPerfCommand, + filterFuzzCommand, } }