From 228803c1a29acf93c8cd53a29e477d8801fc60ad Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 15 May 2025 14:17:58 +0200 Subject: [PATCH] p2p/enode: add support for naming iterator sources (#31779) This adds support for naming the source iterators of FairMix, like so: mix.AddSource(enode.WithSourceName("mySource", iter)) The source that produced the latest node is returned by the new NodeSource method. --- p2p/enode/iter.go | 79 ++++++++++++++++++++++++++++++++---------- p2p/enode/iter_test.go | 48 +++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 18 deletions(-) diff --git a/p2p/enode/iter.go b/p2p/enode/iter.go index b8ab4a758a..4b7e28929e 100644 --- a/p2p/enode/iter.go +++ b/p2p/enode/iter.go @@ -30,6 +30,35 @@ type Iterator interface { Close() // ends the iterator } +// SourceIterator represents a sequence of nodes like [Iterator] +// Each node also has a named 'source'. +type SourceIterator interface { + Iterator + NodeSource() string // source of current node +} + +// WithSource attaches a 'source name' to an iterator. +func WithSourceName(name string, it Iterator) SourceIterator { + return sourceIter{it, name} +} + +func ensureSourceIter(it Iterator) SourceIterator { + if si, ok := it.(SourceIterator); ok { + return si + } + return WithSourceName("", it) +} + +type sourceIter struct { + Iterator + name string +} + +// NodeSource implements IteratorSource. +func (it sourceIter) NodeSource() string { + return it.name +} + // ReadNodes reads at most n nodes from the given iterator. The return value contains no // duplicates and no nil values. To prevent looping indefinitely for small repeating node // sequences, this function calls Next at most n times. @@ -106,16 +135,16 @@ func (it *sliceIter) Close() { // Filter wraps an iterator such that Next only returns nodes for which // the 'check' function returns true. func Filter(it Iterator, check func(*Node) bool) Iterator { - return &filterIter{it, check} + return &filterIter{ensureSourceIter(it), check} } type filterIter struct { - Iterator + SourceIterator check func(*Node) bool } func (f *filterIter) Next() bool { - for f.Iterator.Next() { + for f.SourceIterator.Next() { if f.check(f.Node()) { return true } @@ -135,9 +164,9 @@ func (f *filterIter) Next() bool { // It's safe to call AddSource and Close concurrently with Next. type FairMix struct { wg sync.WaitGroup - fromAny chan *Node + fromAny chan mixItem timeout time.Duration - cur *Node + cur mixItem mu sync.Mutex closed chan struct{} @@ -146,11 +175,16 @@ type FairMix struct { } type mixSource struct { - it Iterator - next chan *Node + it SourceIterator + next chan mixItem timeout time.Duration } +type mixItem struct { + n *Node + source string +} + // NewFairMix creates a mixer. // // The timeout specifies how long the mixer will wait for the next fairly-chosen source @@ -159,7 +193,7 @@ type mixSource struct { // timeout makes the mixer completely fair. func NewFairMix(timeout time.Duration) *FairMix { m := &FairMix{ - fromAny: make(chan *Node), + fromAny: make(chan mixItem), closed: make(chan struct{}), timeout: timeout, } @@ -175,7 +209,11 @@ func (m *FairMix) AddSource(it Iterator) { return } m.wg.Add(1) - source := &mixSource{it, make(chan *Node), m.timeout} + source := &mixSource{ + it: ensureSourceIter(it), + next: make(chan mixItem), + timeout: m.timeout, + } m.sources = append(m.sources, source) go m.runSource(m.closed, source) } @@ -201,7 +239,7 @@ func (m *FairMix) Close() { // Next returns a node from a random source. func (m *FairMix) Next() bool { - m.cur = nil + m.cur = mixItem{} for { source := m.pickSource() @@ -217,12 +255,12 @@ func (m *FairMix) Next() bool { } select { - case n, ok := <-source.next: + case item, ok := <-source.next: if ok { // Here, the timeout is reset to the configured value // because the source delivered a node. source.timeout = m.timeout - m.cur = n + m.cur = item return true } // This source has ended. @@ -239,15 +277,20 @@ func (m *FairMix) Next() bool { // Node returns the current node. func (m *FairMix) Node() *Node { - return m.cur + return m.cur.n +} + +// NodeSource returns the current node's source name. +func (m *FairMix) NodeSource() string { + return m.cur.source } // nextFromAny is used when there are no sources or when the 'fair' choice // doesn't turn up a node quickly enough. func (m *FairMix) nextFromAny() bool { - n, ok := <-m.fromAny + item, ok := <-m.fromAny if ok { - m.cur = n + m.cur = item } return ok } @@ -284,10 +327,10 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) { defer m.wg.Done() defer close(s.next) for s.it.Next() { - n := s.it.Node() + item := mixItem{s.it.Node(), s.it.NodeSource()} select { - case s.next <- n: - case m.fromAny <- n: + case s.next <- item: + case m.fromAny <- item: case <-closed: return } diff --git a/p2p/enode/iter_test.go b/p2p/enode/iter_test.go index b736ed450a..577f9c2825 100644 --- a/p2p/enode/iter_test.go +++ b/p2p/enode/iter_test.go @@ -19,6 +19,7 @@ package enode import ( "encoding/binary" "runtime" + "slices" "sync/atomic" "testing" "time" @@ -183,6 +184,53 @@ func TestFairMixRemoveSource(t *testing.T) { } } +// This checks that FairMix correctly returns the name of the source that produced the node. +func TestFairMixSourceName(t *testing.T) { + nodes := make([]*Node, 6) + for i := range nodes { + nodes[i] = testNode(uint64(i), uint64(i)) + } + mix := NewFairMix(-1) + mix.AddSource(WithSourceName("s1", IterNodes(nodes[0:2]))) + mix.AddSource(WithSourceName("s2", IterNodes(nodes[2:4]))) + mix.AddSource(WithSourceName("s3", IterNodes(nodes[4:6]))) + + var names []string + for range nodes { + mix.Next() + names = append(names, mix.NodeSource()) + } + want := []string{"s2", "s3", "s1", "s2", "s3", "s1"} + if !slices.Equal(names, want) { + t.Fatalf("wrong names: %v", names) + } +} + +// This checks that FairMix returns the name of the source that produced the node, +// even when FairMix instances are nested. +func TestFairMixNestedSourceName(t *testing.T) { + nodes := make([]*Node, 6) + for i := range nodes { + nodes[i] = testNode(uint64(i), uint64(i)) + } + mix := NewFairMix(-1) + mix.AddSource(WithSourceName("s1", IterNodes(nodes[0:2]))) + submix := NewFairMix(-1) + submix.AddSource(WithSourceName("s2", IterNodes(nodes[2:4]))) + submix.AddSource(WithSourceName("s3", IterNodes(nodes[4:6]))) + mix.AddSource(submix) + + var names []string + for range nodes { + mix.Next() + names = append(names, mix.NodeSource()) + } + want := []string{"s3", "s1", "s2", "s1", "s3", "s2"} + if !slices.Equal(names, want) { + t.Fatalf("wrong names: %v", names) + } +} + type blockingIter chan struct{} func (it blockingIter) Next() bool {