p2p/discover: add waitForNodes

This commit is contained in:
Felix Lange 2025-08-29 14:23:45 +02:00
parent 1f7f95d718
commit 46e4f0b5c1
2 changed files with 40 additions and 14 deletions

View file

@ -19,7 +19,6 @@ package discover
import (
"context"
"errors"
"time"
"github.com/ethereum/go-ethereum/p2p/enode"
)
@ -106,11 +105,10 @@ func (it *lookup) startQueries() bool {
// The first query returns nodes from the local table.
if it.queries == -1 {
closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
// for the table to fill in this case, but there is no good mechanism for that
// yet.
// Avoid finishing the lookup too quickly if table is empty.
// Wait for the table to fill.
if len(closest.entries) == 0 {
it.slowdown()
it.tab.waitForNodes(1)
}
it.queries = 1
it.replyCh <- closest.entries
@ -130,15 +128,6 @@ func (it *lookup) startQueries() bool {
return it.queries > 0
}
func (it *lookup) slowdown() {
sleep := time.NewTimer(1 * time.Second)
defer sleep.Stop()
select {
case <-sleep.C:
case <-it.tab.closeReq:
}
}
func (it *lookup) query(n *enode.Node, reply chan<- []*enode.Node) {
r, err := it.queryfunc(n)
if !errors.Is(err, errClosed) { // avoid recording failures on shutdown.

View file

@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -84,6 +85,7 @@ type Table struct {
closeReq chan struct{}
closed chan struct{}
nodeFeed event.FeedOf[*enode.Node]
nodeAddedHook func(*bucket, *tableNode)
nodeRemovedHook func(*bucket, *tableNode)
}
@ -567,6 +569,8 @@ func (tab *Table) nodeAdded(b *bucket, n *tableNode) {
}
n.addedToBucket = time.Now()
tab.revalidation.nodeAdded(tab, n)
tab.nodeFeed.Send(n.Node)
if tab.nodeAddedHook != nil {
tab.nodeAddedHook(b, n)
}
@ -702,3 +706,36 @@ func (tab *Table) deleteNode(n *enode.Node) {
b := tab.bucket(n.ID())
tab.deleteInBucket(b, n.ID())
}
// waitForNodes blocks until the table contains at least n nodes.
func (tab *Table) waitForNodes(n int) bool {
getlength := func() (count int) {
for _, b := range &tab.buckets {
count += len(b.entries)
}
return count
}
var ch chan *enode.Node
for {
tab.mutex.Lock()
if getlength() >= n {
tab.mutex.Unlock()
return true
}
if ch == nil {
// Init subscription.
ch = make(chan *enode.Node)
sub := tab.nodeFeed.Subscribe(ch)
defer sub.Unsubscribe()
}
tab.mutex.Unlock()
// Wait for a node add event.
select {
case <-ch:
case <-tab.closeReq:
return false
}
}
}