mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-25 01:09:28 +00:00
p2p/discover: add CrawlIterator for breadth-first FINDNODE walks
Add an enode.Iterator that drives discovery by issuing a single FINDNODE per discovered peer, rotating the target through Drange sub-regions of the keyspace. Compared to RandomNodes (which wraps an alpha=3 Kademlia lookup that converges on a single target), this shape is geared for breadth: each peer is asked about a different slice of the keyspace, so aggregate coverage grows quickly without per-peer overlap. The two protocols expose different FINDNODE primitives, so the iterator threads a per-protocol queryFn: * discv5 takes a list of distances natively, so we just pass [256-d] for d in 0..Drange-1. * discv4 takes a target NodeID and replies with the K closest. To get an equivalent rotation, we pick a random pubkey whose Keccak256 starts with the desired prefix nibble. With Drange=16 that's ~16 random draws per call -- negligible compared to the network round trip. Concurrency is bounded by Workers (default 16). There is intentionally no rate limit: pacing is RTT-driven, ~Workers/RTT on the wire. Termination is implicit: when the work queue is empty AND no FINDNODE is in flight, the iterator closes its output and Next returns false. Close() short-circuits this for callers that want to bail early. Adapts the algorithm from github.com/cskiraly/fast-ethereum-crawler (dcrawl.nim) -- the prefix-rotation idea -- but drops its 1000 req/s rate limit in favour of the bounded worker pool.
This commit is contained in:
parent
aaa2b66285
commit
6c0d848d9c
2 changed files with 606 additions and 0 deletions
343
p2p/discover/crawliter.go
Normal file
343
p2p/discover/crawliter.go
Normal file
|
|
@ -0,0 +1,343 @@
|
|||
// Copyright 2026 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package discover
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover/v4wire"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
)
|
||||
|
||||
// CrawlOptions configures a CrawlIterator.
|
||||
type CrawlOptions struct {
|
||||
// Workers is the number of concurrent FINDNODE calls in flight.
|
||||
// If <= 0, a default of 16 is used.
|
||||
Workers int
|
||||
|
||||
// Seeds are the nodes to start the crawl from. If empty, the iterator
|
||||
// terminates immediately. Callers should pass at least the bootnodes.
|
||||
Seeds []*enode.Node
|
||||
|
||||
// Drange is the number of keyspace sub-regions to rotate the FINDNODE
|
||||
// target through. Defaults to 16. Internally rounded up to the next power
|
||||
// of two and capped at 256 (the keyspace top-byte width); on the discv4
|
||||
// path this caps the prefix-bit grind cost, on the discv5 path it caps
|
||||
// the rotation to valid distances [1, 256].
|
||||
Drange int
|
||||
|
||||
// OutputCap bounds the number of newly-discovered peers buffered for the
|
||||
// caller's Next() to drain. When the buffer reaches this size, workers
|
||||
// pause discovery via cond.Wait until Next() drains it. This is the
|
||||
// iterator's backpressure point for slow consumers and adversarial peers
|
||||
// flooding fresh ENRs in FINDNODE responses.
|
||||
//
|
||||
// If <= 0, defaults to 16 * Workers (≥ one full FINDNODE response per
|
||||
// worker). Set higher for callers that drain in large batches.
|
||||
//
|
||||
// Note: the dedup set and the worker work-queue are not bounded; their
|
||||
// growth is implicit in any iterator that emits each unique peer exactly
|
||||
// once across a long crawl. Realistic bound is the reachable DHT size
|
||||
// (~1M peers, ~50 MB).
|
||||
OutputCap int
|
||||
}
|
||||
|
||||
func (o *CrawlOptions) withDefaults() {
|
||||
if o.Workers <= 0 {
|
||||
o.Workers = 16
|
||||
}
|
||||
if o.Drange <= 0 {
|
||||
o.Drange = 16
|
||||
}
|
||||
if o.Drange > 256 {
|
||||
o.Drange = 256
|
||||
}
|
||||
// Round up to the next power of two so the prefix-bit width is
|
||||
// well-defined and Drange divides the top byte of the keyspace evenly.
|
||||
if o.Drange&(o.Drange-1) != 0 {
|
||||
o.Drange = nextPowerOfTwo(o.Drange)
|
||||
}
|
||||
if o.OutputCap <= 0 {
|
||||
o.OutputCap = 16 * o.Workers
|
||||
}
|
||||
}
|
||||
|
||||
// nextPowerOfTwo returns the smallest power of two >= n, for n in [1, 256].
|
||||
func nextPowerOfTwo(n int) int {
|
||||
p := 1
|
||||
for p < n {
|
||||
p <<= 1
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// CrawlIterator returns an enode.Iterator that performs a breadth-first
|
||||
// crawl by issuing a single FINDNODE request per discovered peer, rotating
|
||||
// the request's target through Drange sub-regions of the keyspace so that
|
||||
// each peer is asked about a different slice. Compared to RandomNodes, this
|
||||
// avoids the alpha-bounded Kademlia lookup convergence loop and is the right
|
||||
// shape for breadth crawls (e.g. devp2p discv4 crawl).
|
||||
//
|
||||
// Concurrency is bounded by opts.Workers; pacing is RTT-driven, not
|
||||
// rate-limited.
|
||||
func (t *UDPv4) CrawlIterator(opts CrawlOptions) enode.Iterator {
|
||||
opts.withDefaults()
|
||||
prefixBits := log2Pow2(opts.Drange)
|
||||
queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) {
|
||||
addr, ok := dst.UDPEndpoint()
|
||||
if !ok {
|
||||
return nil, errNoUDPEndpoint
|
||||
}
|
||||
target := randomTargetWithPrefix(uint8(d), prefixBits)
|
||||
peers, err := t.findnode(dst.ID(), addr, target)
|
||||
if err != nil {
|
||||
t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err)
|
||||
}
|
||||
return peers, err
|
||||
}
|
||||
return newCrawlIterator(opts, queryFn)
|
||||
}
|
||||
|
||||
// CrawlIterator returns an enode.Iterator that performs a breadth-first
|
||||
// crawl using single-distance FINDNODE requests. See [UDPv4.CrawlIterator]
|
||||
// for the algorithm; the discv5 protocol takes a list of distances directly,
|
||||
// so the rotation maps to distances [256, 255, ..., 256-Drange+1].
|
||||
func (t *UDPv5) CrawlIterator(opts CrawlOptions) enode.Iterator {
|
||||
queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) {
|
||||
dist := uint(256 - d)
|
||||
peers, err := t.Findnode(dst, []uint{dist})
|
||||
if err != nil {
|
||||
t.log.Trace("FINDNODE failed", "id", dst.ID(), "err", err)
|
||||
}
|
||||
return peers, err
|
||||
}
|
||||
return newCrawlIterator(opts, queryFn)
|
||||
}
|
||||
|
||||
// log2Pow2 returns log2(n) for power-of-two n. The caller must ensure n is a
|
||||
// power of two; non-power-of-two inputs round down.
|
||||
func log2Pow2(n int) int {
|
||||
bits := 0
|
||||
for n > 1 {
|
||||
n >>= 1
|
||||
bits++
|
||||
}
|
||||
return bits
|
||||
}
|
||||
|
||||
// randomTargetWithPrefix returns a v4wire.Pubkey whose Keccak256 hash has its
|
||||
// top `bits` bits equal to d. On average ~2^bits draws are needed.
|
||||
func randomTargetWithPrefix(d uint8, bits int) v4wire.Pubkey {
|
||||
if bits == 0 {
|
||||
var pk v4wire.Pubkey
|
||||
crand.Read(pk[:])
|
||||
return pk
|
||||
}
|
||||
for {
|
||||
var pk v4wire.Pubkey
|
||||
crand.Read(pk[:])
|
||||
h := crypto.Keccak256(pk[:])
|
||||
if (h[0] >> (8 - bits)) == d {
|
||||
return pk
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// crawlIterator is a breadth-first FINDNODE-driven iterator. It maintains a
|
||||
// shared work queue and an output buffer; workers pop from the queue, issue
|
||||
// one FINDNODE per pop, and feed any newly-seen peers back into both the
|
||||
// queue and the output buffer. The iterator terminates when the queue is
|
||||
// empty and no FINDNODE call is in flight.
|
||||
type crawlIterator struct {
|
||||
queryFn func(dst *enode.Node, d int) ([]*enode.Node, error)
|
||||
drange int
|
||||
outputCap int
|
||||
wg sync.WaitGroup
|
||||
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
queue []*enode.Node // pending FINDNODE work
|
||||
output []*enode.Node // emitted peers (one-time)
|
||||
discovered map[enode.ID]struct{}
|
||||
inflight int // queued + in-progress
|
||||
closing bool // Close() called or natural termination
|
||||
cur *enode.Node
|
||||
|
||||
rotation atomic.Uint64
|
||||
}
|
||||
|
||||
func newCrawlIterator(opts CrawlOptions, queryFn func(*enode.Node, int) ([]*enode.Node, error)) *crawlIterator {
|
||||
opts.withDefaults()
|
||||
it := &crawlIterator{
|
||||
queryFn: queryFn,
|
||||
drange: opts.Drange,
|
||||
outputCap: opts.OutputCap,
|
||||
discovered: make(map[enode.ID]struct{}),
|
||||
}
|
||||
it.cond = sync.NewCond(&it.mu)
|
||||
|
||||
// Seed directly into the queue/output. Going through discover() would
|
||||
// block on the OutputCap if len(Seeds) > OutputCap, deadlocking the
|
||||
// constructor since workers haven't started and Next() hasn't been
|
||||
// called yet.
|
||||
for _, n := range opts.Seeds {
|
||||
if n == nil {
|
||||
continue
|
||||
}
|
||||
if _, seen := it.discovered[n.ID()]; seen {
|
||||
continue
|
||||
}
|
||||
it.discovered[n.ID()] = struct{}{}
|
||||
it.queue = append(it.queue, n)
|
||||
it.output = append(it.output, n)
|
||||
it.inflight++
|
||||
}
|
||||
|
||||
// Workers.
|
||||
for i := 0; i < opts.Workers; i++ {
|
||||
it.wg.Add(1)
|
||||
go it.worker()
|
||||
}
|
||||
return it
|
||||
}
|
||||
|
||||
// discover records a newly-seen peer. Acquires mu internally; callers
|
||||
// MUST NOT hold it. If output is at capacity, waits on cond until Next()
|
||||
// drains it; this is the iterator's backpressure point.
|
||||
func (it *crawlIterator) discover(n *enode.Node) {
|
||||
if n == nil {
|
||||
return
|
||||
}
|
||||
it.mu.Lock()
|
||||
defer it.mu.Unlock()
|
||||
for {
|
||||
if it.closing {
|
||||
return
|
||||
}
|
||||
if _, seen := it.discovered[n.ID()]; seen {
|
||||
return
|
||||
}
|
||||
if it.outputCap > 0 && len(it.output) >= it.outputCap {
|
||||
// Pause discovery until the consumer drains output. Releases mu
|
||||
// while waiting so other workers can keep popping from queue and
|
||||
// the consumer can pop from output.
|
||||
it.cond.Wait()
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
it.discovered[n.ID()] = struct{}{}
|
||||
it.queue = append(it.queue, n)
|
||||
it.output = append(it.output, n)
|
||||
it.inflight++
|
||||
it.cond.Broadcast()
|
||||
}
|
||||
|
||||
// popWork blocks until either a peer is available to query, or the iterator
|
||||
// has nothing left to do. Returns (nil, false) on termination.
|
||||
func (it *crawlIterator) popWork() (*enode.Node, bool) {
|
||||
it.mu.Lock()
|
||||
defer it.mu.Unlock()
|
||||
for {
|
||||
if it.closing {
|
||||
return nil, false
|
||||
}
|
||||
if len(it.queue) > 0 {
|
||||
n := it.queue[0]
|
||||
it.queue = it.queue[1:]
|
||||
return n, true
|
||||
}
|
||||
if it.inflight == 0 {
|
||||
// Queue empty AND nothing in flight: natural termination.
|
||||
it.closing = true
|
||||
it.cond.Broadcast()
|
||||
return nil, false
|
||||
}
|
||||
it.cond.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
// finishWork is called by workers after their FINDNODE response has been
|
||||
// processed. It decrements the in-flight counter and broadcasts so a possibly
|
||||
// idle worker can re-evaluate termination.
|
||||
func (it *crawlIterator) finishWork() {
|
||||
it.mu.Lock()
|
||||
defer it.mu.Unlock()
|
||||
it.inflight--
|
||||
if it.inflight == 0 && len(it.queue) == 0 {
|
||||
it.closing = true
|
||||
it.cond.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
func (it *crawlIterator) worker() {
|
||||
defer it.wg.Done()
|
||||
for {
|
||||
n, ok := it.popWork()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
d := int(it.rotation.Add(1)-1) % it.drange
|
||||
peers, _ := it.queryFn(n, d)
|
||||
for _, p := range peers {
|
||||
it.discover(p)
|
||||
}
|
||||
it.finishWork()
|
||||
}
|
||||
}
|
||||
|
||||
// Next blocks until a newly-discovered peer is available, then returns true
|
||||
// and makes the peer accessible via Node. Returns false when the iterator
|
||||
// has terminated.
|
||||
func (it *crawlIterator) Next() bool {
|
||||
it.mu.Lock()
|
||||
defer it.mu.Unlock()
|
||||
for len(it.output) == 0 {
|
||||
if it.closing {
|
||||
return false
|
||||
}
|
||||
it.cond.Wait()
|
||||
}
|
||||
it.cur = it.output[0]
|
||||
it.output = it.output[1:]
|
||||
// Wake any worker stalled in discover() because output was at capacity.
|
||||
it.cond.Broadcast()
|
||||
return true
|
||||
}
|
||||
|
||||
// Node returns the most recent peer surfaced by Next.
|
||||
func (it *crawlIterator) Node() *enode.Node {
|
||||
it.mu.Lock()
|
||||
defer it.mu.Unlock()
|
||||
return it.cur
|
||||
}
|
||||
|
||||
// Close terminates the iterator, unblocking any goroutines waiting in Next.
|
||||
// Workers exit at their next poll point; in-flight FINDNODE responses are
|
||||
// dropped.
|
||||
func (it *crawlIterator) Close() {
|
||||
it.mu.Lock()
|
||||
if !it.closing {
|
||||
it.closing = true
|
||||
it.cond.Broadcast()
|
||||
}
|
||||
it.mu.Unlock()
|
||||
it.wg.Wait()
|
||||
}
|
||||
263
p2p/discover/crawliter_test.go
Normal file
263
p2p/discover/crawliter_test.go
Normal file
|
|
@ -0,0 +1,263 @@
|
|||
// Copyright 2026 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package discover
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||
)
|
||||
|
||||
// makeTestNodes returns n deterministically-generated nodes so callers can
|
||||
// build small synthetic graphs.
|
||||
func makeTestNodes(t *testing.T, n int) []*enode.Node {
|
||||
t.Helper()
|
||||
nodes := make([]*enode.Node, n)
|
||||
for i := 0; i < n; i++ {
|
||||
var key *ecdsa.PrivateKey
|
||||
var err error
|
||||
key, err = crypto.GenerateKey()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var r enr.Record
|
||||
r.Set(enr.IPv4{127, 0, 0, 1})
|
||||
r.Set(enr.UDP(30300 + i))
|
||||
if err := enode.SignV4(&r, key); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
nodes[i], err = enode.New(enode.ValidSchemes, &r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// TestCrawlIteratorTerminates verifies that the iterator emits every node in
|
||||
// a finite synthetic graph, exactly once, and then returns false from Next.
|
||||
func TestCrawlIteratorTerminates(t *testing.T) {
|
||||
nodes := makeTestNodes(t, 50)
|
||||
|
||||
// Build a synthetic neighbour map: each node knows the next 5 nodes
|
||||
// (cyclic). The crawl should reach all 50 from any single seed.
|
||||
neighbours := make(map[enode.ID][]*enode.Node, len(nodes))
|
||||
for i, n := range nodes {
|
||||
var ns []*enode.Node
|
||||
for k := 1; k <= 5; k++ {
|
||||
ns = append(ns, nodes[(i+k)%len(nodes)])
|
||||
}
|
||||
neighbours[n.ID()] = ns
|
||||
}
|
||||
|
||||
var calls atomic.Int64
|
||||
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||
calls.Add(1)
|
||||
return neighbours[dst.ID()], nil
|
||||
}
|
||||
|
||||
it := newCrawlIterator(CrawlOptions{
|
||||
Workers: 4,
|
||||
Seeds: []*enode.Node{nodes[0]},
|
||||
Drange: 16,
|
||||
}, queryFn)
|
||||
|
||||
seen := make(map[enode.ID]int)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for it.Next() {
|
||||
seen[it.Node().ID()]++
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("iterator did not terminate within 5s")
|
||||
}
|
||||
|
||||
if got := len(seen); got != len(nodes) {
|
||||
t.Fatalf("emitted %d distinct nodes, want %d", got, len(nodes))
|
||||
}
|
||||
for id, c := range seen {
|
||||
if c != 1 {
|
||||
t.Errorf("node %x emitted %d times, want 1", id[:4], c)
|
||||
}
|
||||
}
|
||||
// Every distinct node should have been queried once.
|
||||
if got, want := calls.Load(), int64(len(nodes)); got != want {
|
||||
t.Errorf("queryFn invoked %d times, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrawlIteratorClose verifies that calling Close while the iterator is
|
||||
// still discovering nodes unblocks Next and stops workers cleanly.
|
||||
func TestCrawlIteratorClose(t *testing.T) {
|
||||
nodes := makeTestNodes(t, 20)
|
||||
|
||||
// Slow queryFn so we can interrupt mid-crawl.
|
||||
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
var ns []*enode.Node
|
||||
for i, n := range nodes {
|
||||
if n.ID() == dst.ID() {
|
||||
ns = append(ns, nodes[(i+1)%len(nodes)])
|
||||
break
|
||||
}
|
||||
}
|
||||
return ns, nil
|
||||
}
|
||||
|
||||
it := newCrawlIterator(CrawlOptions{
|
||||
Workers: 2,
|
||||
Seeds: []*enode.Node{nodes[0]},
|
||||
Drange: 16,
|
||||
}, queryFn)
|
||||
|
||||
// Drain a few nodes, then Close.
|
||||
go func() {
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
it.Close()
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for it.Next() {
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Next did not return after Close")
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestCrawlIteratorOutputCap verifies the backpressure invariant:
|
||||
// the size of the output buffer never exceeds OutputCap regardless of
|
||||
// how fast the queryFn returns peers.
|
||||
func TestCrawlIteratorOutputCap(t *testing.T) {
|
||||
const cap = 8
|
||||
nodes := makeTestNodes(t, 200)
|
||||
|
||||
// Each node maps to the next in the chain, so discovery is unbounded
|
||||
// from the iterator's perspective until we've covered the cycle.
|
||||
queryFn := func(dst *enode.Node, _ int) ([]*enode.Node, error) {
|
||||
for i, n := range nodes {
|
||||
if n.ID() == dst.ID() {
|
||||
// Return 4 fresh neighbours so the producer outpaces the
|
||||
// consumer (we sleep between Next() calls below).
|
||||
return []*enode.Node{
|
||||
nodes[(i+1)%len(nodes)],
|
||||
nodes[(i+2)%len(nodes)],
|
||||
nodes[(i+3)%len(nodes)],
|
||||
nodes[(i+4)%len(nodes)],
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
itAny := newCrawlIterator(CrawlOptions{
|
||||
Workers: 8,
|
||||
Seeds: []*enode.Node{nodes[0]},
|
||||
Drange: 16,
|
||||
OutputCap: cap,
|
||||
}, queryFn)
|
||||
it := itAny // *crawlIterator
|
||||
|
||||
var maxObserved int
|
||||
check := func() {
|
||||
it.mu.Lock()
|
||||
if l := len(it.output); l > maxObserved {
|
||||
maxObserved = l
|
||||
}
|
||||
it.mu.Unlock()
|
||||
}
|
||||
|
||||
// Slow consumer: read with delays so workers must back off.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for it.Next() {
|
||||
check()
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(10 * time.Second):
|
||||
it.Close()
|
||||
t.Fatal("iterator did not terminate within 10s")
|
||||
}
|
||||
|
||||
if maxObserved > cap {
|
||||
t.Errorf("output buffer reached %d, want <= cap=%d", maxObserved, cap)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCrawlIteratorRotation verifies that the d argument passed to queryFn
|
||||
// rotates through 0..Drange-1.
|
||||
func TestCrawlIteratorRotation(t *testing.T) {
|
||||
nodes := makeTestNodes(t, 64)
|
||||
// Each node has the next 1 as neighbour, so the crawl makes exactly
|
||||
// len(nodes) FINDNODE calls in a chain.
|
||||
neighbours := make(map[enode.ID]*enode.Node, len(nodes))
|
||||
for i, n := range nodes {
|
||||
neighbours[n.ID()] = nodes[(i+1)%len(nodes)]
|
||||
}
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
seenDs = make(map[int]int)
|
||||
)
|
||||
queryFn := func(dst *enode.Node, d int) ([]*enode.Node, error) {
|
||||
mu.Lock()
|
||||
seenDs[d]++
|
||||
mu.Unlock()
|
||||
return []*enode.Node{neighbours[dst.ID()]}, nil
|
||||
}
|
||||
|
||||
it := newCrawlIterator(CrawlOptions{
|
||||
Workers: 1, // single worker → strictly increasing d
|
||||
Seeds: []*enode.Node{nodes[0]},
|
||||
Drange: 16,
|
||||
}, queryFn)
|
||||
for it.Next() {
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
for d := 0; d < 16; d++ {
|
||||
if seenDs[d] == 0 {
|
||||
t.Errorf("rotation index %d never used", d)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue