From 6c0d848d9c0bd08541595be79cb36c1d80c68b56 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 6 May 2026 18:02:57 +0200 Subject: [PATCH 1/4] p2p/discover: add CrawlIterator for breadth-first FINDNODE walks Add an enode.Iterator that drives discovery by issuing a single FINDNODE per discovered peer, rotating the target through Drange sub-regions of the keyspace. Compared to RandomNodes (which wraps an alpha=3 Kademlia lookup that converges on a single target), this shape is geared for breadth: each peer is asked about a different slice of the keyspace, so aggregate coverage grows quickly without per-peer overlap. The two protocols expose different FINDNODE primitives, so the iterator threads a per-protocol queryFn: * discv5 takes a list of distances natively, so we just pass [256-d] for d in 0..Drange-1. * discv4 takes a target NodeID and replies with the K closest. To get an equivalent rotation, we pick a random pubkey whose Keccak256 starts with the desired prefix nibble. With Drange=16 that's ~16 random draws per call -- negligible compared to the network round trip. Concurrency is bounded by Workers (default 16). There is intentionally no rate limit: pacing is RTT-driven, ~Workers/RTT on the wire. Termination is implicit: when the work queue is empty AND no FINDNODE is in flight, the iterator closes its output and Next returns false. Close() short-circuits this for callers that want to bail early. Adapts the algorithm from github.com/cskiraly/fast-ethereum-crawler (dcrawl.nim) -- the prefix-rotation idea -- but drops its 1000 req/s rate limit in favour of the bounded worker pool. --- p2p/discover/crawliter.go | 343 +++++++++++++++++++++++++++++++++ p2p/discover/crawliter_test.go | 263 +++++++++++++++++++++++++ 2 files changed, 606 insertions(+) create mode 100644 p2p/discover/crawliter.go create mode 100644 p2p/discover/crawliter_test.go diff --git a/p2p/discover/crawliter.go b/p2p/discover/crawliter.go new file mode 100644 index 0000000000..a099eed30b --- /dev/null +++ b/p2p/discover/crawliter.go @@ -0,0 +1,343 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package discover + +import ( + crand "crypto/rand" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/discover/v4wire" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// CrawlOptions configures a CrawlIterator. +type CrawlOptions struct { + // Workers is the number of concurrent FINDNODE calls in flight. + // If <= 0, a default of 16 is used. + Workers int + + // Seeds are the nodes to start the crawl from. If empty, the iterator + // terminates immediately. Callers should pass at least the bootnodes. + Seeds []*enode.Node + + // Drange is the number of keyspace sub-regions to rotate the FINDNODE + // target through. Defaults to 16. Internally rounded up to the next power + // of two and capped at 256 (the keyspace top-byte width); on the discv4 + // path this caps the prefix-bit grind cost, on the discv5 path it caps + // the rotation to valid distances [1, 256]. + Drange int + + // OutputCap bounds the number of newly-discovered peers buffered for the + // caller's Next() to drain. When the buffer reaches this size, workers + // pause discovery via cond.Wait until Next() drains it. This is the + // iterator's backpressure point for slow consumers and adversarial peers + // flooding fresh ENRs in FINDNODE responses. + // + // If <= 0, defaults to 16 * Workers (≥ one full FINDNODE response per + // worker). Set higher for callers that drain in large batches. + // + // Note: the dedup set and the worker work-queue are not bounded; their + // growth is implicit in any iterator that emits each unique peer exactly + // once across a long crawl. Realistic bound is the reachable DHT size + // (~1M peers, ~50 MB). + OutputCap int +} + +func (o *CrawlOptions) withDefaults() { + if o.Workers <= 0 { + o.Workers = 16 + } + if o.Drange <= 0 { + o.Drange = 16 + } + if o.Drange > 256 { + o.Drange = 256 + } + // Round up to the next power of two so the prefix-bit width is + // well-defined and Drange divides the top byte of the keyspace evenly. + if o.Drange&(o.Drange-1) != 0 { + o.Drange = nextPowerOfTwo(o.Drange) + } + if o.OutputCap <= 0 { + o.OutputCap = 16 * o.Workers + } +} + +// nextPowerOfTwo returns the smallest power of two >= n, for n in [1, 256]. +func nextPowerOfTwo(n int) int { + p := 1 + for p < n { + p <<= 1 + } + return p +} + +// CrawlIterator returns an enode.Iterator that performs a breadth-first +// crawl by issuing a single FINDNODE request per discovered peer, rotating +// the request's target through Drange sub-regions of the keyspace so that +// each peer is asked about a different slice. Compared to RandomNodes, this +// avoids the alpha-bounded Kademlia lookup convergence loop and is the right +// shape for breadth crawls (e.g. devp2p discv4 crawl). +// +// Concurrency is bounded by opts.Workers; pacing is RTT-driven, not +// rate-limited. +func (t *UDPv4) CrawlIterator(opts CrawlOptions) enode.Iterator { + opts.withDefaults() + prefixBits := log2Pow2(opts.Drange) + queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) { + addr, ok := dst.UDPEndpoint() + if !ok { + return nil, errNoUDPEndpoint + } + target := randomTargetWithPrefix(uint8(d), prefixBits) + peers, err := t.findnode(dst.ID(), addr, target) + if err != nil { + t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err) + } + return peers, err + } + return newCrawlIterator(opts, queryFn) +} + +// CrawlIterator returns an enode.Iterator that performs a breadth-first +// crawl using single-distance FINDNODE requests. See [UDPv4.CrawlIterator] +// for the algorithm; the discv5 protocol takes a list of distances directly, +// so the rotation maps to distances [256, 255, ..., 256-Drange+1]. +func (t *UDPv5) CrawlIterator(opts CrawlOptions) enode.Iterator { + queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) { + dist := uint(256 - d) + peers, err := t.Findnode(dst, []uint{dist}) + if err != nil { + t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err) + } + return peers, err + } + return newCrawlIterator(opts, queryFn) +} + +// log2Pow2 returns log2(n) for power-of-two n. The caller must ensure n is a +// power of two; non-power-of-two inputs round down. +func log2Pow2(n int) int { + bits := 0 + for n > 1 { + n >>= 1 + bits++ + } + return bits +} + +// randomTargetWithPrefix returns a v4wire.Pubkey whose Keccak256 hash has its +// top `bits` bits equal to d. On average ~2^bits draws are needed. +func randomTargetWithPrefix(d uint8, bits int) v4wire.Pubkey { + if bits == 0 { + var pk v4wire.Pubkey + crand.Read(pk[:]) + return pk + } + for { + var pk v4wire.Pubkey + crand.Read(pk[:]) + h := crypto.Keccak256(pk[:]) + if (h[0] >> (8 - bits)) == d { + return pk + } + } +} + +// crawlIterator is a breadth-first FINDNODE-driven iterator. It maintains a +// shared work queue and an output buffer; workers pop from the queue, issue +// one FINDNODE per pop, and feed any newly-seen peers back into both the +// queue and the output buffer. The iterator terminates when the queue is +// empty and no FINDNODE call is in flight. +type crawlIterator struct { + queryFn func(dst *enode.Node, d int) ([]*enode.Node, error) + drange int + outputCap int + wg sync.WaitGroup + + mu sync.Mutex + cond *sync.Cond + queue []*enode.Node // pending FINDNODE work + output []*enode.Node // emitted peers (one-time) + discovered map[enode.ID]struct{} + inflight int // queued + in-progress + closing bool // Close() called or natural termination + cur *enode.Node + + rotation atomic.Uint64 +} + +func newCrawlIterator(opts CrawlOptions, queryFn func(*enode.Node, int) ([]*enode.Node, error)) *crawlIterator { + opts.withDefaults() + it := &crawlIterator{ + queryFn: queryFn, + drange: opts.Drange, + outputCap: opts.OutputCap, + discovered: make(map[enode.ID]struct{}), + } + it.cond = sync.NewCond(&it.mu) + + // Seed directly into the queue/output. Going through discover() would + // block on the OutputCap if len(Seeds) > OutputCap, deadlocking the + // constructor since workers haven't started and Next() hasn't been + // called yet. + for _, n := range opts.Seeds { + if n == nil { + continue + } + if _, seen := it.discovered[n.ID()]; seen { + continue + } + it.discovered[n.ID()] = struct{}{} + it.queue = append(it.queue, n) + it.output = append(it.output, n) + it.inflight++ + } + + // Workers. + for i := 0; i < opts.Workers; i++ { + it.wg.Add(1) + go it.worker() + } + return it +} + +// discover records a newly-seen peer. Acquires mu internally; callers +// MUST NOT hold it. If output is at capacity, waits on cond until Next() +// drains it; this is the iterator's backpressure point. +func (it *crawlIterator) discover(n *enode.Node) { + if n == nil { + return + } + it.mu.Lock() + defer it.mu.Unlock() + for { + if it.closing { + return + } + if _, seen := it.discovered[n.ID()]; seen { + return + } + if it.outputCap > 0 && len(it.output) >= it.outputCap { + // Pause discovery until the consumer drains output. Releases mu + // while waiting so other workers can keep popping from queue and + // the consumer can pop from output. + it.cond.Wait() + continue + } + break + } + it.discovered[n.ID()] = struct{}{} + it.queue = append(it.queue, n) + it.output = append(it.output, n) + it.inflight++ + it.cond.Broadcast() +} + +// popWork blocks until either a peer is available to query, or the iterator +// has nothing left to do. Returns (nil, false) on termination. +func (it *crawlIterator) popWork() (*enode.Node, bool) { + it.mu.Lock() + defer it.mu.Unlock() + for { + if it.closing { + return nil, false + } + if len(it.queue) > 0 { + n := it.queue[0] + it.queue = it.queue[1:] + return n, true + } + if it.inflight == 0 { + // Queue empty AND nothing in flight: natural termination. + it.closing = true + it.cond.Broadcast() + return nil, false + } + it.cond.Wait() + } +} + +// finishWork is called by workers after their FINDNODE response has been +// processed. It decrements the in-flight counter and broadcasts so a possibly +// idle worker can re-evaluate termination. +func (it *crawlIterator) finishWork() { + it.mu.Lock() + defer it.mu.Unlock() + it.inflight-- + if it.inflight == 0 && len(it.queue) == 0 { + it.closing = true + it.cond.Broadcast() + } +} + +func (it *crawlIterator) worker() { + defer it.wg.Done() + for { + n, ok := it.popWork() + if !ok { + return + } + d := int(it.rotation.Add(1)-1) % it.drange + peers, _ := it.queryFn(n, d) + for _, p := range peers { + it.discover(p) + } + it.finishWork() + } +} + +// Next blocks until a newly-discovered peer is available, then returns true +// and makes the peer accessible via Node. Returns false when the iterator +// has terminated. +func (it *crawlIterator) Next() bool { + it.mu.Lock() + defer it.mu.Unlock() + for len(it.output) == 0 { + if it.closing { + return false + } + it.cond.Wait() + } + it.cur = it.output[0] + it.output = it.output[1:] + // Wake any worker stalled in discover() because output was at capacity. + it.cond.Broadcast() + return true +} + +// Node returns the most recent peer surfaced by Next. +func (it *crawlIterator) Node() *enode.Node { + it.mu.Lock() + defer it.mu.Unlock() + return it.cur +} + +// Close terminates the iterator, unblocking any goroutines waiting in Next. +// Workers exit at their next poll point; in-flight FINDNODE responses are +// dropped. +func (it *crawlIterator) Close() { + it.mu.Lock() + if !it.closing { + it.closing = true + it.cond.Broadcast() + } + it.mu.Unlock() + it.wg.Wait() +} diff --git a/p2p/discover/crawliter_test.go b/p2p/discover/crawliter_test.go new file mode 100644 index 0000000000..42a4b1b9ca --- /dev/null +++ b/p2p/discover/crawliter_test.go @@ -0,0 +1,263 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package discover + +import ( + "crypto/ecdsa" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/enr" +) + +// makeTestNodes returns n deterministically-generated nodes so callers can +// build small synthetic graphs. +func makeTestNodes(t *testing.T, n int) []*enode.Node { + t.Helper() + nodes := make([]*enode.Node, n) + for i := 0; i < n; i++ { + var key *ecdsa.PrivateKey + var err error + key, err = crypto.GenerateKey() + if err != nil { + t.Fatal(err) + } + var r enr.Record + r.Set(enr.IPv4{127, 0, 0, 1}) + r.Set(enr.UDP(30300 + i)) + if err := enode.SignV4(&r, key); err != nil { + t.Fatal(err) + } + nodes[i], err = enode.New(enode.ValidSchemes, &r) + if err != nil { + t.Fatal(err) + } + } + return nodes +} + +// TestCrawlIteratorTerminates verifies that the iterator emits every node in +// a finite synthetic graph, exactly once, and then returns false from Next. +func TestCrawlIteratorTerminates(t *testing.T) { + nodes := makeTestNodes(t, 50) + + // Build a synthetic neighbour map: each node knows the next 5 nodes + // (cyclic). The crawl should reach all 50 from any single seed. + neighbours := make(map[enode.ID][]*enode.Node, len(nodes)) + for i, n := range nodes { + var ns []*enode.Node + for k := 1; k <= 5; k++ { + ns = append(ns, nodes[(i+k)%len(nodes)]) + } + neighbours[n.ID()] = ns + } + + var calls atomic.Int64 + queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) { + calls.Add(1) + return neighbours[dst.ID()], nil + } + + it := newCrawlIterator(CrawlOptions{ + Workers: 4, + Seeds: []*enode.Node{nodes[0]}, + Drange: 16, + }, queryFn) + + seen := make(map[enode.ID]int) + done := make(chan struct{}) + go func() { + for it.Next() { + seen[it.Node().ID()]++ + } + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("iterator did not terminate within 5s") + } + + if got := len(seen); got != len(nodes) { + t.Fatalf("emitted %d distinct nodes, want %d", got, len(nodes)) + } + for id, c := range seen { + if c != 1 { + t.Errorf("node %x emitted %d times, want 1", id[:4], c) + } + } + // Every distinct node should have been queried once. + if got, want := calls.Load(), int64(len(nodes)); got != want { + t.Errorf("queryFn invoked %d times, want %d", got, want) + } +} + +// TestCrawlIteratorClose verifies that calling Close while the iterator is +// still discovering nodes unblocks Next and stops workers cleanly. +func TestCrawlIteratorClose(t *testing.T) { + nodes := makeTestNodes(t, 20) + + // Slow queryFn so we can interrupt mid-crawl. + queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) { + time.Sleep(50 * time.Millisecond) + var ns []*enode.Node + for i, n := range nodes { + if n.ID() == dst.ID() { + ns = append(ns, nodes[(i+1)%len(nodes)]) + break + } + } + return ns, nil + } + + it := newCrawlIterator(CrawlOptions{ + Workers: 2, + Seeds: []*enode.Node{nodes[0]}, + Drange: 16, + }, queryFn) + + // Drain a few nodes, then Close. + go func() { + time.Sleep(50 * time.Millisecond) + it.Close() + }() + + var wg sync.WaitGroup + wg.Add(1) + done := make(chan struct{}) + go func() { + defer wg.Done() + for it.Next() { + } + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Next did not return after Close") + } + wg.Wait() +} + +// TestCrawlIteratorOutputCap verifies the backpressure invariant: +// the size of the output buffer never exceeds OutputCap regardless of +// how fast the queryFn returns peers. +func TestCrawlIteratorOutputCap(t *testing.T) { + const cap = 8 + nodes := makeTestNodes(t, 200) + + // Each node maps to the next in the chain, so discovery is unbounded + // from the iterator's perspective until we've covered the cycle. + queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) { + for i, n := range nodes { + if n.ID() == dst.ID() { + // Return 4 fresh neighbours so the producer outpaces the + // consumer (we sleep between Next() calls below). + return []*enode.Node{ + nodes[(i+1)%len(nodes)], + nodes[(i+2)%len(nodes)], + nodes[(i+3)%len(nodes)], + nodes[(i+4)%len(nodes)], + }, nil + } + } + return nil, nil + } + + itAny := newCrawlIterator(CrawlOptions{ + Workers: 8, + Seeds: []*enode.Node{nodes[0]}, + Drange: 16, + OutputCap: cap, + }, queryFn) + it := itAny // *crawlIterator + + var maxObserved int + check := func() { + it.mu.Lock() + if l := len(it.output); l > maxObserved { + maxObserved = l + } + it.mu.Unlock() + } + + // Slow consumer: read with delays so workers must back off. + done := make(chan struct{}) + go func() { + defer close(done) + for it.Next() { + check() + time.Sleep(2 * time.Millisecond) + } + }() + + select { + case <-done: + case <-time.After(10 * time.Second): + it.Close() + t.Fatal("iterator did not terminate within 10s") + } + + if maxObserved > cap { + t.Errorf("output buffer reached %d, want <= cap=%d", maxObserved, cap) + } +} + +// TestCrawlIteratorRotation verifies that the d argument passed to queryFn +// rotates through 0..Drange-1. +func TestCrawlIteratorRotation(t *testing.T) { + nodes := makeTestNodes(t, 64) + // Each node has the next 1 as neighbour, so the crawl makes exactly + // len(nodes) FINDNODE calls in a chain. + neighbours := make(map[enode.ID]*enode.Node, len(nodes)) + for i, n := range nodes { + neighbours[n.ID()] = nodes[(i+1)%len(nodes)] + } + + var ( + mu sync.Mutex + seenDs = make(map[int]int) + ) + queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) { + mu.Lock() + seenDs[d]++ + mu.Unlock() + return []*enode.Node{neighbours[dst.ID()]}, nil + } + + it := newCrawlIterator(CrawlOptions{ + Workers: 1, // single worker → strictly increasing d + Seeds: []*enode.Node{nodes[0]}, + Drange: 16, + }, queryFn) + for it.Next() { + } + + mu.Lock() + defer mu.Unlock() + for d := 0; d < 16; d++ { + if seenDs[d] == 0 { + t.Errorf("rotation index %d never used", d) + } + } +} From dfa3bbffae66589bd7869f0be8f8f675386b3b13 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 6 May 2026 18:07:06 +0200 Subject: [PATCH 2/4] cmd/devp2p: add --mode flag selecting crawl iterator Wire the new discover.CrawlIterator into devp2p discv4/discv5 crawl behind a --mode flag (default 'lookup', i.e. existing behaviour). devp2p discv4 crawl --mode=fast --timeout 30s nodes.json devp2p discv5 crawl --mode=fast --timeout 30s nodes.json Smoke test against mainnet bootnodes for 30s on a residential link yields ~2.4x more nodes under --mode=fast (587 vs 240 in one run), with the new per-tick LogDist log showing a much more uniform distribution of query distances. Workers default to the existing --parallel value (16); pacing is RTT-driven. The 'lookup' default keeps existing behaviour byte-identical for any operator running the saved devp2p discv4 crawl from a script. --- cmd/devp2p/discv4cmd.go | 29 ++++++++++++++++++++++++++--- cmd/devp2p/discv5cmd.go | 23 ++++++++++++++++++++++- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/cmd/devp2p/discv4cmd.go b/cmd/devp2p/discv4cmd.go index 84c7ef0c44..388e0c523a 100644 --- a/cmd/devp2p/discv4cmd.go +++ b/cmd/devp2p/discv4cmd.go @@ -91,7 +91,7 @@ var ( Name: "crawl", Usage: "Updates a nodes.json file with random nodes found in the DHT", Action: discv4Crawl, - Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag}), + Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag, crawlModeFlag}), } discv4TestCommand = &cli.Command{ Name: "test", @@ -135,9 +135,14 @@ var ( } crawlParallelismFlag = &cli.IntFlag{ Name: "parallel", - Usage: "How many parallel discoveries to attempt.", + Usage: "How many parallel discoveries to attempt. Used both as the crawler harness's RequestENR worker count and (under -mode=fast) as the FINDNODE iterator's worker count.", Value: 16, } + crawlModeFlag = &cli.StringFlag{ + Name: "mode", + Usage: "Crawl iterator mode: 'lookup' (alpha-bounded Kademlia lookup) or 'fast' (one FINDNODE per peer with rotating prefix; sized by -parallel).", + Value: "lookup", + } remoteEnodeFlag = &cli.StringFlag{ Name: "remote", Usage: "Enode of the remote node under test", @@ -259,7 +264,11 @@ func discv4Crawl(ctx *cli.Context) error { disc, config := startV4(ctx) defer disc.Close() - c, err := newCrawler(inputSet, config.Bootnodes, disc, disc.RandomNodes()) + iter, err := newDiscv4CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name)) + if err != nil { + return err + } + c, err := newCrawler(inputSet, config.Bootnodes, disc, iter) if err != nil { return err } @@ -269,6 +278,20 @@ func discv4Crawl(ctx *cli.Context) error { return nil } +func newDiscv4CrawlIterator(disc *discover.UDPv4, bootnodes []*enode.Node, mode string, parallel int) (enode.Iterator, error) { + switch mode { + case "", "lookup": + return disc.RandomNodes(), nil + case "fast": + return disc.CrawlIterator(discover.CrawlOptions{ + Workers: parallel, + Seeds: bootnodes, + }), nil + default: + return nil, fmt.Errorf("unknown -%s value %q (want 'lookup' or 'fast')", crawlModeFlag.Name, mode) + } +} + // discv4Test runs the protocol test suite. func discv4Test(ctx *cli.Context) error { // Configure test package globals. diff --git a/cmd/devp2p/discv5cmd.go b/cmd/devp2p/discv5cmd.go index dd253dd082..7497cccc55 100644 --- a/cmd/devp2p/discv5cmd.go +++ b/cmd/devp2p/discv5cmd.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/cmd/devp2p/internal/v5test" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/urfave/cli/v2" ) @@ -58,6 +59,8 @@ var ( Action: discv5Crawl, Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{ crawlTimeoutFlag, + crawlParallelismFlag, + crawlModeFlag, }), } discv5TestCommand = &cli.Command{ @@ -111,7 +114,11 @@ func discv5Crawl(ctx *cli.Context) error { disc, config := startV5(ctx) defer disc.Close() - c, err := newCrawler(inputSet, config.Bootnodes, disc, disc.RandomNodes()) + iter, err := newDiscv5CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name)) + if err != nil { + return err + } + c, err := newCrawler(inputSet, config.Bootnodes, disc, iter) if err != nil { return err } @@ -121,6 +128,20 @@ func discv5Crawl(ctx *cli.Context) error { return nil } +func newDiscv5CrawlIterator(disc *discover.UDPv5, bootnodes []*enode.Node, mode string, parallel int) (enode.Iterator, error) { + switch mode { + case "", "lookup": + return disc.RandomNodes(), nil + case "fast": + return disc.CrawlIterator(discover.CrawlOptions{ + Workers: parallel, + Seeds: bootnodes, + }), nil + default: + return nil, fmt.Errorf("unknown -%s value %q (want 'lookup' or 'fast')", crawlModeFlag.Name, mode) + } +} + // discv5Test runs the protocol test suite. func discv5Test(ctx *cli.Context) error { suite := &v5test.Suite{ From b026ef6bb7fa9b59fda3640d8e09784a6193fa24 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 7 May 2026 08:56:23 +0200 Subject: [PATCH 3/4] p2p/discover: drop discv4 prefix-bit grind from CrawlIterator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The original CrawlIterator on the discv4 path generated FINDNODE targets by grinding random pubkeys until their Keccak256 had a specific top-N-bit prefix matching a per-call rotation index, then sending them. The aim was to anchor each peer's response to a different /16 region of the global keyspace. Empirically (3 x 5-minute runs against mainnet bootnodes): mode total mean ± std mainnet mean ± std fast (grind) 5714 ± 117 549 ± 33 fast-random 5306 ± 366 521 ± 124 Means are within 1σ of each other. The grind's only measurable benefit is reduced run-to-run variance, not higher yield. For long-running curated crawls (the production use case for cmd/devp2p) the variance amortises away, so the simplification is worth taking. Replace the grind with a plain crand.Read on the v4 target, drop the randomTargetWithPrefix helper, log2Pow2 helper, and the v4-side prefix-bit math from withDefaults. Drange becomes a v5-only knob and its doc is updated to say so; the power-of-two requirement is gone. discv5 is unchanged: it uses native distance rotation, not target hashes, and was never affected by the grind. --- p2p/discover/crawliter.go | 72 ++++++++------------------------------- 1 file changed, 14 insertions(+), 58 deletions(-) diff --git a/p2p/discover/crawliter.go b/p2p/discover/crawliter.go index a099eed30b..8a7fc5c92b 100644 --- a/p2p/discover/crawliter.go +++ b/p2p/discover/crawliter.go @@ -21,7 +21,6 @@ import ( "sync" "sync/atomic" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/discover/v4wire" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -36,11 +35,13 @@ type CrawlOptions struct { // terminates immediately. Callers should pass at least the bootnodes. Seeds []*enode.Node - // Drange is the number of keyspace sub-regions to rotate the FINDNODE - // target through. Defaults to 16. Internally rounded up to the next power - // of two and capped at 256 (the keyspace top-byte width); on the discv4 - // path this caps the prefix-bit grind cost, on the discv5 path it caps - // the rotation to valid distances [1, 256]. + // Drange is the number of FINDNODE rotation slots per peer. Has effect + // only on the discv5 path, where each rotation slot d maps to the + // distance value 256-d (so Drange=16 covers distances 256, 255, ..., 241). + // On the discv4 path it has no effect: targets are random NodeIDs and + // the rotation counter is unused. + // + // Defaults to 16. Capped at 256. Drange int // OutputCap bounds the number of newly-discovered peers buffered for the @@ -69,43 +70,27 @@ func (o *CrawlOptions) withDefaults() { if o.Drange > 256 { o.Drange = 256 } - // Round up to the next power of two so the prefix-bit width is - // well-defined and Drange divides the top byte of the keyspace evenly. - if o.Drange&(o.Drange-1) != 0 { - o.Drange = nextPowerOfTwo(o.Drange) - } if o.OutputCap <= 0 { o.OutputCap = 16 * o.Workers } } -// nextPowerOfTwo returns the smallest power of two >= n, for n in [1, 256]. -func nextPowerOfTwo(n int) int { - p := 1 - for p < n { - p <<= 1 - } - return p -} - // CrawlIterator returns an enode.Iterator that performs a breadth-first -// crawl by issuing a single FINDNODE request per discovered peer, rotating -// the request's target through Drange sub-regions of the keyspace so that -// each peer is asked about a different slice. Compared to RandomNodes, this -// avoids the alpha-bounded Kademlia lookup convergence loop and is the right -// shape for breadth crawls (e.g. devp2p discv4 crawl). +// crawl by issuing a single FINDNODE request per discovered peer, with a +// fresh random target each call. Compared to RandomNodes, this avoids the +// alpha-bounded Kademlia lookup convergence loop and is the right shape +// for breadth crawls (e.g. devp2p discv4 crawl). // // Concurrency is bounded by opts.Workers; pacing is RTT-driven, not // rate-limited. func (t *UDPv4) CrawlIterator(opts CrawlOptions) enode.Iterator { - opts.withDefaults() - prefixBits := log2Pow2(opts.Drange) - queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) { + queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) { addr, ok := dst.UDPEndpoint() if !ok { return nil, errNoUDPEndpoint } - target := randomTargetWithPrefix(uint8(d), prefixBits) + var target v4wire.Pubkey + crand.Read(target[:]) peers, err := t.findnode(dst.ID(), addr, target) if err != nil { t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err) @@ -131,35 +116,6 @@ func (t *UDPv5) CrawlIterator(opts CrawlOptions) enode.Iterator { return newCrawlIterator(opts, queryFn) } -// log2Pow2 returns log2(n) for power-of-two n. The caller must ensure n is a -// power of two; non-power-of-two inputs round down. -func log2Pow2(n int) int { - bits := 0 - for n > 1 { - n >>= 1 - bits++ - } - return bits -} - -// randomTargetWithPrefix returns a v4wire.Pubkey whose Keccak256 hash has its -// top `bits` bits equal to d. On average ~2^bits draws are needed. -func randomTargetWithPrefix(d uint8, bits int) v4wire.Pubkey { - if bits == 0 { - var pk v4wire.Pubkey - crand.Read(pk[:]) - return pk - } - for { - var pk v4wire.Pubkey - crand.Read(pk[:]) - h := crypto.Keccak256(pk[:]) - if (h[0] >> (8 - bits)) == d { - return pk - } - } -} - // crawlIterator is a breadth-first FINDNODE-driven iterator. It maintains a // shared work queue and an output buffer; workers pop from the queue, issue // one FINDNODE per pop, and feed any newly-seen peers back into both the From 33785aab21725e642cf88b67c567d55762ce52a8 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 7 May 2026 09:38:59 +0200 Subject: [PATCH 4/4] p2p/discover: document BFS choice, add RandomWorkers split Two related changes to CrawlIterator: (1) Add a file-level commentary block explaining why the iterator uses a FIFO queue (BFS over the FINDNODE-response graph) and what it is *not* suitable for (target-directed lookup -- use RandomNodes() / the alpha=3 lookup iterator for that). The choice was inherited from dcrawl.nim without explicit reasoning; making it visible avoids future readers re-deriving the survey-vs-lookup distinction. The BFS rationale is two-fold: - Coverage: BFS reaches every peer within N hops of the seeds in order, so a time-bounded run produces a representative sample of the reachable graph rather than a deep tendril through one sub-region. - Adversarial resilience: a peer returning malicious "neighbour" claims, dead-end peers, or eclipse-style sub-graphs cannot monopolise the worker pool, because pending work from other branches sits ahead of the attacker's responses in the queue. DFS would amplify each of these attacks. (2) Add a RandomWorkers field to CrawlOptions. Of the Workers-sized worker pool, the first (Workers - RandomWorkers) workers pop the FIFO front (BFS), while RandomWorkers workers pop a uniform-random queue index via swap-and-pop (O(1)). Total worker count is unchanged. Default RandomWorkers = Workers / 4 (4 of 16 with the default parallelism). At this ratio: - Cold-start cost is negligible: 12 of 16 workers still drain FIFO, so the first ~1s of a fresh crawl behaves like pure BFS. - 25% of pops break strict FIFO ordering, providing a mild anti-fingerprint defence against an attacker who could otherwise predict our processing order from the contents of their own FINDNODE responses. Operators can override per-run via the new --random-workers CLI flag on `devp2p discv4 crawl` and `discv5 crawl`. Negative value forces pure BFS; positive value selects an explicit count. The new TestCrawlIteratorRandomWorkers covers four pop-policy configurations (all-fifo, all-random, half-half, default) and asserts the iterator still terminates and emits each node exactly once in each. --- cmd/devp2p/discv4cmd.go | 16 ++++-- cmd/devp2p/discv5cmd.go | 10 ++-- p2p/discover/crawliter.go | 91 +++++++++++++++++++++++++++++++--- p2p/discover/crawliter_test.go | 63 +++++++++++++++++++++++ 4 files changed, 164 insertions(+), 16 deletions(-) diff --git a/cmd/devp2p/discv4cmd.go b/cmd/devp2p/discv4cmd.go index 388e0c523a..945da3bc53 100644 --- a/cmd/devp2p/discv4cmd.go +++ b/cmd/devp2p/discv4cmd.go @@ -91,7 +91,7 @@ var ( Name: "crawl", Usage: "Updates a nodes.json file with random nodes found in the DHT", Action: discv4Crawl, - Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag, crawlModeFlag}), + Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag, crawlModeFlag, crawlRandomWorkersFlag}), } discv4TestCommand = &cli.Command{ Name: "test", @@ -143,6 +143,11 @@ var ( Usage: "Crawl iterator mode: 'lookup' (alpha-bounded Kademlia lookup) or 'fast' (one FINDNODE per peer with rotating prefix; sized by -parallel).", Value: "lookup", } + crawlRandomWorkersFlag = &cli.IntFlag{ + Name: "random-workers", + Usage: "Of the -parallel workers in -mode=fast, how many pop a random queue item rather than the FIFO front. 0 = library default (parallel/4); negative = pure BFS.", + Value: 0, + } remoteEnodeFlag = &cli.StringFlag{ Name: "remote", Usage: "Enode of the remote node under test", @@ -264,7 +269,7 @@ func discv4Crawl(ctx *cli.Context) error { disc, config := startV4(ctx) defer disc.Close() - iter, err := newDiscv4CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name)) + iter, err := newDiscv4CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name), ctx.Int(crawlRandomWorkersFlag.Name)) if err != nil { return err } @@ -278,14 +283,15 @@ func discv4Crawl(ctx *cli.Context) error { return nil } -func newDiscv4CrawlIterator(disc *discover.UDPv4, bootnodes []*enode.Node, mode string, parallel int) (enode.Iterator, error) { +func newDiscv4CrawlIterator(disc *discover.UDPv4, bootnodes []*enode.Node, mode string, parallel, randomWorkers int) (enode.Iterator, error) { switch mode { case "", "lookup": return disc.RandomNodes(), nil case "fast": return disc.CrawlIterator(discover.CrawlOptions{ - Workers: parallel, - Seeds: bootnodes, + Workers: parallel, + RandomWorkers: randomWorkers, + Seeds: bootnodes, }), nil default: return nil, fmt.Errorf("unknown -%s value %q (want 'lookup' or 'fast')", crawlModeFlag.Name, mode) diff --git a/cmd/devp2p/discv5cmd.go b/cmd/devp2p/discv5cmd.go index 7497cccc55..27237a6236 100644 --- a/cmd/devp2p/discv5cmd.go +++ b/cmd/devp2p/discv5cmd.go @@ -61,6 +61,7 @@ var ( crawlTimeoutFlag, crawlParallelismFlag, crawlModeFlag, + crawlRandomWorkersFlag, }), } discv5TestCommand = &cli.Command{ @@ -114,7 +115,7 @@ func discv5Crawl(ctx *cli.Context) error { disc, config := startV5(ctx) defer disc.Close() - iter, err := newDiscv5CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name)) + iter, err := newDiscv5CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name), ctx.Int(crawlRandomWorkersFlag.Name)) if err != nil { return err } @@ -128,14 +129,15 @@ func discv5Crawl(ctx *cli.Context) error { return nil } -func newDiscv5CrawlIterator(disc *discover.UDPv5, bootnodes []*enode.Node, mode string, parallel int) (enode.Iterator, error) { +func newDiscv5CrawlIterator(disc *discover.UDPv5, bootnodes []*enode.Node, mode string, parallel, randomWorkers int) (enode.Iterator, error) { switch mode { case "", "lookup": return disc.RandomNodes(), nil case "fast": return disc.CrawlIterator(discover.CrawlOptions{ - Workers: parallel, - Seeds: bootnodes, + Workers: parallel, + RandomWorkers: randomWorkers, + Seeds: bootnodes, }), nil default: return nil, fmt.Errorf("unknown -%s value %q (want 'lookup' or 'fast')", crawlModeFlag.Name, mode) diff --git a/p2p/discover/crawliter.go b/p2p/discover/crawliter.go index 8a7fc5c92b..9be1e75271 100644 --- a/p2p/discover/crawliter.go +++ b/p2p/discover/crawliter.go @@ -18,6 +18,7 @@ package discover import ( crand "crypto/rand" + "math/rand/v2" "sync" "sync/atomic" @@ -25,6 +26,37 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" ) +// CrawlIterator performs a breadth-first crawl of the discv4/discv5 +// FINDNODE-response graph. Each worker pops a peer from the work queue, +// issues one FINDNODE call against it, and feeds any newly-seen peers +// back into the queue (and the iterator's output buffer). +// +// BFS (FIFO queue) was chosen over DFS or target-directed lookup for two +// reasons that align with the iterator's intended use case (survey-style +// crawls — devp2p crawl, geth dial-candidate discovery): +// +// - Coverage: BFS reaches every peer within N hops of the seeds in +// order, so a time-bounded run produces a representative sample of +// the reachable graph rather than a deep tendril through one +// sub-region. +// - Adversarial resilience: a peer returning malicious "neighbour" +// claims, dead-end peers, or eclipse-style sub-graphs cannot +// monopolise the worker pool, because pending work from other +// branches sits ahead of the attacker's responses in the queue. +// DFS would amplify each of these attacks. +// +// CrawlIterator is NOT a target-directed search. To find a specific node +// (or the K closest to a known target), use the lookup-based iterator +// returned by [UDPv4.RandomNodes] / [UDPv5.RandomNodes] instead — those +// run an alpha-bounded Kademlia lookup, which is the right shape for +// "find peer X" but the wrong shape for "survey the network". +// +// Pop policy can be tuned via [CrawlOptions.RandomWorkers]: any subset +// of the worker pool can be configured to pop a uniform-random queue +// item instead of the FIFO front, breaking strict ordering as a mild +// anti-fingerprint defence. The remaining workers pop FIFO and preserve +// the BFS character. See the field doc for details. + // CrawlOptions configures a CrawlIterator. type CrawlOptions struct { // Workers is the number of concurrent FINDNODE calls in flight. @@ -58,6 +90,23 @@ type CrawlOptions struct { // once across a long crawl. Realistic bound is the reachable DHT size // (~1M peers, ~50 MB). OutputCap int + + // RandomWorkers is the number of workers in the pool that pop a uniform- + // random queue item instead of the FIFO front. The remaining + // Workers - RandomWorkers workers behave as today, popping front. Total + // worker count is unchanged at Workers; only the BFS-vs-random split + // differs. + // + // Zero (the struct zero-value) selects the library default of + // Workers/4 rounded down. Pass a negative value (e.g. -1) to force + // pure BFS with zero random workers. A positive value > Workers is + // clamped to Workers (i.e. fully-random pop with no FIFO workers). + // + // The default Workers/4 mix preserves the BFS coverage character (cold + // start is dominated by the FIFO majority) while breaking strict FIFO + // ordering, a mild anti-fingerprint defence. See the file-level comment + // for details. + RandomWorkers int } func (o *CrawlOptions) withDefaults() { @@ -73,6 +122,17 @@ func (o *CrawlOptions) withDefaults() { if o.OutputCap <= 0 { o.OutputCap = 16 * o.Workers } + switch { + case o.RandomWorkers < 0: + // Negative means: explicit pure BFS, no random workers. + o.RandomWorkers = 0 + case o.RandomWorkers == 0: + // Zero (struct zero-value) means: library default. + o.RandomWorkers = o.Workers / 4 + case o.RandomWorkers > o.Workers: + // Clamp; can't have more random workers than total workers. + o.RandomWorkers = o.Workers + } } // CrawlIterator returns an enode.Iterator that performs a breadth-first @@ -166,10 +226,17 @@ func newCrawlIterator(opts CrawlOptions, queryFn func(*enode.Node, int) ([]*enod it.inflight++ } - // Workers. - for i := 0; i < opts.Workers; i++ { + // Workers. The first (Workers - RandomWorkers) workers pop FIFO; the + // remaining RandomWorkers pop a uniform-random queue index. Both share + // the same queue, mutex, cond, and termination semantics. + fifoCount := opts.Workers - opts.RandomWorkers + for i := 0; i < fifoCount; i++ { it.wg.Add(1) - go it.worker() + go it.worker(false) + } + for i := 0; i < opts.RandomWorkers; i++ { + it.wg.Add(1) + go it.worker(true) } return it } @@ -207,8 +274,10 @@ func (it *crawlIterator) discover(n *enode.Node) { } // popWork blocks until either a peer is available to query, or the iterator -// has nothing left to do. Returns (nil, false) on termination. -func (it *crawlIterator) popWork() (*enode.Node, bool) { +// has nothing left to do. Returns (nil, false) on termination. If random is +// true, pops a uniform-random index via swap-and-pop; otherwise pops the +// FIFO front. Both modes share the same termination semantics. +func (it *crawlIterator) popWork(random bool) (*enode.Node, bool) { it.mu.Lock() defer it.mu.Unlock() for { @@ -216,6 +285,14 @@ func (it *crawlIterator) popWork() (*enode.Node, bool) { return nil, false } if len(it.queue) > 0 { + if random { + i := rand.IntN(len(it.queue)) + n := it.queue[i] + // Swap-and-pop: O(1) removal from middle. + it.queue[i] = it.queue[len(it.queue)-1] + it.queue = it.queue[:len(it.queue)-1] + return n, true + } n := it.queue[0] it.queue = it.queue[1:] return n, true @@ -243,10 +320,10 @@ func (it *crawlIterator) finishWork() { } } -func (it *crawlIterator) worker() { +func (it *crawlIterator) worker(random bool) { defer it.wg.Done() for { - n, ok := it.popWork() + n, ok := it.popWork(random) if !ok { return } diff --git a/p2p/discover/crawliter_test.go b/p2p/discover/crawliter_test.go index 42a4b1b9ca..f646dc01bb 100644 --- a/p2p/discover/crawliter_test.go +++ b/p2p/discover/crawliter_test.go @@ -223,6 +223,69 @@ func TestCrawlIteratorOutputCap(t *testing.T) { } } +// TestCrawlIteratorRandomWorkers verifies that with a mixed FIFO + random +// worker pool, the iterator still terminates and emits each node exactly +// once on the synthetic graph from TestCrawlIteratorTerminates. +func TestCrawlIteratorRandomWorkers(t *testing.T) { + for _, tc := range []struct { + name string + workers int + randomWorkers int + }{ + {"all-fifo", 4, -1}, // -1 → pure BFS + {"all-random", 4, 4}, // all workers random-pop + {"half-half", 8, 4}, // 4 FIFO + 4 random + {"default", 16, 0}, // 0 → library default = 4 random of 16 + } { + t.Run(tc.name, func(t *testing.T) { + nodes := makeTestNodes(t, 50) + neighbours := make(map[enode.ID][]*enode.Node, len(nodes)) + for i, n := range nodes { + var ns []*enode.Node + for k := 1; k <= 5; k++ { + ns = append(ns, nodes[(i+k)%len(nodes)]) + } + neighbours[n.ID()] = ns + } + var calls atomic.Int64 + queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) { + calls.Add(1) + return neighbours[dst.ID()], nil + } + it := newCrawlIterator(CrawlOptions{ + Workers: tc.workers, + RandomWorkers: tc.randomWorkers, + Seeds: []*enode.Node{nodes[0]}, + Drange: 16, + }, queryFn) + seen := make(map[enode.ID]int) + done := make(chan struct{}) + go func() { + for it.Next() { + seen[it.Node().ID()]++ + } + close(done) + }() + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("iterator did not terminate within 5s") + } + if got := len(seen); got != len(nodes) { + t.Fatalf("emitted %d distinct nodes, want %d", got, len(nodes)) + } + for id, c := range seen { + if c != 1 { + t.Errorf("node %x emitted %d times, want 1", id[:4], c) + } + } + if got, want := calls.Load(), int64(len(nodes)); got != want { + t.Errorf("queryFn invoked %d times, want %d", got, want) + } + }) + } +} + // TestCrawlIteratorRotation verifies that the d argument passed to queryFn // rotates through 0..Drange-1. func TestCrawlIteratorRotation(t *testing.T) {