From 2bb6edd07bc785eb8de6a3de0a0244c9e9ce2cf9 Mon Sep 17 00:00:00 2001 From: JukLee0ira Date: Sun, 19 Jan 2025 15:43:18 +0800 Subject: [PATCH] p2p, p2p/discover: add dial metrics (#27621) --- p2p/dial.go | 5 ++-- p2p/discover/metrics.go | 37 +++++++++++++++++++++++++ p2p/discover/table.go | 43 ++++++++++++++++++++++++++--- p2p/discover/table_test.go | 9 ------- p2p/discover/udp.go | 2 +- p2p/discover/udp_test.go | 2 +- p2p/metrics.go | 55 +++++++++++++++++++++++++++++++------- p2p/server.go | 17 ++++++++++-- p2p/server_test.go | 9 +++---- 9 files changed, 147 insertions(+), 32 deletions(-) create mode 100644 p2p/discover/metrics.go diff --git a/p2p/dial.go b/p2p/dial.go index 14d9e222ee..320874d2d7 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -365,12 +365,13 @@ type dialError struct { // dial performs the actual connection attempt. func (t *dialTask) dial(srv *Server, dest *discover.Node) error { + dialMeter.Mark(1) fd, err := srv.Dialer.Dial(dest) if err != nil { + dialConnectionError.Mark(1) return &dialError{err} } - mfd := newMeteredConn(fd, false) - return srv.SetupConn(mfd, t.flags, dest) + return srv.SetupConn(newMeteredConn(fd), t.flags, dest) } func (t *dialTask) String() string { diff --git a/p2p/discover/metrics.go b/p2p/discover/metrics.go new file mode 100644 index 0000000000..2ab686e14c --- /dev/null +++ b/p2p/discover/metrics.go @@ -0,0 +1,37 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package discover + +import ( + "fmt" + + "github.com/XinFinOrg/XDPoSChain/metrics" +) + +const ( + moduleName = "discover" +) + +var ( + bucketsCounter []*metrics.Counter +) + +func init() { + for i := 0; i < nBuckets; i++ { + bucketsCounter = append(bucketsCounter, metrics.NewRegisteredCounter(fmt.Sprintf("%s/bucket/%d/count", moduleName, i), nil)) + } +} diff --git a/p2p/discover/table.go b/p2p/discover/table.go index b930cdf0b8..7aafbb24b5 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -37,6 +37,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/p2p/netutil" ) @@ -86,7 +87,8 @@ type Table struct { bonding map[NodeID]*bondproc bondslots chan struct{} // limits total number of active bonding processes - nodeAddedHook func(*Node) // for testing + nodeAddedHook func(*bucket, *Node) + nodeRemovedHook func(*bucket, *Node) net transport self *Node // metadata of the local node @@ -114,6 +116,7 @@ type bucket struct { entries []*Node // live entries, sorted by time of last contact replacements []*Node // recently seen nodes to be used if revalidation fails ips netutil.DistinctNetSet + index int } func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) { @@ -145,7 +148,8 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string } for i := range tab.buckets { tab.buckets[i] = &bucket{ - ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, + index: i, + ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit}, } } tab.seedRand() @@ -158,6 +162,22 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string return tab, nil } +func newMeteredTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) { + tab, err := newTable(t, ourID, ourAddr, nodeDBPath, bootnodes) + if err != nil { + return nil, err + } + if metrics.Enabled() { + tab.nodeAddedHook = func(b *bucket, n *Node) { + bucketsCounter[b.index].Inc(1) + } + tab.nodeRemovedHook = func(b *bucket, n *Node) { + bucketsCounter[b.index].Dec(1) + } + } + return tab, nil +} + func (tab *Table) seedRand() { var b [8]byte crand.Read(b[:]) @@ -814,14 +834,31 @@ func (tab *Table) bumpOrAdd(b *bucket, n *Node) bool { b.replacements = deleteNode(b.replacements, n) n.addedAt = time.Now() if tab.nodeAddedHook != nil { - tab.nodeAddedHook(n) + tab.nodeAddedHook(b, n) } return true } func (tab *Table) deleteInBucket(b *bucket, n *Node) { + // Check if the node is actually in the bucket so the removed hook + // isn't called multiple times for the same node. + if !contains(b.entries, n.ID) { + return + } b.entries = deleteNode(b.entries, n) tab.removeIP(b, n.IP) + if tab.nodeRemovedHook != nil { + tab.nodeRemovedHook(b, n) + } +} + +func contains(ns []*Node, id NodeID) bool { + for _, n := range ns { + if n.ID == id { + return true + } + } + return false } // pushNode adds n to the front of list, keeping at most max items. diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 38f22880c8..98afa2d008 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -641,15 +641,6 @@ func sortedByDistanceTo(distbase common.Hash, slice []*Node) bool { return true } -func contains(ns []*Node, id NodeID) bool { - for _, n := range ns { - if n.ID == id { - return true - } - } - return false -} - // gen wraps quick.Value so it's easier to use. // it generates a random value of the given value's type. func gen(typ interface{}, rand *rand.Rand) interface{} { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index bb541ab07a..4cfa8e6be8 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -258,7 +258,7 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) { } // TODO: separate TCP port udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port)) - tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes) + tab, err := newMeteredTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes) if err != nil { return nil, nil, err } diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index a559a1eeee..040b177c20 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -325,7 +325,7 @@ func TestUDP_findnodeMultiReply(t *testing.T) { func TestUDP_successfulPing(t *testing.T) { test := newUDPTest(t) added := make(chan *Node, 1) - test.table.nodeAddedHook = func(n *Node) { added <- n } + test.table.nodeAddedHook = func(b *bucket, n *Node) { added <- n } defer test.table.Close() // The remote side sends a ping packet to initiate the exchange. diff --git a/p2p/metrics.go b/p2p/metrics.go index febff6c4ec..290a215659 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -19,18 +19,61 @@ package p2p import ( + "errors" "net" "github.com/XinFinOrg/XDPoSChain/metrics" ) var ( - ingressConnectMeter = metrics.NewRegisteredMeter("p2p/InboundConnects", nil) ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil) - egressConnectMeter = metrics.NewRegisteredMeter("p2p/OutboundConnects", nil) egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil) ) +var ( + activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) + + serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil) + serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil) + dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil) + dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil) + dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil) + + // handshake error meters + dialTooManyPeers = metrics.NewRegisteredMeter("p2p/dials/error/saturated", nil) + dialAlreadyConnected = metrics.NewRegisteredMeter("p2p/dials/error/known", nil) + dialSelf = metrics.NewRegisteredMeter("p2p/dials/error/self", nil) + dialUselessPeer = metrics.NewRegisteredMeter("p2p/dials/error/useless", nil) + dialUnexpectedIdentity = metrics.NewRegisteredMeter("p2p/dials/error/id/unexpected", nil) + dialEncHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/enc", nil) + dialProtoHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/proto", nil) +) + +func markDialError(err error) { + if !metrics.Enabled() { + return + } + if err2 := errors.Unwrap(err); err2 != nil { + err = err2 + } + switch err { + case DiscTooManyPeers: + dialTooManyPeers.Mark(1) + case DiscAlreadyConnected: + dialAlreadyConnected.Mark(1) + case DiscSelf: + dialSelf.Mark(1) + case DiscUselessPeer: + dialUselessPeer.Mark(1) + case DiscUnexpectedIdentity: + dialUnexpectedIdentity.Mark(1) + case errEncHandshakeError: + dialEncHandshakeError.Mark(1) + case errProtoHandshakeError: + dialProtoHandshakeError.Mark(1) + } +} + // meteredConn is a wrapper around a network TCP connection that meters both the // inbound and outbound network traffic. type meteredConn struct { @@ -40,17 +83,11 @@ type meteredConn struct { // newMeteredConn creates a new metered connection, also bumping the ingress or // egress connection meter. If the metrics system is disabled, this function // returns the original object. -func newMeteredConn(conn net.Conn, ingress bool) net.Conn { +func newMeteredConn(conn net.Conn) net.Conn { // Short circuit if metrics are disabled if !metrics.Enabled() { return conn } - // Otherwise bump the connection counters and wrap the connection - if ingress { - ingressConnectMeter.Mark(1) - } else { - egressConnectMeter.Mark(1) - } return &meteredConn{conn.(*net.TCPConn)} } diff --git a/p2p/server.go b/p2p/server.go index fc5b40a9ab..0aa148d0d1 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -51,7 +51,11 @@ const ( frameWriteTimeout = 20 * time.Second ) -var errServerStopped = errors.New("server stopped") +var ( + errServerStopped = errors.New("server stopped") + errEncHandshakeError = errors.New("rlpx enc error") + errProtoHandshakeError = errors.New("rlpx proto error") +) // Config holds Server options. type Config struct { @@ -702,7 +706,11 @@ running: } if p.Inbound() { inboundCount++ + serveSuccessMeter.Mark(1) + } else { + dialSuccessMeter.Mark(1) } + activePeerGauge.Inc(1) } // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or @@ -720,6 +728,7 @@ running: if pd.Inbound() { inboundCount-- } + activePeerGauge.Dec(1) } } @@ -839,7 +848,8 @@ func (srv *Server) listenLoop() { } } - fd = newMeteredConn(fd, true) + fd = newMeteredConn(fd) + serveMeter.Mark(1) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) go func() { srv.SetupConn(fd, inboundConn, nil) @@ -859,6 +869,9 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)} err := srv.setupConn(c, flags, dialDest) if err != nil { + if !c.is(inboundConn) { + markDialError(err) + } c.close(err) srv.log.Trace("Setting up connection failed", "id", c.id, "err", err) } diff --git a/p2p/server_test.go b/p2p/server_test.go index 49739f3b84..79c3c2a2f1 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -18,7 +18,6 @@ package p2p import ( "crypto/ecdsa" - "errors" "math/rand" "net" "reflect" @@ -506,10 +505,10 @@ func TestServerSetupConn(t *testing.T) { wantCloseErr: errServerStopped, }, { - tt: &setupTransport{id: id, encHandshakeErr: errors.New("read error")}, + tt: &setupTransport{id: id, encHandshakeErr: errEncHandshakeError}, flags: inboundConn, wantCalls: "doEncHandshake,close,", - wantCloseErr: errors.New("read error"), + wantCloseErr: errEncHandshakeError, }, { tt: &setupTransport{id: id}, @@ -526,11 +525,11 @@ func TestServerSetupConn(t *testing.T) { wantCloseErr: DiscUnexpectedIdentity, }, { - tt: &setupTransport{id: id, protoHandshakeErr: errors.New("foo")}, + tt: &setupTransport{id: id, protoHandshakeErr: errProtoHandshakeError}, dialDest: &discover.Node{ID: id}, flags: dynDialedConn, wantCalls: "doEncHandshake,doProtoHandshake,close,", - wantCloseErr: errors.New("foo"), + wantCloseErr: errProtoHandshakeError, }, { tt: &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}},