cmd/devp2p/v5test: fix envelope-based discv5 replies

This commit is contained in:
Csaba Kiraly 2026-03-18 07:53:50 +00:00
parent d1b04ab2b3
commit 827689c146
No known key found for this signature in database
GPG key ID: 0FE274EE8C95166E
2 changed files with 80 additions and 58 deletions

View file

@ -257,34 +257,40 @@ that they are returned by FINDNODE.`)
// Create bystanders.
nodes := make([]*bystander, 5)
added := make(chan enode.ID, len(nodes))
liveCh := make(chan enode.ID, len(nodes))
for i := range nodes {
nodes[i] = newBystander(t, s, added)
nodes[i] = newBystander(t, s, liveCh)
defer nodes[i].close()
}
// Get them added to the remote table.
// Wait until enough bystanders have actually become live, i.e. the remote node
// has revalidated them by sending PING and receiving our PONG.
const minLiveNodes = 3
timeout := 60 * time.Second
timeoutCh := time.After(timeout)
for count := 0; count < len(nodes); {
liveSet := make(map[enode.ID]*enode.Node)
for len(liveSet) < minLiveNodes {
select {
case id := <-added:
t.Logf("bystander node %v added to remote table", id)
count++
case id := <-liveCh:
for _, bn := range nodes {
if bn.id() == id {
liveSet[id] = bn.conn.localNode.Node()
break
}
}
t.Logf("bystander node %v became live", id)
case <-timeoutCh:
t.Errorf("remote added %d bystander nodes in %v, need %d to continue", count, timeout, len(nodes))
t.Logf("this can happen if the node has a non-empty table from previous runs")
t.Errorf("remote revalidated %d bystander nodes in %v, need %d to continue", len(liveSet), timeout, minLiveNodes)
return
}
}
t.Logf("all %d bystander nodes were added", len(nodes))
t.Logf("continuing after %d bystander nodes became live", len(liveSet))
// Collect our nodes by distance.
// Collect live nodes by distance.
var dists []uint
expect := make(map[enode.ID]*enode.Node)
for _, bn := range nodes {
n := bn.conn.localNode.Node()
expect[n.ID()] = n
for id, n := range liveSet {
expect[id] = n
d := uint(enode.LogDist(n.ID(), s.Dest.ID()))
if !slices.Contains(dists, d) {
dists = append(dists, d)
@ -313,7 +319,7 @@ that they are returned by FINDNODE.`)
}
t.Logf("attempt %d: remote returned %d nodes for distance list %v, missing %d", attempt, len(foundNodes), dists, len(missing))
if len(missing) == 0 {
t.Logf("all %d expected nodes were returned", len(nodes))
t.Logf("all %d expected live nodes were returned", len(expect))
return
}
if attempt < maxAttempts {
@ -330,26 +336,26 @@ type bystander struct {
conn *conn
l net.PacketConn
addedCh chan enode.ID
sent map[v5wire.Nonce]v5wire.Packet
done sync.WaitGroup
liveCh chan enode.ID
sent map[v5wire.Nonce]v5wire.Packet
done sync.WaitGroup
}
func newBystander(t *utesting.T, s *Suite, added chan enode.ID) *bystander {
func newBystander(t *utesting.T, s *Suite, live chan enode.ID) *bystander {
conn, l := s.listen1(t)
conn.setEndpoint(l) // bystander nodes need IP/port to get pinged
bn := &bystander{
conn: conn,
l: l,
dest: s.Dest,
addedCh: added,
sent: make(map[v5wire.Nonce]v5wire.Packet),
conn: conn,
l: l,
dest: s.Dest,
liveCh: live,
sent: make(map[v5wire.Nonce]v5wire.Packet),
}
// Establish an initial session and let the remote learn this node before
// switching to the passive responder loop below.
conn.reqresp(l, &v5wire.Ping{
ReqID: conn.nextReqID(),
ENRSeq: s.Dest.Seq(),
ENRSeq: conn.localNode.Seq(),
})
bn.done.Add(1)
go bn.loop()
@ -372,37 +378,37 @@ func (bn *bystander) loop() {
defer bn.done.Done()
for {
switch p := bn.conn.read(bn.l).(type) {
p, from := bn.conn.readFrom(bn.l)
switch p := p.(type) {
case *v5wire.Whoareyou:
p.Node = bn.dest
if resp, ok := bn.sent[p.Nonce]; ok {
nonce := bn.conn.write(bn.l, resp, p)
nonce := bn.conn.writeTo(bn.l, resp, p, from)
delete(bn.sent, p.Nonce)
bn.sent[nonce] = resp
} else {
bn.conn.write(bn.l, &v5wire.Ping{
bn.conn.writeTo(bn.l, &v5wire.Ping{
ReqID: bn.conn.nextReqID(),
ENRSeq: bn.dest.Seq(),
}, p)
ENRSeq: bn.conn.localNode.Seq(),
}, p, from)
}
case *v5wire.Ping:
resp := &v5wire.Pong{
ReqID: append([]byte(nil), p.ReqID...),
ENRSeq: bn.conn.localNode.Seq(),
ToIP: bn.dest.IP(),
ToPort: uint16(bn.dest.UDP()),
ToIP: from.IP,
ToPort: uint16(from.Port),
}
nonce := bn.conn.write(bn.l, resp, nil)
nonce := bn.conn.writeTo(bn.l, resp, nil, from)
bn.sent[nonce] = resp
bn.notifyAdded()
bn.notifyLive()
case *v5wire.Findnode:
resp := &v5wire.Nodes{ReqID: append([]byte(nil), p.ReqID...), RespCount: 1}
nonce := bn.conn.write(bn.l, resp, nil)
nonce := bn.conn.writeTo(bn.l, resp, nil, from)
bn.sent[nonce] = resp
bn.notifyAdded()
case *v5wire.TalkRequest:
resp := &v5wire.TalkResponse{ReqID: append([]byte(nil), p.ReqID...)}
nonce := bn.conn.write(bn.l, resp, nil)
nonce := bn.conn.writeTo(bn.l, resp, nil, from)
bn.sent[nonce] = resp
case *readError:
if netutil.IsTemporaryError(p.err) || v5wire.IsInvalidHeader(p.err) {
@ -414,9 +420,9 @@ func (bn *bystander) loop() {
}
}
func (bn *bystander) notifyAdded() {
if bn.addedCh != nil {
bn.addedCh <- bn.id()
bn.addedCh = nil
func (bn *bystander) notifyLive() {
if bn.liveCh != nil {
bn.liveCh <- bn.id()
bn.liveCh = nil
}
}

View file

@ -127,14 +127,16 @@ func (tc *conn) nextReqID() []byte {
// The request is retried if a handshake is requested.
func (tc *conn) reqresp(c net.PacketConn, req v5wire.Packet) v5wire.Packet {
reqnonce := tc.write(c, req, nil)
switch resp := tc.read(c).(type) {
resp, from := tc.readFrom(c)
switch resp := resp.(type) {
case *v5wire.Whoareyou:
if resp.Nonce != reqnonce {
return readErrorf("wrong nonce %x in WHOAREYOU (want %x)", resp.Nonce[:], reqnonce[:])
}
resp.Node = tc.remote
tc.write(c, req, resp)
return tc.read(c)
tc.writeTo(c, req, resp, from)
resp2, _ := tc.readFrom(c)
return resp2
default:
return resp
}
@ -150,21 +152,24 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error)
results []*enode.Node
)
for n := 1; n > 0; {
switch resp := tc.read(c).(type) {
resp, from := tc.readFrom(c)
switch resp := resp.(type) {
case *v5wire.Whoareyou:
// Handle handshake.
if resp.Nonce == reqnonce {
resp.Node = tc.remote
tc.write(c, findnode, resp)
tc.writeTo(c, findnode, resp, from)
} else {
return nil, fmt.Errorf("unexpected WHOAREYOU (nonce %x), waiting for NODES", resp.Nonce[:])
}
case *v5wire.Ping:
// Handle ping from remote.
tc.write(c, &v5wire.Pong{
tc.writeTo(c, &v5wire.Pong{
ReqID: resp.ReqID,
ENRSeq: tc.localNode.Seq(),
}, nil)
ToIP: from.IP,
ToPort: uint16(from.Port),
}, nil, from)
case *v5wire.Nodes:
// Got NODES! Check request ID.
if !bytes.Equal(resp.ReqID, findnode.ReqID) {
@ -200,11 +205,16 @@ func (tc *conn) findnode(c net.PacketConn, dists []uint) ([]*enode.Node, error)
// write sends a packet on the given connection.
func (tc *conn) write(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoareyou) v5wire.Nonce {
return tc.writeTo(c, p, challenge, tc.remoteAddr)
}
// writeTo sends a packet on the given connection to the given UDP address.
func (tc *conn) writeTo(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoareyou, to *net.UDPAddr) v5wire.Nonce {
packet, nonce, err := tc.codec.Encode(tc.remote.ID(), tc.remoteAddr.String(), p, challenge)
if err != nil {
panic(fmt.Errorf("can't encode %v packet: %v", p.Name(), err))
}
if _, err := c.WriteTo(packet, tc.remoteAddr); err != nil {
if _, err := c.WriteTo(packet, to); err != nil {
tc.logf("Can't send %s: %v", p.Name(), err)
} else {
tc.logf(">> %s", p.Name())
@ -214,24 +224,30 @@ func (tc *conn) write(c net.PacketConn, p v5wire.Packet, challenge *v5wire.Whoar
// read waits for an incoming packet on the given connection.
func (tc *conn) read(c net.PacketConn) v5wire.Packet {
p, _ := tc.readFrom(c)
return p
}
// readFrom waits for an incoming packet and returns its source address.
func (tc *conn) readFrom(c net.PacketConn) (v5wire.Packet, *net.UDPAddr) {
buf := make([]byte, 1280)
if err := c.SetReadDeadline(time.Now().Add(waitTime)); err != nil {
return &readError{err}
return &readError{err}, nil
}
n, _, err := c.ReadFrom(buf)
n, from, err := c.ReadFrom(buf)
if err != nil {
return &readError{err}
return &readError{err}, nil
}
// Always use tc.remoteAddr for session lookup. The actual source address of
// the packet may differ from tc.remoteAddr when the remote node is reachable
// via multiple networks (e.g. Docker bridge vs. overlay), but the codec's
// session cache is keyed by the address used during Encode.
udpFrom, _ := from.(*net.UDPAddr)
// Use tc.remoteAddr for codec/session lookup because the fixture keys sessions
// by the advertised endpoint, but return the actual UDP source so responses can
// comply with the spec and go back to the request envelope address.
_, _, p, err := tc.codec.Decode(buf[:n], tc.remoteAddr.String())
if err != nil {
return &readError{err}
return &readError{err}, udpFrom
}
tc.logf("<< %s", p.Name())
return p
return p, udpFrom
}
// logf prints to the test log.