mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-23 08:19:27 +00:00
cmd/devp2p/internal/v5test: fix hive test for discv5 findnode results (#34043)
This fixes the remaining Hive discv5/FindnodeResults failures in the cmd/devp2p/internal/v5test fixture. The issue was in the simulator-side bystander behavior, not in production discovery logic. The existing fixture could get bystanders inserted into the remote table, but under current geth behavior they were not stable enough to remain valid FINDNODE results. In particular, the fixture still had a few protocol/behavior mismatches: - incomplete WHOAREYOU recovery - replies not consistently following the UDP envelope source - incorrect endpoint echoing in PONG - fixture-originated PING using the wrong ENR sequence - bystanders answering background FINDNODE with empty NODES That last point was important because current lookup accounting can treat repeatedly unhelpful FINDNODE interactions as failures. As a result, a bystander could become live via PING/PONG and still later be dropped from the table before the final FindnodeResults assertion. This change updates the fixture so that bystanders behave more like stable discv5 peers: - perform one explicit initial handshake, then switch to passive response handling - resend the exact challenged packet when handling WHOAREYOU - reply to the actual UDP packet source and mirror that source in PONG.ToIP / PONG.ToPort - use the bystander’s own ENR sequence in fixture-originated PING - prefill each bystander with the bystander ENR set and answer FINDNODE from that set The result is that the fixture now forms a small self-consistent lookup environment instead of a set of peers that are live but systematically poor lookup participants.
This commit is contained in:
parent
e1fe4a1a98
commit
c453b99a57
2 changed files with 154 additions and 92 deletions
|
|
@ -257,34 +257,50 @@ that they are returned by FINDNODE.`)
|
||||||
|
|
||||||
// Create bystanders.
|
// Create bystanders.
|
||||||
nodes := make([]*bystander, 5)
|
nodes := make([]*bystander, 5)
|
||||||
added := make(chan enode.ID, len(nodes))
|
liveCh := make(chan enode.ID, len(nodes))
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
nodes[i] = newBystander(t, s, added)
|
nodes[i] = newBystander(t, s, liveCh)
|
||||||
defer nodes[i].close()
|
defer nodes[i].close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get them added to the remote table.
|
// Prefill each bystander with the full bystander set so background FINDNODE
|
||||||
|
// lookups see useful routing data instead of empty responses.
|
||||||
|
known := make([]*enode.Node, 0, len(nodes))
|
||||||
|
for _, bn := range nodes {
|
||||||
|
known = append(known, bn.conn.localNode.Node())
|
||||||
|
}
|
||||||
|
for _, bn := range nodes {
|
||||||
|
bn.known = append([]*enode.Node(nil), known...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait until enough bystanders have actually become live, i.e. the remote node
|
||||||
|
// has revalidated them by sending PING and receiving our PONG.
|
||||||
|
requiredLiveNodes := len(nodes)
|
||||||
timeout := 60 * time.Second
|
timeout := 60 * time.Second
|
||||||
timeoutCh := time.After(timeout)
|
timeoutCh := time.After(timeout)
|
||||||
for count := 0; count < len(nodes); {
|
liveSet := make(map[enode.ID]*enode.Node)
|
||||||
|
for len(liveSet) < requiredLiveNodes {
|
||||||
select {
|
select {
|
||||||
case id := <-added:
|
case id := <-liveCh:
|
||||||
t.Logf("bystander node %v added to remote table", id)
|
for _, bn := range nodes {
|
||||||
count++
|
if bn.id() == id {
|
||||||
|
liveSet[id] = bn.conn.localNode.Node()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Logf("bystander node %v became live", id)
|
||||||
case <-timeoutCh:
|
case <-timeoutCh:
|
||||||
t.Errorf("remote added %d bystander nodes in %v, need %d to continue", count, timeout, len(nodes))
|
t.Errorf("remote revalidated %d bystander nodes in %v, need %d to continue", len(liveSet), timeout, requiredLiveNodes)
|
||||||
t.Logf("this can happen if the node has a non-empty table from previous runs")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.Logf("all %d bystander nodes were added", len(nodes))
|
t.Logf("continuing after all %d bystander nodes became live", len(liveSet))
|
||||||
|
|
||||||
// Collect our nodes by distance.
|
// Collect live nodes by distance.
|
||||||
var dists []uint
|
var dists []uint
|
||||||
expect := make(map[enode.ID]*enode.Node)
|
expect := make(map[enode.ID]*enode.Node)
|
||||||
for _, bn := range nodes {
|
for id, n := range liveSet {
|
||||||
n := bn.conn.localNode.Node()
|
expect[id] = n
|
||||||
expect[n.ID()] = n
|
|
||||||
d := uint(enode.LogDist(n.ID(), s.Dest.ID()))
|
d := uint(enode.LogDist(n.ID(), s.Dest.ID()))
|
||||||
if !slices.Contains(dists, d) {
|
if !slices.Contains(dists, d) {
|
||||||
dists = append(dists, d)
|
dists = append(dists, d)
|
||||||
|
|
@ -295,42 +311,63 @@ that they are returned by FINDNODE.`)
|
||||||
t.Log("requesting nodes")
|
t.Log("requesting nodes")
|
||||||
conn, l1 := s.listen1(t)
|
conn, l1 := s.listen1(t)
|
||||||
defer conn.close()
|
defer conn.close()
|
||||||
foundNodes, err := conn.findnode(l1, dists)
|
|
||||||
if err != nil {
|
const maxAttempts = 5
|
||||||
t.Fatal(err)
|
const retryInterval = 2 * time.Second
|
||||||
}
|
|
||||||
t.Logf("remote returned %d nodes for distance list %v", len(foundNodes), dists)
|
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||||
for _, n := range foundNodes {
|
foundNodes, err := conn.findnode(l1, dists)
|
||||||
delete(expect, n.ID())
|
if err != nil {
|
||||||
}
|
t.Fatal(err)
|
||||||
if len(expect) > 0 {
|
}
|
||||||
t.Errorf("missing %d nodes in FINDNODE result", len(expect))
|
missing := make(map[enode.ID]struct{})
|
||||||
t.Logf("this can happen if the test is run multiple times in quick succession")
|
for id := range expect {
|
||||||
t.Logf("and the remote node hasn't removed dead nodes from previous runs yet")
|
missing[id] = struct{}{}
|
||||||
} else {
|
}
|
||||||
t.Logf("all %d expected nodes were returned", len(nodes))
|
for _, n := range foundNodes {
|
||||||
|
delete(missing, n.ID())
|
||||||
|
}
|
||||||
|
t.Logf("attempt %d: remote returned %d nodes for distance list %v, missing %d", attempt, len(foundNodes), dists, len(missing))
|
||||||
|
if len(missing) == 0 {
|
||||||
|
t.Logf("all %d expected live nodes were returned", len(expect))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if attempt < maxAttempts {
|
||||||
|
time.Sleep(retryInterval)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
t.Errorf("missing nodes in FINDNODE result after %d attempts", maxAttempts)
|
||||||
|
t.Logf("this can happen if the node has a non-empty table from previous runs")
|
||||||
}
|
}
|
||||||
|
|
||||||
// A bystander is a node whose only purpose is filling a spot in the remote table.
|
// A bystander is a node whose only purpose is filling a spot in the remote table.
|
||||||
type bystander struct {
|
type bystander struct {
|
||||||
dest *enode.Node
|
dest *enode.Node
|
||||||
conn *conn
|
conn *conn
|
||||||
l net.PacketConn
|
l net.PacketConn
|
||||||
|
known []*enode.Node
|
||||||
|
|
||||||
addedCh chan enode.ID
|
liveCh chan enode.ID
|
||||||
done sync.WaitGroup
|
sent map[v5wire.Nonce]v5wire.Packet
|
||||||
|
done sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBystander(t *utesting.T, s *Suite, added chan enode.ID) *bystander {
|
func newBystander(t *utesting.T, s *Suite, live chan enode.ID) *bystander {
|
||||||
conn, l := s.listen1(t)
|
conn, l := s.listen1(t)
|
||||||
conn.setEndpoint(l) // bystander nodes need IP/port to get pinged
|
conn.setEndpoint(l) // bystander nodes need IP/port to get pinged
|
||||||
bn := &bystander{
|
bn := &bystander{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
l: l,
|
l: l,
|
||||||
dest: s.Dest,
|
dest: s.Dest,
|
||||||
addedCh: added,
|
liveCh: live,
|
||||||
|
sent: make(map[v5wire.Nonce]v5wire.Packet),
|
||||||
}
|
}
|
||||||
|
// Establish an initial session and let the remote learn this node before
|
||||||
|
// switching to the passive responder loop below.
|
||||||
|
conn.reqresp(l, &v5wire.Ping{
|
||||||
|
ReqID: conn.nextReqID(),
|
||||||
|
ENRSeq: conn.localNode.Seq(),
|
||||||
|
})
|
||||||
bn.done.Add(1)
|
bn.done.Add(1)
|
||||||
go bn.loop()
|
go bn.loop()
|
||||||
return bn
|
return bn
|
||||||
|
|
@ -351,48 +388,57 @@ func (bn *bystander) close() {
|
||||||
func (bn *bystander) loop() {
|
func (bn *bystander) loop() {
|
||||||
defer bn.done.Done()
|
defer bn.done.Done()
|
||||||
|
|
||||||
var (
|
|
||||||
lastPing time.Time
|
|
||||||
wasAdded bool
|
|
||||||
)
|
|
||||||
for {
|
for {
|
||||||
// Ping the remote node.
|
p, from := bn.conn.readFrom(bn.l)
|
||||||
if !wasAdded && time.Since(lastPing) > 10*time.Second {
|
switch p := p.(type) {
|
||||||
bn.conn.reqresp(bn.l, &v5wire.Ping{
|
case *v5wire.Whoareyou:
|
||||||
ReqID: bn.conn.nextReqID(),
|
p.Node = bn.dest
|
||||||
ENRSeq: bn.dest.Seq(),
|
if resp, ok := bn.sent[p.Nonce]; ok {
|
||||||
})
|
nonce := bn.conn.writeTo(bn.l, resp, p, from)
|
||||||
lastPing = time.Now()
|
delete(bn.sent, p.Nonce)
|
||||||
}
|
bn.sent[nonce] = resp
|
||||||
// Answer packets.
|
} else {
|
||||||
switch p := bn.conn.read(bn.l).(type) {
|
bn.conn.writeTo(bn.l, &v5wire.Ping{
|
||||||
case *v5wire.Ping:
|
ReqID: bn.conn.nextReqID(),
|
||||||
bn.conn.write(bn.l, &v5wire.Pong{
|
ENRSeq: bn.conn.localNode.Seq(),
|
||||||
ReqID: p.ReqID,
|
}, p, from)
|
||||||
ENRSeq: bn.conn.localNode.Seq(),
|
|
||||||
ToIP: bn.dest.IP(),
|
|
||||||
ToPort: uint16(bn.dest.UDP()),
|
|
||||||
}, nil)
|
|
||||||
wasAdded = true
|
|
||||||
bn.notifyAdded()
|
|
||||||
case *v5wire.Findnode:
|
|
||||||
bn.conn.write(bn.l, &v5wire.Nodes{ReqID: p.ReqID, RespCount: 1}, nil)
|
|
||||||
wasAdded = true
|
|
||||||
bn.notifyAdded()
|
|
||||||
case *v5wire.TalkRequest:
|
|
||||||
bn.conn.write(bn.l, &v5wire.TalkResponse{ReqID: p.ReqID}, nil)
|
|
||||||
case *readError:
|
|
||||||
if !netutil.IsTemporaryError(p.err) {
|
|
||||||
bn.conn.logf("shutting down: %v", p.err)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
case *v5wire.Ping:
|
||||||
|
resp := &v5wire.Pong{
|
||||||
|
ReqID: append([]byte(nil), p.ReqID...),
|
||||||
|
ENRSeq: bn.conn.localNode.Seq(),
|
||||||
|
ToIP: from.IP,
|
||||||
|
ToPort: uint16(from.Port),
|
||||||
|
}
|
||||||
|
nonce := bn.conn.writeTo(bn.l, resp, nil, from)
|
||||||
|
bn.sent[nonce] = resp
|
||||||
|
bn.notifyLive()
|
||||||
|
case *v5wire.Findnode:
|
||||||
|
resp := &v5wire.Nodes{ReqID: append([]byte(nil), p.ReqID...), RespCount: 1}
|
||||||
|
for _, n := range bn.known {
|
||||||
|
if slices.Contains(p.Distances, uint(enode.LogDist(n.ID(), bn.id()))) {
|
||||||
|
resp.Nodes = append(resp.Nodes, n.Record())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
nonce := bn.conn.writeTo(bn.l, resp, nil, from)
|
||||||
|
bn.sent[nonce] = resp
|
||||||
|
case *v5wire.TalkRequest:
|
||||||
|
resp := &v5wire.TalkResponse{ReqID: append([]byte(nil), p.ReqID...)}
|
||||||
|
nonce := bn.conn.writeTo(bn.l, resp, nil, from)
|
||||||
|
bn.sent[nonce] = resp
|
||||||
|
case *readError:
|
||||||
|
if netutil.IsTemporaryError(p.err) || v5wire.IsInvalidHeader(p.err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
bn.conn.logf("shutting down: %v", p.err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bn *bystander) notifyAdded() {
|
func (bn *bystander) notifyLive() {
|
||||||
if bn.addedCh != nil {
|
if bn.liveCh != nil {
|
||||||
bn.addedCh <- bn.id()
|
bn.liveCh <- bn.id()
|
||||||
bn.addedCh = nil
|
bn.liveCh = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -127,14 +127,16 @@ func (tc *conn) nextReqID() []byte {
|
||||||
// The request is retried if a handshake is requested.
|
// The request is retried if a handshake is requested.
|
||||||
func (tc *conn) reqresp(c net.PacketConn, req v5wire.Packet) v5wire.Packet {
|
func (tc *conn) reqresp(c net.PacketConn, req v5wire.Packet) v5wire.Packet {
|
||||||
reqnonce := tc.write(c, req, nil)
|
reqnonce := tc.write(c, req, nil)
|
||||||
switch resp := tc.read(c).(type) {
|
resp, from := tc.readFrom(c)
|
||||||
|
switch resp := resp.(type) {
|
||||||
case *v5wire.Whoareyou:
|
case *v5wire.Whoareyou:
|
||||||
if resp.Nonce != reqnonce {
|
if resp.Nonce != reqnonce {
|
||||||
return readErrorf("wrong nonce %x in WHOAREYOU (want %x)", resp.Nonce[:], reqnonce[:])
|
return readErrorf("wrong nonce %x in WHOAREYOU (want %x)", resp.Nonce[:], reqnonce[:])
|
||||||
}
|
}
|
||||||
resp.Node = tc.remote
|
resp.Node = tc.remote
|
||||||
tc.write(c, req, resp)
|
tc.writeTo(c, req, resp, from)
|
||||||
return tc.read(c)
|
resp2, _ := tc.readFrom(c)
|
||||||
|
return resp2
|
||||||
default:
|
default:
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
@ -150,21 +152,24 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error)
|
||||||
results []*enode.Node
|
results []*enode.Node
|
||||||
)
|
)
|
||||||
for n := 1; n > 0; {
|
for n := 1; n > 0; {
|
||||||
switch resp := tc.read(c).(type) {
|
resp, from := tc.readFrom(c)
|
||||||
|
switch resp := resp.(type) {
|
||||||
case *v5wire.Whoareyou:
|
case *v5wire.Whoareyou:
|
||||||
// Handle handshake.
|
// Handle handshake.
|
||||||
if resp.Nonce == reqnonce {
|
if resp.Nonce == reqnonce {
|
||||||
resp.Node = tc.remote
|
resp.Node = tc.remote
|
||||||
tc.write(c, findnode, resp)
|
tc.writeTo(c, findnode, resp, from)
|
||||||
} else {
|
} else {
|
||||||
return nil, fmt.Errorf("unexpected WHOAREYOU (nonce %x), waiting for NODES", resp.Nonce[:])
|
return nil, fmt.Errorf("unexpected WHOAREYOU (nonce %x), waiting for NODES", resp.Nonce[:])
|
||||||
}
|
}
|
||||||
case *v5wire.Ping:
|
case *v5wire.Ping:
|
||||||
// Handle ping from remote.
|
// Handle ping from remote.
|
||||||
tc.write(c, &v5wire.Pong{
|
tc.writeTo(c, &v5wire.Pong{
|
||||||
ReqID: resp.ReqID,
|
ReqID: resp.ReqID,
|
||||||
ENRSeq: tc.localNode.Seq(),
|
ENRSeq: tc.localNode.Seq(),
|
||||||
}, nil)
|
ToIP: from.IP,
|
||||||
|
ToPort: uint16(from.Port),
|
||||||
|
}, nil, from)
|
||||||
case *v5wire.Nodes:
|
case *v5wire.Nodes:
|
||||||
// Got NODES! Check request ID.
|
// Got NODES! Check request ID.
|
||||||
if !bytes.Equal(resp.ReqID, findnode.ReqID) {
|
if !bytes.Equal(resp.ReqID, findnode.ReqID) {
|
||||||
|
|
@ -200,11 +205,16 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error)
|
||||||
|
|
||||||
// write sends a packet on the given connection.
|
// write sends a packet on the given connection.
|
||||||
func (tc *conn) write(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoareyou) v5wire.Nonce {
|
func (tc *conn) write(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoareyou) v5wire.Nonce {
|
||||||
|
return tc.writeTo(c, p, challenge, tc.remoteAddr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeTo sends a packet on the given connection to the given UDP address.
|
||||||
|
func (tc *conn) writeTo(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoareyou, to *net.UDPAddr) v5wire.Nonce {
|
||||||
packet, nonce, err := tc.codec.Encode(tc.remote.ID(), tc.remoteAddr.String(), p, challenge)
|
packet, nonce, err := tc.codec.Encode(tc.remote.ID(), tc.remoteAddr.String(), p, challenge)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("can't encode %v packet: %v", p.Name(), err))
|
panic(fmt.Errorf("can't encode %v packet: %v", p.Name(), err))
|
||||||
}
|
}
|
||||||
if _, err := c.WriteTo(packet, tc.remoteAddr); err != nil {
|
if _, err := c.WriteTo(packet, to); err != nil {
|
||||||
tc.logf("Can't send %s: %v", p.Name(), err)
|
tc.logf("Can't send %s: %v", p.Name(), err)
|
||||||
} else {
|
} else {
|
||||||
tc.logf(">> %s", p.Name())
|
tc.logf(">> %s", p.Name())
|
||||||
|
|
@ -214,24 +224,30 @@ func (tc *conn) write(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoar
|
||||||
|
|
||||||
// read waits for an incoming packet on the given connection.
|
// read waits for an incoming packet on the given connection.
|
||||||
func (tc *conn) read(c net.PacketConn) v5wire.Packet {
|
func (tc *conn) read(c net.PacketConn) v5wire.Packet {
|
||||||
|
p, _ := tc.readFrom(c)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// readFrom waits for an incoming packet and returns its source address.
|
||||||
|
func (tc *conn) readFrom(c net.PacketConn) (v5wire.Packet, *net.UDPAddr) {
|
||||||
buf := make([]byte, 1280)
|
buf := make([]byte, 1280)
|
||||||
if err := c.SetReadDeadline(time.Now().Add(waitTime)); err != nil {
|
if err := c.SetReadDeadline(time.Now().Add(waitTime)); err != nil {
|
||||||
return &readError{err}
|
return &readError{err}, nil
|
||||||
}
|
}
|
||||||
n, _, err := c.ReadFrom(buf)
|
n, from, err := c.ReadFrom(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &readError{err}
|
return &readError{err}, nil
|
||||||
}
|
}
|
||||||
// Always use tc.remoteAddr for session lookup. The actual source address of
|
udpFrom, _ := from.(*net.UDPAddr)
|
||||||
// the packet may differ from tc.remoteAddr when the remote node is reachable
|
// Use tc.remoteAddr for codec/session lookup because the fixture keys sessions
|
||||||
// via multiple networks (e.g. Docker bridge vs. overlay), but the codec's
|
// by the advertised endpoint, but return the actual UDP source so responses can
|
||||||
// session cache is keyed by the address used during Encode.
|
// comply with the spec and go back to the request envelope address.
|
||||||
_, _, p, err := tc.codec.Decode(buf[:n], tc.remoteAddr.String())
|
_, _, p, err := tc.codec.Decode(buf[:n], tc.remoteAddr.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &readError{err}
|
return &readError{err}, udpFrom
|
||||||
}
|
}
|
||||||
tc.logf("<< %s", p.Name())
|
tc.logf("<< %s", p.Name())
|
||||||
return p
|
return p, udpFrom
|
||||||
}
|
}
|
||||||
|
|
||||||
// logf prints to the test log.
|
// logf prints to the test log.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue