1
0
Fork 0
forked from forks/go-ethereum

p2p/enode: add support for naming iterator sources (#31779)

This adds support for naming the source iterators of FairMix, like so:

  mix.AddSource(enode.WithSourceName("mySource", iter))

The source that produced the latest node is returned by the new NodeSource method.
This commit is contained in:
Felix Lange 2025-05-15 14:17:58 +02:00 committed by GitHub
parent 52dbd206bb
commit 228803c1a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 109 additions and 18 deletions

View file

@ -30,6 +30,35 @@ type Iterator interface {
Close() // ends the iterator
}
// SourceIterator represents a sequence of nodes like [Iterator]
// Each node also has a named 'source'.
type SourceIterator interface {
Iterator
NodeSource() string // source of current node
}
// WithSource attaches a 'source name' to an iterator.
func WithSourceName(name string, it Iterator) SourceIterator {
return sourceIter{it, name}
}
func ensureSourceIter(it Iterator) SourceIterator {
if si, ok := it.(SourceIterator); ok {
return si
}
return WithSourceName("", it)
}
type sourceIter struct {
Iterator
name string
}
// NodeSource implements IteratorSource.
func (it sourceIter) NodeSource() string {
return it.name
}
// ReadNodes reads at most n nodes from the given iterator. The return value contains no
// duplicates and no nil values. To prevent looping indefinitely for small repeating node
// sequences, this function calls Next at most n times.
@ -106,16 +135,16 @@ func (it *sliceIter) Close() {
// Filter wraps an iterator such that Next only returns nodes for which
// the 'check' function returns true.
func Filter(it Iterator, check func(*Node) bool) Iterator {
return &filterIter{it, check}
return &filterIter{ensureSourceIter(it), check}
}
type filterIter struct {
Iterator
SourceIterator
check func(*Node) bool
}
func (f *filterIter) Next() bool {
for f.Iterator.Next() {
for f.SourceIterator.Next() {
if f.check(f.Node()) {
return true
}
@ -135,9 +164,9 @@ func (f *filterIter) Next() bool {
// It's safe to call AddSource and Close concurrently with Next.
type FairMix struct {
wg sync.WaitGroup
fromAny chan *Node
fromAny chan mixItem
timeout time.Duration
cur *Node
cur mixItem
mu sync.Mutex
closed chan struct{}
@ -146,11 +175,16 @@ type FairMix struct {
}
type mixSource struct {
it Iterator
next chan *Node
it SourceIterator
next chan mixItem
timeout time.Duration
}
type mixItem struct {
n *Node
source string
}
// NewFairMix creates a mixer.
//
// The timeout specifies how long the mixer will wait for the next fairly-chosen source
@ -159,7 +193,7 @@ type mixSource struct {
// timeout makes the mixer completely fair.
func NewFairMix(timeout time.Duration) *FairMix {
m := &FairMix{
fromAny: make(chan *Node),
fromAny: make(chan mixItem),
closed: make(chan struct{}),
timeout: timeout,
}
@ -175,7 +209,11 @@ func (m *FairMix) AddSource(it Iterator) {
return
}
m.wg.Add(1)
source := &mixSource{it, make(chan *Node), m.timeout}
source := &mixSource{
it: ensureSourceIter(it),
next: make(chan mixItem),
timeout: m.timeout,
}
m.sources = append(m.sources, source)
go m.runSource(m.closed, source)
}
@ -201,7 +239,7 @@ func (m *FairMix) Close() {
// Next returns a node from a random source.
func (m *FairMix) Next() bool {
m.cur = nil
m.cur = mixItem{}
for {
source := m.pickSource()
@ -217,12 +255,12 @@ func (m *FairMix) Next() bool {
}
select {
case n, ok := <-source.next:
case item, ok := <-source.next:
if ok {
// Here, the timeout is reset to the configured value
// because the source delivered a node.
source.timeout = m.timeout
m.cur = n
m.cur = item
return true
}
// This source has ended.
@ -239,15 +277,20 @@ func (m *FairMix) Next() bool {
// Node returns the current node.
func (m *FairMix) Node() *Node {
return m.cur
return m.cur.n
}
// NodeSource returns the current node's source name.
func (m *FairMix) NodeSource() string {
return m.cur.source
}
// nextFromAny is used when there are no sources or when the 'fair' choice
// doesn't turn up a node quickly enough.
func (m *FairMix) nextFromAny() bool {
n, ok := <-m.fromAny
item, ok := <-m.fromAny
if ok {
m.cur = n
m.cur = item
}
return ok
}
@ -284,10 +327,10 @@ func (m *FairMix) runSource(closed chan struct{}, s *mixSource) {
defer m.wg.Done()
defer close(s.next)
for s.it.Next() {
n := s.it.Node()
item := mixItem{s.it.Node(), s.it.NodeSource()}
select {
case s.next <- n:
case m.fromAny <- n:
case s.next <- item:
case m.fromAny <- item:
case <-closed:
return
}

View file

@ -19,6 +19,7 @@ package enode
import (
"encoding/binary"
"runtime"
"slices"
"sync/atomic"
"testing"
"time"
@ -183,6 +184,53 @@ func TestFairMixRemoveSource(t *testing.T) {
}
}
// This checks that FairMix correctly returns the name of the source that produced the node.
func TestFairMixSourceName(t *testing.T) {
nodes := make([]*Node, 6)
for i := range nodes {
nodes[i] = testNode(uint64(i), uint64(i))
}
mix := NewFairMix(-1)
mix.AddSource(WithSourceName("s1", IterNodes(nodes[0:2])))
mix.AddSource(WithSourceName("s2", IterNodes(nodes[2:4])))
mix.AddSource(WithSourceName("s3", IterNodes(nodes[4:6])))
var names []string
for range nodes {
mix.Next()
names = append(names, mix.NodeSource())
}
want := []string{"s2", "s3", "s1", "s2", "s3", "s1"}
if !slices.Equal(names, want) {
t.Fatalf("wrong names: %v", names)
}
}
// This checks that FairMix returns the name of the source that produced the node,
// even when FairMix instances are nested.
func TestFairMixNestedSourceName(t *testing.T) {
nodes := make([]*Node, 6)
for i := range nodes {
nodes[i] = testNode(uint64(i), uint64(i))
}
mix := NewFairMix(-1)
mix.AddSource(WithSourceName("s1", IterNodes(nodes[0:2])))
submix := NewFairMix(-1)
submix.AddSource(WithSourceName("s2", IterNodes(nodes[2:4])))
submix.AddSource(WithSourceName("s3", IterNodes(nodes[4:6])))
mix.AddSource(submix)
var names []string
for range nodes {
mix.Next()
names = append(names, mix.NodeSource())
}
want := []string{"s3", "s1", "s2", "s1", "s3", "s2"}
if !slices.Equal(names, want) {
t.Fatalf("wrong names: %v", names)
}
}
type blockingIter chan struct{}
func (it blockingIter) Next() bool {