From 4bb097b7ffc32256791e55ff16ca50ef83c4609b Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 5 Jun 2025 12:14:35 +0200 Subject: [PATCH] eth, p2p: improve dial speed by pre-fetching dial candidates (#31944) This PR improves the speed of Disc/v4 and Disc/v5 based discovery by adding a prefetch buffer to discovery sources, eliminating slowdowns due to timeouts and rate mismatch between the two processes. Since we now want to filter the discv4 nodes iterator, it is being removed from the default discovery mix in p2p.Server. To keep backwards-compatibility, the default unfiltered discovery iterator will be utilized by the server when no protocol-specific discovery is configured. --------- Signed-off-by: Csaba Kiraly Co-authored-by: Felix Lange --- eth/backend.go | 40 ++++++++++- p2p/enode/iter.go | 168 ++++++++++++++++++++++++++++++++++++++++++---- p2p/server.go | 23 +++++-- 3 files changed, 211 insertions(+), 20 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index ba087c5843..7f9e45edea 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -18,6 +18,7 @@ package eth import ( + "context" "encoding/json" "fmt" "math/big" @@ -62,6 +63,26 @@ import ( gethversion "github.com/ethereum/go-ethereum/version" ) +const ( + // This is the fairness knob for the discovery mixer. When looking for peers, we'll + // wait this long for a single source of candidates before moving on and trying other + // sources. If this timeout expires, the source will be skipped in this round, but it + // will continue to fetch in the background and will have a chance with a new timeout + // in the next rounds, giving it overall more time but a proportionally smaller share. + // We expect a normal source to produce ~10 candidates per second. + discmixTimeout = 100 * time.Millisecond + + // discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery + // source. It is useful to avoid the negative effects of potential longer timeouts + // in the discovery, keeping dial progress while waiting for the next batch of + // candidates. + discoveryPrefetchBuffer = 32 + + // maxParallelENRRequests is the maximum number of parallel ENR requests that can be + // performed by a disc/v4 source. + maxParallelENRRequests = 16 +) + // Config contains the configuration options of the ETH protocol. // Deprecated: use ethconfig.Config instead. type Config = ethconfig.Config @@ -176,7 +197,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { networkID: networkID, gasPrice: config.Miner.GasPrice, p2pServer: stack.Server(), - discmix: enode.NewFairMix(0), + discmix: enode.NewFairMix(discmixTimeout), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) @@ -494,10 +515,27 @@ func (s *Ethereum) setupDiscovery() error { s.discmix.AddSource(iter) } + // Add DHT nodes from discv4. + if s.p2pServer.DiscoveryV4() != nil { + iter := s.p2pServer.DiscoveryV4().RandomNodes() + resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node { + // RequestENR does not yet support context. It will simply time out. + // If the ENR can't be resolved, RequestENR will return nil. We don't + // care about the specific error here, so we ignore it. + nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr) + return nn + } + iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests) + iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain)) + iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer) + s.discmix.AddSource(iter) + } + // Add DHT nodes from discv5. if s.p2pServer.DiscoveryV5() != nil { filter := eth.NewNodeFilter(s.blockchain) iter := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), filter) + iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer) s.discmix.AddSource(iter) } diff --git a/p2p/enode/iter.go b/p2p/enode/iter.go index 4b7e28929e..f8f79a9436 100644 --- a/p2p/enode/iter.go +++ b/p2p/enode/iter.go @@ -17,6 +17,7 @@ package enode import ( + "context" "sync" "time" ) @@ -59,6 +60,11 @@ func (it sourceIter) NodeSource() string { return it.name } +type iteratorItem struct { + n *Node + source string +} + // ReadNodes reads at most n nodes from the given iterator. The return value contains no // duplicates and no nil values. To prevent looping indefinitely for small repeating node // sequences, this function calls Next at most n times. @@ -152,6 +158,149 @@ func (f *filterIter) Next() bool { return false } +// asyncFilterIter wraps an iterator such that Next only returns nodes for which +// the 'check' function returns a (possibly modified) node. +type asyncFilterIter struct { + it SourceIterator // the iterator to filter + slots chan struct{} // the slots for parallel checking + passed chan iteratorItem // channel to collect passed nodes + cur iteratorItem // buffer to serve the Node call + cancel context.CancelFunc + closeOnce sync.Once +} + +type AsyncFilterFunc func(context.Context, *Node) *Node + +// AsyncFilter creates an iterator which checks nodes in parallel. +// The 'check' function is called on multiple goroutines to filter each node +// from the upstream iterator. When check returns nil, the node will be skipped. +// It can also return a new node to be returned by the iterator instead of the . +func AsyncFilter(it Iterator, check AsyncFilterFunc, workers int) Iterator { + f := &asyncFilterIter{ + it: ensureSourceIter(it), + slots: make(chan struct{}, workers+1), + passed: make(chan iteratorItem), + } + for range cap(f.slots) { + f.slots <- struct{}{} + } + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + + go func() { + select { + case <-ctx.Done(): + return + case <-f.slots: + } + // read from the iterator and start checking nodes in parallel + // when a node is checked, it will be sent to the passed channel + // and the slot will be released + for f.it.Next() { + node := f.it.Node() + nodeSource := f.it.NodeSource() + + // check the node async, in a separate goroutine + <-f.slots + go func() { + if nn := check(ctx, node); nn != nil { + item := iteratorItem{nn, nodeSource} + select { + case f.passed <- item: + case <-ctx.Done(): // bale out if downstream is already closed and not calling Next + } + } + f.slots <- struct{}{} + }() + } + // the iterator has ended + f.slots <- struct{}{} + }() + + return f +} + +// Next blocks until a node is available or the iterator is closed. +func (f *asyncFilterIter) Next() bool { + var ok bool + f.cur, ok = <-f.passed + return ok +} + +// Node returns the current node. +func (f *asyncFilterIter) Node() *Node { + return f.cur.n +} + +// NodeSource implements IteratorSource. +func (f *asyncFilterIter) NodeSource() string { + return f.cur.source +} + +// Close ends the iterator, also closing the wrapped iterator. +func (f *asyncFilterIter) Close() { + f.closeOnce.Do(func() { + f.it.Close() + f.cancel() + for range cap(f.slots) { + <-f.slots + } + close(f.slots) + close(f.passed) + }) +} + +// bufferIter wraps an iterator and buffers the nodes it returns. +// The buffer is pre-filled with the given size from the wrapped iterator. +type bufferIter struct { + it SourceIterator + buffer chan iteratorItem + head iteratorItem + closeOnce sync.Once +} + +// NewBufferIter creates a new pre-fetch buffer of a given size. +func NewBufferIter(it Iterator, size int) Iterator { + b := bufferIter{ + it: ensureSourceIter(it), + buffer: make(chan iteratorItem, size), + } + + go func() { + // if the wrapped iterator ends, the buffer content will still be served. + defer close(b.buffer) + // If instead the bufferIterator is closed, we bail out of the loop. + for b.it.Next() { + item := iteratorItem{b.it.Node(), b.it.NodeSource()} + b.buffer <- item + } + }() + return &b +} + +func (b *bufferIter) Next() bool { + var ok bool + b.head, ok = <-b.buffer + return ok +} + +func (b *bufferIter) Node() *Node { + return b.head.n +} + +func (b *bufferIter) NodeSource() string { + return b.head.source +} + +func (b *bufferIter) Close() { + b.closeOnce.Do(func() { + b.it.Close() + // Drain buffer and wait for the goroutine to end. + for range b.buffer { + } + }) +} + // FairMix aggregates multiple node iterators. The mixer itself is an iterator which ends // only when Close is called. Source iterators added via AddSource are removed from the // mix when they end. @@ -164,9 +313,9 @@ func (f *filterIter) Next() bool { // It's safe to call AddSource and Close concurrently with Next. type FairMix struct { wg sync.WaitGroup - fromAny chan mixItem + fromAny chan iteratorItem timeout time.Duration - cur mixItem + cur iteratorItem mu sync.Mutex closed chan struct{} @@ -176,15 +325,10 @@ type FairMix struct { type mixSource struct { it SourceIterator - next chan mixItem + next chan iteratorItem timeout time.Duration } -type mixItem struct { - n *Node - source string -} - // NewFairMix creates a mixer. // // The timeout specifies how long the mixer will wait for the next fairly-chosen source @@ -193,7 +337,7 @@ type mixItem struct { // timeout makes the mixer completely fair. func NewFairMix(timeout time.Duration) *FairMix { m := &FairMix{ - fromAny: make(chan mixItem), + fromAny: make(chan iteratorItem), closed: make(chan struct{}), timeout: timeout, } @@ -211,7 +355,7 @@ func (m *FairMix) AddSource(it Iterator) { m.wg.Add(1) source := &mixSource{ it: ensureSourceIter(it), - next: make(chan mixItem), + next: make(chan iteratorItem), timeout: m.timeout, } m.sources = append(m.sources, source) @@ -239,7 +383,7 @@ func (m *FairMix) Close() { // Next returns a node from a random source. func (m *FairMix) Next() bool { - m.cur = mixItem{} + m.cur = iteratorItem{} for { source := m.pickSource() @@ -327,7 +471,7 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) { defer m.wg.Done() defer close(s.next) for s.it.Next() { - item := mixItem{s.it.Node(), s.it.NodeSource()} + item := iteratorItem{s.it.Node(), s.it.NodeSource()} select { case s.next <- item: case m.fromAny <- item: diff --git a/p2p/server.go b/p2p/server.go index f3a58bba29..1f859089af 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -45,11 +45,6 @@ import ( const ( defaultDialTimeout = 15 * time.Second - // This is the fairness knob for the discovery mixer. When looking for peers, we'll - // wait this long for a single source of candidates before moving on and trying other - // sources. - discmixTimeout = 5 * time.Second - // Connectivity defaults. defaultMaxPendingPeers = 50 defaultDialRatio = 3 @@ -447,7 +442,9 @@ func (srv *Server) setupLocalNode() error { } func (srv *Server) setupDiscovery() error { - srv.discmix = enode.NewFairMix(discmixTimeout) + // Set up the discovery source mixer. Here, we don't care about the + // fairness of the mix, it's just for putting the + srv.discmix = enode.NewFairMix(0) // Don't listen on UDP endpoint if DHT is disabled. if srv.NoDiscovery { @@ -483,7 +480,6 @@ func (srv *Server) setupDiscovery() error { return err } srv.discv4 = ntab - srv.discmix.AddSource(ntab.RandomNodes()) } if srv.Config.DiscoveryV5 { cfg := discover.Config{ @@ -506,6 +502,19 @@ func (srv *Server) setupDiscovery() error { added[proto.Name] = true } } + + // Set up default non-protocol-specific discovery feeds if no protocol + // has configured discovery. + if len(added) == 0 { + if srv.discv4 != nil { + it := srv.discv4.RandomNodes() + srv.discmix.AddSource(enode.WithSourceName("discv4-default", it)) + } + if srv.discv5 != nil { + it := srv.discv5.RandomNodes() + srv.discmix.AddSource(enode.WithSourceName("discv5-default", it)) + } + } return nil }