From 46e4f0b5c1d269e29d26a273016b18afbd13bbc4 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 29 Aug 2025 14:23:45 +0200 Subject: [PATCH] p2p/discover: add waitForNodes --- p2p/discover/lookup.go | 17 +++-------------- p2p/discover/table.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index 09808b71e0..db1bbe32f9 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -19,7 +19,6 @@ package discover import ( "context" "errors" - "time" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -106,11 +105,10 @@ func (it *lookup) startQueries() bool { // The first query returns nodes from the local table. if it.queries == -1 { closest := it.tab.findnodeByID(it.result.target, bucketSize, false) - // Avoid finishing the lookup too quickly if table is empty. It'd be better to wait - // for the table to fill in this case, but there is no good mechanism for that - // yet. + // Avoid finishing the lookup too quickly if table is empty. + // Wait for the table to fill. if len(closest.entries) == 0 { - it.slowdown() + it.tab.waitForNodes(1) } it.queries = 1 it.replyCh <- closest.entries @@ -130,15 +128,6 @@ func (it *lookup) startQueries() bool { return it.queries > 0 } -func (it *lookup) slowdown() { - sleep := time.NewTimer(1 * time.Second) - defer sleep.Stop() - select { - case <-sleep.C: - case <-it.tab.closeReq: - } -} - func (it *lookup) query(n *enode.Node, reply chan<- []*enode.Node) { r, err := it.queryfunc(n) if !errors.Is(err, errClosed) { // avoid recording failures on shutdown. diff --git a/p2p/discover/table.go b/p2p/discover/table.go index b6c35aaaa9..63aa655256 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" @@ -84,6 +85,7 @@ type Table struct { closeReq chan struct{} closed chan struct{} + nodeFeed event.FeedOf[*enode.Node] nodeAddedHook func(*bucket, *tableNode) nodeRemovedHook func(*bucket, *tableNode) } @@ -567,6 +569,8 @@ func (tab *Table) nodeAdded(b *bucket, n *tableNode) { } n.addedToBucket = time.Now() tab.revalidation.nodeAdded(tab, n) + + tab.nodeFeed.Send(n.Node) if tab.nodeAddedHook != nil { tab.nodeAddedHook(b, n) } @@ -702,3 +706,36 @@ func (tab *Table) deleteNode(n *enode.Node) { b := tab.bucket(n.ID()) tab.deleteInBucket(b, n.ID()) } + +// waitForNodes blocks until the table contains at least n nodes. +func (tab *Table) waitForNodes(n int) bool { + getlength := func() (count int) { + for _, b := range &tab.buckets { + count += len(b.entries) + } + return count + } + + var ch chan *enode.Node + for { + tab.mutex.Lock() + if getlength() >= n { + tab.mutex.Unlock() + return true + } + if ch == nil { + // Init subscription. + ch = make(chan *enode.Node) + sub := tab.nodeFeed.Subscribe(ch) + defer sub.Unsubscribe() + } + tab.mutex.Unlock() + + // Wait for a node add event. + select { + case <-ch: + case <-tab.closeReq: + return false + } + } +}