mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 08:49:29 +00:00
Merge 33785aab21 into 12eabbd76d
This commit is contained in:
commit
0aeb67705e
4 changed files with 758 additions and 4 deletions
|
|
@ -91,7 +91,7 @@ var (
|
||||||
Name: "crawl",
|
Name: "crawl",
|
||||||
Usage: "Updates a nodes.json file with random nodes found in the DHT",
|
Usage: "Updates a nodes.json file with random nodes found in the DHT",
|
||||||
Action: discv4Crawl,
|
Action: discv4Crawl,
|
||||||
Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag}),
|
Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{crawlTimeoutFlag, crawlParallelismFlag, crawlModeFlag, crawlRandomWorkersFlag}),
|
||||||
}
|
}
|
||||||
discv4TestCommand = &cli.Command{
|
discv4TestCommand = &cli.Command{
|
||||||
Name: "test",
|
Name: "test",
|
||||||
|
|
@ -135,9 +135,19 @@ var (
|
||||||
}
|
}
|
||||||
crawlParallelismFlag = &cli.IntFlag{
|
crawlParallelismFlag = &cli.IntFlag{
|
||||||
Name: "parallel",
|
Name: "parallel",
|
||||||
Usage: "How many parallel discoveries to attempt.",
|
Usage: "How many parallel discoveries to attempt. Used both as the crawler harness's RequestENR worker count and (under -mode=fast) as the FINDNODE iterator's worker count.",
|
||||||
Value: 16,
|
Value: 16,
|
||||||
}
|
}
|
||||||
|
crawlModeFlag = &cli.StringFlag{
|
||||||
|
Name: "mode",
|
||||||
|
Usage: "Crawl iterator mode: 'lookup' (alpha-bounded Kademlia lookup) or 'fast' (one FINDNODE per peer with rotating prefix; sized by -parallel).",
|
||||||
|
Value: "lookup",
|
||||||
|
}
|
||||||
|
crawlRandomWorkersFlag = &cli.IntFlag{
|
||||||
|
Name: "random-workers",
|
||||||
|
Usage: "Of the -parallel workers in -mode=fast, how many pop a random queue item rather than the FIFO front. 0 = library default (parallel/4); negative = pure BFS.",
|
||||||
|
Value: 0,
|
||||||
|
}
|
||||||
remoteEnodeFlag = &cli.StringFlag{
|
remoteEnodeFlag = &cli.StringFlag{
|
||||||
Name: "remote",
|
Name: "remote",
|
||||||
Usage: "Enode of the remote node under test",
|
Usage: "Enode of the remote node under test",
|
||||||
|
|
@ -259,7 +269,11 @@ func discv4Crawl(ctx *cli.Context) error {
|
||||||
disc, config := startV4(ctx)
|
disc, config := startV4(ctx)
|
||||||
defer disc.Close()
|
defer disc.Close()
|
||||||
|
|
||||||
c, err := newCrawler(inputSet, config.Bootnodes, disc, disc.RandomNodes())
|
iter, err := newDiscv4CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name), ctx.Int(crawlRandomWorkersFlag.Name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c, err := newCrawler(inputSet, config.Bootnodes, disc, iter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -269,6 +283,21 @@ func discv4Crawl(ctx *cli.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newDiscv4CrawlIterator(disc *discover.UDPv4, bootnodes []*enode.Node, mode string, parallel, randomWorkers int) (enode.Iterator, error) {
|
||||||
|
switch mode {
|
||||||
|
case "", "lookup":
|
||||||
|
return disc.RandomNodes(), nil
|
||||||
|
case "fast":
|
||||||
|
return disc.CrawlIterator(discover.CrawlOptions{
|
||||||
|
Workers: parallel,
|
||||||
|
RandomWorkers: randomWorkers,
|
||||||
|
Seeds: bootnodes,
|
||||||
|
}), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown -%s value %q (want 'lookup' or 'fast')", crawlModeFlag.Name, mode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// discv4Test runs the protocol test suite.
|
// discv4Test runs the protocol test suite.
|
||||||
func discv4Test(ctx *cli.Context) error {
|
func discv4Test(ctx *cli.Context) error {
|
||||||
// Configure test package globals.
|
// Configure test package globals.
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/cmd/devp2p/internal/v5test"
|
"github.com/ethereum/go-ethereum/cmd/devp2p/internal/v5test"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -58,6 +59,9 @@ var (
|
||||||
Action: discv5Crawl,
|
Action: discv5Crawl,
|
||||||
Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{
|
Flags: slices.Concat(discoveryNodeFlags, []cli.Flag{
|
||||||
crawlTimeoutFlag,
|
crawlTimeoutFlag,
|
||||||
|
crawlParallelismFlag,
|
||||||
|
crawlModeFlag,
|
||||||
|
crawlRandomWorkersFlag,
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
discv5TestCommand = &cli.Command{
|
discv5TestCommand = &cli.Command{
|
||||||
|
|
@ -111,7 +115,11 @@ func discv5Crawl(ctx *cli.Context) error {
|
||||||
disc, config := startV5(ctx)
|
disc, config := startV5(ctx)
|
||||||
defer disc.Close()
|
defer disc.Close()
|
||||||
|
|
||||||
c, err := newCrawler(inputSet, config.Bootnodes, disc, disc.RandomNodes())
|
iter, err := newDiscv5CrawlIterator(disc, config.Bootnodes, ctx.String(crawlModeFlag.Name), ctx.Int(crawlParallelismFlag.Name), ctx.Int(crawlRandomWorkersFlag.Name))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c, err := newCrawler(inputSet, config.Bootnodes, disc, iter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -121,6 +129,21 @@ func discv5Crawl(ctx *cli.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newDiscv5CrawlIterator(disc *discover.UDPv5, bootnodes []*enode.Node, mode string, parallel, randomWorkers int) (enode.Iterator, error) {
|
||||||
|
switch mode {
|
||||||
|
case "", "lookup":
|
||||||
|
return disc.RandomNodes(), nil
|
||||||
|
case "fast":
|
||||||
|
return disc.CrawlIterator(discover.CrawlOptions{
|
||||||
|
Workers: parallel,
|
||||||
|
RandomWorkers: randomWorkers,
|
||||||
|
Seeds: bootnodes,
|
||||||
|
}), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown -%s value %q (want 'lookup' or 'fast')", crawlModeFlag.Name, mode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// discv5Test runs the protocol test suite.
|
// discv5Test runs the protocol test suite.
|
||||||
func discv5Test(ctx *cli.Context) error {
|
func discv5Test(ctx *cli.Context) error {
|
||||||
suite := &v5test.Suite{
|
suite := &v5test.Suite{
|
||||||
|
|
|
||||||
376
p2p/discover/crawliter.go
Normal file
376
p2p/discover/crawliter.go
Normal file
|
|
@ -0,0 +1,376 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package discover
|
||||||
|
|
||||||
|
import (
|
||||||
|
crand "crypto/rand"
|
||||||
|
"math/rand/v2"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/discover/v4wire"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CrawlIterator performs a breadth-first crawl of the discv4/discv5
|
||||||
|
// FINDNODE-response graph. Each worker pops a peer from the work queue,
|
||||||
|
// issues one FINDNODE call against it, and feeds any newly-seen peers
|
||||||
|
// back into the queue (and the iterator's output buffer).
|
||||||
|
//
|
||||||
|
// BFS (FIFO queue) was chosen over DFS or target-directed lookup for two
|
||||||
|
// reasons that align with the iterator's intended use case (survey-style
|
||||||
|
// crawls — devp2p crawl, geth dial-candidate discovery):
|
||||||
|
//
|
||||||
|
// - Coverage: BFS reaches every peer within N hops of the seeds in
|
||||||
|
// order, so a time-bounded run produces a representative sample of
|
||||||
|
// the reachable graph rather than a deep tendril through one
|
||||||
|
// sub-region.
|
||||||
|
// - Adversarial resilience: a peer returning malicious "neighbour"
|
||||||
|
// claims, dead-end peers, or eclipse-style sub-graphs cannot
|
||||||
|
// monopolise the worker pool, because pending work from other
|
||||||
|
// branches sits ahead of the attacker's responses in the queue.
|
||||||
|
// DFS would amplify each of these attacks.
|
||||||
|
//
|
||||||
|
// CrawlIterator is NOT a target-directed search. To find a specific node
|
||||||
|
// (or the K closest to a known target), use the lookup-based iterator
|
||||||
|
// returned by [UDPv4.RandomNodes] / [UDPv5.RandomNodes] instead — those
|
||||||
|
// run an alpha-bounded Kademlia lookup, which is the right shape for
|
||||||
|
// "find peer X" but the wrong shape for "survey the network".
|
||||||
|
//
|
||||||
|
// Pop policy can be tuned via [CrawlOptions.RandomWorkers]: any subset
|
||||||
|
// of the worker pool can be configured to pop a uniform-random queue
|
||||||
|
// item instead of the FIFO front, breaking strict ordering as a mild
|
||||||
|
// anti-fingerprint defence. The remaining workers pop FIFO and preserve
|
||||||
|
// the BFS character. See the field doc for details.
|
||||||
|
|
||||||
|
// CrawlOptions configures a CrawlIterator.
|
||||||
|
type CrawlOptions struct {
|
||||||
|
// Workers is the number of concurrent FINDNODE calls in flight.
|
||||||
|
// If <= 0, a default of 16 is used.
|
||||||
|
Workers int
|
||||||
|
|
||||||
|
// Seeds are the nodes to start the crawl from. If empty, the iterator
|
||||||
|
// terminates immediately. Callers should pass at least the bootnodes.
|
||||||
|
Seeds []*enode.Node
|
||||||
|
|
||||||
|
// Drange is the number of FINDNODE rotation slots per peer. Has effect
|
||||||
|
// only on the discv5 path, where each rotation slot d maps to the
|
||||||
|
// distance value 256-d (so Drange=16 covers distances 256, 255, ..., 241).
|
||||||
|
// On the discv4 path it has no effect: targets are random NodeIDs and
|
||||||
|
// the rotation counter is unused.
|
||||||
|
//
|
||||||
|
// Defaults to 16. Capped at 256.
|
||||||
|
Drange int
|
||||||
|
|
||||||
|
// OutputCap bounds the number of newly-discovered peers buffered for the
|
||||||
|
// caller's Next() to drain. When the buffer reaches this size, workers
|
||||||
|
// pause discovery via cond.Wait until Next() drains it. This is the
|
||||||
|
// iterator's backpressure point for slow consumers and adversarial peers
|
||||||
|
// flooding fresh ENRs in FINDNODE responses.
|
||||||
|
//
|
||||||
|
// If <= 0, defaults to 16 * Workers (≥ one full FINDNODE response per
|
||||||
|
// worker). Set higher for callers that drain in large batches.
|
||||||
|
//
|
||||||
|
// Note: the dedup set and the worker work-queue are not bounded; their
|
||||||
|
// growth is implicit in any iterator that emits each unique peer exactly
|
||||||
|
// once across a long crawl. Realistic bound is the reachable DHT size
|
||||||
|
// (~1M peers, ~50 MB).
|
||||||
|
OutputCap int
|
||||||
|
|
||||||
|
// RandomWorkers is the number of workers in the pool that pop a uniform-
|
||||||
|
// random queue item instead of the FIFO front. The remaining
|
||||||
|
// Workers - RandomWorkers workers behave as today, popping front. Total
|
||||||
|
// worker count is unchanged at Workers; only the BFS-vs-random split
|
||||||
|
// differs.
|
||||||
|
//
|
||||||
|
// Zero (the struct zero-value) selects the library default of
|
||||||
|
// Workers/4 rounded down. Pass a negative value (e.g. -1) to force
|
||||||
|
// pure BFS with zero random workers. A positive value > Workers is
|
||||||
|
// clamped to Workers (i.e. fully-random pop with no FIFO workers).
|
||||||
|
//
|
||||||
|
// The default Workers/4 mix preserves the BFS coverage character (cold
|
||||||
|
// start is dominated by the FIFO majority) while breaking strict FIFO
|
||||||
|
// ordering, a mild anti-fingerprint defence. See the file-level comment
|
||||||
|
// for details.
|
||||||
|
RandomWorkers int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *CrawlOptions) withDefaults() {
|
||||||
|
if o.Workers <= 0 {
|
||||||
|
o.Workers = 16
|
||||||
|
}
|
||||||
|
if o.Drange <= 0 {
|
||||||
|
o.Drange = 16
|
||||||
|
}
|
||||||
|
if o.Drange > 256 {
|
||||||
|
o.Drange = 256
|
||||||
|
}
|
||||||
|
if o.OutputCap <= 0 {
|
||||||
|
o.OutputCap = 16 * o.Workers
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case o.RandomWorkers < 0:
|
||||||
|
// Negative means: explicit pure BFS, no random workers.
|
||||||
|
o.RandomWorkers = 0
|
||||||
|
case o.RandomWorkers == 0:
|
||||||
|
// Zero (struct zero-value) means: library default.
|
||||||
|
o.RandomWorkers = o.Workers / 4
|
||||||
|
case o.RandomWorkers > o.Workers:
|
||||||
|
// Clamp; can't have more random workers than total workers.
|
||||||
|
o.RandomWorkers = o.Workers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CrawlIterator returns an enode.Iterator that performs a breadth-first
|
||||||
|
// crawl by issuing a single FINDNODE request per discovered peer, with a
|
||||||
|
// fresh random target each call. Compared to RandomNodes, this avoids the
|
||||||
|
// alpha-bounded Kademlia lookup convergence loop and is the right shape
|
||||||
|
// for breadth crawls (e.g. devp2p discv4 crawl).
|
||||||
|
//
|
||||||
|
// Concurrency is bounded by opts.Workers; pacing is RTT-driven, not
|
||||||
|
// rate-limited.
|
||||||
|
func (t *UDPv4) CrawlIterator(opts CrawlOptions) enode.Iterator {
|
||||||
|
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||||
|
addr, ok := dst.UDPEndpoint()
|
||||||
|
if !ok {
|
||||||
|
return nil, errNoUDPEndpoint
|
||||||
|
}
|
||||||
|
var target v4wire.Pubkey
|
||||||
|
crand.Read(target[:])
|
||||||
|
peers, err := t.findnode(dst.ID(), addr, target)
|
||||||
|
if err != nil {
|
||||||
|
t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err)
|
||||||
|
}
|
||||||
|
return peers, err
|
||||||
|
}
|
||||||
|
return newCrawlIterator(opts, queryFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CrawlIterator returns an enode.Iterator that performs a breadth-first
|
||||||
|
// crawl using single-distance FINDNODE requests. See [UDPv4.CrawlIterator]
|
||||||
|
// for the algorithm; the discv5 protocol takes a list of distances directly,
|
||||||
|
// so the rotation maps to distances [256, 255, ..., 256-Drange+1].
|
||||||
|
func (t *UDPv5) CrawlIterator(opts CrawlOptions) enode.Iterator {
|
||||||
|
queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) {
|
||||||
|
dist := uint(256 - d)
|
||||||
|
peers, err := t.Findnode(dst, []uint{dist})
|
||||||
|
if err != nil {
|
||||||
|
t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err)
|
||||||
|
}
|
||||||
|
return peers, err
|
||||||
|
}
|
||||||
|
return newCrawlIterator(opts, queryFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// crawlIterator is a breadth-first FINDNODE-driven iterator. It maintains a
|
||||||
|
// shared work queue and an output buffer; workers pop from the queue, issue
|
||||||
|
// one FINDNODE per pop, and feed any newly-seen peers back into both the
|
||||||
|
// queue and the output buffer. The iterator terminates when the queue is
|
||||||
|
// empty and no FINDNODE call is in flight.
|
||||||
|
type crawlIterator struct {
|
||||||
|
queryFn func(dst *enode.Node, d int) ([]*enode.Node, error)
|
||||||
|
drange int
|
||||||
|
outputCap int
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
cond *sync.Cond
|
||||||
|
queue []*enode.Node // pending FINDNODE work
|
||||||
|
output []*enode.Node // emitted peers (one-time)
|
||||||
|
discovered map[enode.ID]struct{}
|
||||||
|
inflight int // queued + in-progress
|
||||||
|
closing bool // Close() called or natural termination
|
||||||
|
cur *enode.Node
|
||||||
|
|
||||||
|
rotation atomic.Uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCrawlIterator(opts CrawlOptions, queryFn func(*enode.Node, int) ([]*enode.Node, error)) *crawlIterator {
|
||||||
|
opts.withDefaults()
|
||||||
|
it := &crawlIterator{
|
||||||
|
queryFn: queryFn,
|
||||||
|
drange: opts.Drange,
|
||||||
|
outputCap: opts.OutputCap,
|
||||||
|
discovered: make(map[enode.ID]struct{}),
|
||||||
|
}
|
||||||
|
it.cond = sync.NewCond(&it.mu)
|
||||||
|
|
||||||
|
// Seed directly into the queue/output. Going through discover() would
|
||||||
|
// block on the OutputCap if len(Seeds) > OutputCap, deadlocking the
|
||||||
|
// constructor since workers haven't started and Next() hasn't been
|
||||||
|
// called yet.
|
||||||
|
for _, n := range opts.Seeds {
|
||||||
|
if n == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, seen := it.discovered[n.ID()]; seen {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
it.discovered[n.ID()] = struct{}{}
|
||||||
|
it.queue = append(it.queue, n)
|
||||||
|
it.output = append(it.output, n)
|
||||||
|
it.inflight++
|
||||||
|
}
|
||||||
|
|
||||||
|
// Workers. The first (Workers - RandomWorkers) workers pop FIFO; the
|
||||||
|
// remaining RandomWorkers pop a uniform-random queue index. Both share
|
||||||
|
// the same queue, mutex, cond, and termination semantics.
|
||||||
|
fifoCount := opts.Workers - opts.RandomWorkers
|
||||||
|
for i := 0; i < fifoCount; i++ {
|
||||||
|
it.wg.Add(1)
|
||||||
|
go it.worker(false)
|
||||||
|
}
|
||||||
|
for i := 0; i < opts.RandomWorkers; i++ {
|
||||||
|
it.wg.Add(1)
|
||||||
|
go it.worker(true)
|
||||||
|
}
|
||||||
|
return it
|
||||||
|
}
|
||||||
|
|
||||||
|
// discover records a newly-seen peer. Acquires mu internally; callers
|
||||||
|
// MUST NOT hold it. If output is at capacity, waits on cond until Next()
|
||||||
|
// drains it; this is the iterator's backpressure point.
|
||||||
|
func (it *crawlIterator) discover(n *enode.Node) {
|
||||||
|
if n == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
it.mu.Lock()
|
||||||
|
defer it.mu.Unlock()
|
||||||
|
for {
|
||||||
|
if it.closing {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, seen := it.discovered[n.ID()]; seen {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if it.outputCap > 0 && len(it.output) >= it.outputCap {
|
||||||
|
// Pause discovery until the consumer drains output. Releases mu
|
||||||
|
// while waiting so other workers can keep popping from queue and
|
||||||
|
// the consumer can pop from output.
|
||||||
|
it.cond.Wait()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
it.discovered[n.ID()] = struct{}{}
|
||||||
|
it.queue = append(it.queue, n)
|
||||||
|
it.output = append(it.output, n)
|
||||||
|
it.inflight++
|
||||||
|
it.cond.Broadcast()
|
||||||
|
}
|
||||||
|
|
||||||
|
// popWork blocks until either a peer is available to query, or the iterator
|
||||||
|
// has nothing left to do. Returns (nil, false) on termination. If random is
|
||||||
|
// true, pops a uniform-random index via swap-and-pop; otherwise pops the
|
||||||
|
// FIFO front. Both modes share the same termination semantics.
|
||||||
|
func (it *crawlIterator) popWork(random bool) (*enode.Node, bool) {
|
||||||
|
it.mu.Lock()
|
||||||
|
defer it.mu.Unlock()
|
||||||
|
for {
|
||||||
|
if it.closing {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
if len(it.queue) > 0 {
|
||||||
|
if random {
|
||||||
|
i := rand.IntN(len(it.queue))
|
||||||
|
n := it.queue[i]
|
||||||
|
// Swap-and-pop: O(1) removal from middle.
|
||||||
|
it.queue[i] = it.queue[len(it.queue)-1]
|
||||||
|
it.queue = it.queue[:len(it.queue)-1]
|
||||||
|
return n, true
|
||||||
|
}
|
||||||
|
n := it.queue[0]
|
||||||
|
it.queue = it.queue[1:]
|
||||||
|
return n, true
|
||||||
|
}
|
||||||
|
if it.inflight == 0 {
|
||||||
|
// Queue empty AND nothing in flight: natural termination.
|
||||||
|
it.closing = true
|
||||||
|
it.cond.Broadcast()
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
it.cond.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finishWork is called by workers after their FINDNODE response has been
|
||||||
|
// processed. It decrements the in-flight counter and broadcasts so a possibly
|
||||||
|
// idle worker can re-evaluate termination.
|
||||||
|
func (it *crawlIterator) finishWork() {
|
||||||
|
it.mu.Lock()
|
||||||
|
defer it.mu.Unlock()
|
||||||
|
it.inflight--
|
||||||
|
if it.inflight == 0 && len(it.queue) == 0 {
|
||||||
|
it.closing = true
|
||||||
|
it.cond.Broadcast()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (it *crawlIterator) worker(random bool) {
|
||||||
|
defer it.wg.Done()
|
||||||
|
for {
|
||||||
|
n, ok := it.popWork(random)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d := int(it.rotation.Add(1)-1) % it.drange
|
||||||
|
peers, _ := it.queryFn(n, d)
|
||||||
|
for _, p := range peers {
|
||||||
|
it.discover(p)
|
||||||
|
}
|
||||||
|
it.finishWork()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next blocks until a newly-discovered peer is available, then returns true
|
||||||
|
// and makes the peer accessible via Node. Returns false when the iterator
|
||||||
|
// has terminated.
|
||||||
|
func (it *crawlIterator) Next() bool {
|
||||||
|
it.mu.Lock()
|
||||||
|
defer it.mu.Unlock()
|
||||||
|
for len(it.output) == 0 {
|
||||||
|
if it.closing {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
it.cond.Wait()
|
||||||
|
}
|
||||||
|
it.cur = it.output[0]
|
||||||
|
it.output = it.output[1:]
|
||||||
|
// Wake any worker stalled in discover() because output was at capacity.
|
||||||
|
it.cond.Broadcast()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Node returns the most recent peer surfaced by Next.
|
||||||
|
func (it *crawlIterator) Node() *enode.Node {
|
||||||
|
it.mu.Lock()
|
||||||
|
defer it.mu.Unlock()
|
||||||
|
return it.cur
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close terminates the iterator, unblocking any goroutines waiting in Next.
|
||||||
|
// Workers exit at their next poll point; in-flight FINDNODE responses are
|
||||||
|
// dropped.
|
||||||
|
func (it *crawlIterator) Close() {
|
||||||
|
it.mu.Lock()
|
||||||
|
if !it.closing {
|
||||||
|
it.closing = true
|
||||||
|
it.cond.Broadcast()
|
||||||
|
}
|
||||||
|
it.mu.Unlock()
|
||||||
|
it.wg.Wait()
|
||||||
|
}
|
||||||
326
p2p/discover/crawliter_test.go
Normal file
326
p2p/discover/crawliter_test.go
Normal file
|
|
@ -0,0 +1,326 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package discover
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||||
|
)
|
||||||
|
|
||||||
|
// makeTestNodes returns n deterministically-generated nodes so callers can
|
||||||
|
// build small synthetic graphs.
|
||||||
|
func makeTestNodes(t *testing.T, n int) []*enode.Node {
|
||||||
|
t.Helper()
|
||||||
|
nodes := make([]*enode.Node, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
var key *ecdsa.PrivateKey
|
||||||
|
var err error
|
||||||
|
key, err = crypto.GenerateKey()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
var r enr.Record
|
||||||
|
r.Set(enr.IPv4{127, 0, 0, 1})
|
||||||
|
r.Set(enr.UDP(30300 + i))
|
||||||
|
if err := enode.SignV4(&r, key); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
nodes[i], err = enode.New(enode.ValidSchemes, &r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrawlIteratorTerminates verifies that the iterator emits every node in
|
||||||
|
// a finite synthetic graph, exactly once, and then returns false from Next.
|
||||||
|
func TestCrawlIteratorTerminates(t *testing.T) {
|
||||||
|
nodes := makeTestNodes(t, 50)
|
||||||
|
|
||||||
|
// Build a synthetic neighbour map: each node knows the next 5 nodes
|
||||||
|
// (cyclic). The crawl should reach all 50 from any single seed.
|
||||||
|
neighbours := make(map[enode.ID][]*enode.Node, len(nodes))
|
||||||
|
for i, n := range nodes {
|
||||||
|
var ns []*enode.Node
|
||||||
|
for k := 1; k <= 5; k++ {
|
||||||
|
ns = append(ns, nodes[(i+k)%len(nodes)])
|
||||||
|
}
|
||||||
|
neighbours[n.ID()] = ns
|
||||||
|
}
|
||||||
|
|
||||||
|
var calls atomic.Int64
|
||||||
|
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return neighbours[dst.ID()], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
it := newCrawlIterator(CrawlOptions{
|
||||||
|
Workers: 4,
|
||||||
|
Seeds: []*enode.Node{nodes[0]},
|
||||||
|
Drange: 16,
|
||||||
|
}, queryFn)
|
||||||
|
|
||||||
|
seen := make(map[enode.ID]int)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
for it.Next() {
|
||||||
|
seen[it.Node().ID()]++
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("iterator did not terminate within 5s")
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := len(seen); got != len(nodes) {
|
||||||
|
t.Fatalf("emitted %d distinct nodes, want %d", got, len(nodes))
|
||||||
|
}
|
||||||
|
for id, c := range seen {
|
||||||
|
if c != 1 {
|
||||||
|
t.Errorf("node %x emitted %d times, want 1", id[:4], c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Every distinct node should have been queried once.
|
||||||
|
if got, want := calls.Load(), int64(len(nodes)); got != want {
|
||||||
|
t.Errorf("queryFn invoked %d times, want %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrawlIteratorClose verifies that calling Close while the iterator is
|
||||||
|
// still discovering nodes unblocks Next and stops workers cleanly.
|
||||||
|
func TestCrawlIteratorClose(t *testing.T) {
|
||||||
|
nodes := makeTestNodes(t, 20)
|
||||||
|
|
||||||
|
// Slow queryFn so we can interrupt mid-crawl.
|
||||||
|
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
var ns []*enode.Node
|
||||||
|
for i, n := range nodes {
|
||||||
|
if n.ID() == dst.ID() {
|
||||||
|
ns = append(ns, nodes[(i+1)%len(nodes)])
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ns, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
it := newCrawlIterator(CrawlOptions{
|
||||||
|
Workers: 2,
|
||||||
|
Seeds: []*enode.Node{nodes[0]},
|
||||||
|
Drange: 16,
|
||||||
|
}, queryFn)
|
||||||
|
|
||||||
|
// Drain a few nodes, then Close.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
it.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for it.Next() {
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("Next did not return after Close")
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrawlIteratorOutputCap verifies the backpressure invariant:
|
||||||
|
// the size of the output buffer never exceeds OutputCap regardless of
|
||||||
|
// how fast the queryFn returns peers.
|
||||||
|
func TestCrawlIteratorOutputCap(t *testing.T) {
|
||||||
|
const cap = 8
|
||||||
|
nodes := makeTestNodes(t, 200)
|
||||||
|
|
||||||
|
// Each node maps to the next in the chain, so discovery is unbounded
|
||||||
|
// from the iterator's perspective until we've covered the cycle.
|
||||||
|
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||||
|
for i, n := range nodes {
|
||||||
|
if n.ID() == dst.ID() {
|
||||||
|
// Return 4 fresh neighbours so the producer outpaces the
|
||||||
|
// consumer (we sleep between Next() calls below).
|
||||||
|
return []*enode.Node{
|
||||||
|
nodes[(i+1)%len(nodes)],
|
||||||
|
nodes[(i+2)%len(nodes)],
|
||||||
|
nodes[(i+3)%len(nodes)],
|
||||||
|
nodes[(i+4)%len(nodes)],
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
itAny := newCrawlIterator(CrawlOptions{
|
||||||
|
Workers: 8,
|
||||||
|
Seeds: []*enode.Node{nodes[0]},
|
||||||
|
Drange: 16,
|
||||||
|
OutputCap: cap,
|
||||||
|
}, queryFn)
|
||||||
|
it := itAny // *crawlIterator
|
||||||
|
|
||||||
|
var maxObserved int
|
||||||
|
check := func() {
|
||||||
|
it.mu.Lock()
|
||||||
|
if l := len(it.output); l > maxObserved {
|
||||||
|
maxObserved = l
|
||||||
|
}
|
||||||
|
it.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow consumer: read with delays so workers must back off.
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(done)
|
||||||
|
for it.Next() {
|
||||||
|
check()
|
||||||
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
it.Close()
|
||||||
|
t.Fatal("iterator did not terminate within 10s")
|
||||||
|
}
|
||||||
|
|
||||||
|
if maxObserved > cap {
|
||||||
|
t.Errorf("output buffer reached %d, want <= cap=%d", maxObserved, cap)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrawlIteratorRandomWorkers verifies that with a mixed FIFO + random
|
||||||
|
// worker pool, the iterator still terminates and emits each node exactly
|
||||||
|
// once on the synthetic graph from TestCrawlIteratorTerminates.
|
||||||
|
func TestCrawlIteratorRandomWorkers(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
workers int
|
||||||
|
randomWorkers int
|
||||||
|
}{
|
||||||
|
{"all-fifo", 4, -1}, // -1 → pure BFS
|
||||||
|
{"all-random", 4, 4}, // all workers random-pop
|
||||||
|
{"half-half", 8, 4}, // 4 FIFO + 4 random
|
||||||
|
{"default", 16, 0}, // 0 → library default = 4 random of 16
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
nodes := makeTestNodes(t, 50)
|
||||||
|
neighbours := make(map[enode.ID][]*enode.Node, len(nodes))
|
||||||
|
for i, n := range nodes {
|
||||||
|
var ns []*enode.Node
|
||||||
|
for k := 1; k <= 5; k++ {
|
||||||
|
ns = append(ns, nodes[(i+k)%len(nodes)])
|
||||||
|
}
|
||||||
|
neighbours[n.ID()] = ns
|
||||||
|
}
|
||||||
|
var calls atomic.Int64
|
||||||
|
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return neighbours[dst.ID()], nil
|
||||||
|
}
|
||||||
|
it := newCrawlIterator(CrawlOptions{
|
||||||
|
Workers: tc.workers,
|
||||||
|
RandomWorkers: tc.randomWorkers,
|
||||||
|
Seeds: []*enode.Node{nodes[0]},
|
||||||
|
Drange: 16,
|
||||||
|
}, queryFn)
|
||||||
|
seen := make(map[enode.ID]int)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
for it.Next() {
|
||||||
|
seen[it.Node().ID()]++
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("iterator did not terminate within 5s")
|
||||||
|
}
|
||||||
|
if got := len(seen); got != len(nodes) {
|
||||||
|
t.Fatalf("emitted %d distinct nodes, want %d", got, len(nodes))
|
||||||
|
}
|
||||||
|
for id, c := range seen {
|
||||||
|
if c != 1 {
|
||||||
|
t.Errorf("node %x emitted %d times, want 1", id[:4], c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if got, want := calls.Load(), int64(len(nodes)); got != want {
|
||||||
|
t.Errorf("queryFn invoked %d times, want %d", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCrawlIteratorRotation verifies that the d argument passed to queryFn
|
||||||
|
// rotates through 0..Drange-1.
|
||||||
|
func TestCrawlIteratorRotation(t *testing.T) {
|
||||||
|
nodes := makeTestNodes(t, 64)
|
||||||
|
// Each node has the next 1 as neighbour, so the crawl makes exactly
|
||||||
|
// len(nodes) FINDNODE calls in a chain.
|
||||||
|
neighbours := make(map[enode.ID]*enode.Node, len(nodes))
|
||||||
|
for i, n := range nodes {
|
||||||
|
neighbours[n.ID()] = nodes[(i+1)%len(nodes)]
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
seenDs = make(map[int]int)
|
||||||
|
)
|
||||||
|
queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) {
|
||||||
|
mu.Lock()
|
||||||
|
seenDs[d]++
|
||||||
|
mu.Unlock()
|
||||||
|
return []*enode.Node{neighbours[dst.ID()]}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
it := newCrawlIterator(CrawlOptions{
|
||||||
|
Workers: 1, // single worker → strictly increasing d
|
||||||
|
Seeds: []*enode.Node{nodes[0]},
|
||||||
|
Drange: 16,
|
||||||
|
}, queryFn)
|
||||||
|
for it.Next() {
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
for d := 0; d < 16; d++ {
|
||||||
|
if seenDs[d] == 0 {
|
||||||
|
t.Errorf("rotation index %d never used", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in a new issue