mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
p2p, p2p/discover: add dial metrics (#27621)
This commit is contained in:
parent
a52388550c
commit
2bb6edd07b
9 changed files with 147 additions and 32 deletions
|
|
@ -365,12 +365,13 @@ type dialError struct {
|
|||
|
||||
// dial performs the actual connection attempt.
|
||||
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
|
||||
dialMeter.Mark(1)
|
||||
fd, err := srv.Dialer.Dial(dest)
|
||||
if err != nil {
|
||||
dialConnectionError.Mark(1)
|
||||
return &dialError{err}
|
||||
}
|
||||
mfd := newMeteredConn(fd, false)
|
||||
return srv.SetupConn(mfd, t.flags, dest)
|
||||
return srv.SetupConn(newMeteredConn(fd), t.flags, dest)
|
||||
}
|
||||
|
||||
func (t *dialTask) String() string {
|
||||
|
|
|
|||
37
p2p/discover/metrics.go
Normal file
37
p2p/discover/metrics.go
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
// Copyright 2023 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package discover
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
moduleName = "discover"
|
||||
)
|
||||
|
||||
var (
|
||||
bucketsCounter []*metrics.Counter
|
||||
)
|
||||
|
||||
func init() {
|
||||
for i := 0; i < nBuckets; i++ {
|
||||
bucketsCounter = append(bucketsCounter, metrics.NewRegisteredCounter(fmt.Sprintf("%s/bucket/%d/count", moduleName, i), nil))
|
||||
}
|
||||
}
|
||||
|
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/crypto"
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
|
||||
)
|
||||
|
||||
|
|
@ -86,7 +87,8 @@ type Table struct {
|
|||
bonding map[NodeID]*bondproc
|
||||
bondslots chan struct{} // limits total number of active bonding processes
|
||||
|
||||
nodeAddedHook func(*Node) // for testing
|
||||
nodeAddedHook func(*bucket, *Node)
|
||||
nodeRemovedHook func(*bucket, *Node)
|
||||
|
||||
net transport
|
||||
self *Node // metadata of the local node
|
||||
|
|
@ -114,6 +116,7 @@ type bucket struct {
|
|||
entries []*Node // live entries, sorted by time of last contact
|
||||
replacements []*Node // recently seen nodes to be used if revalidation fails
|
||||
ips netutil.DistinctNetSet
|
||||
index int
|
||||
}
|
||||
|
||||
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
|
||||
|
|
@ -145,7 +148,8 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
|
|||
}
|
||||
for i := range tab.buckets {
|
||||
tab.buckets[i] = &bucket{
|
||||
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
|
||||
index: i,
|
||||
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
|
||||
}
|
||||
}
|
||||
tab.seedRand()
|
||||
|
|
@ -158,6 +162,22 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string
|
|||
return tab, nil
|
||||
}
|
||||
|
||||
func newMeteredTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string, bootnodes []*Node) (*Table, error) {
|
||||
tab, err := newTable(t, ourID, ourAddr, nodeDBPath, bootnodes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if metrics.Enabled() {
|
||||
tab.nodeAddedHook = func(b *bucket, n *Node) {
|
||||
bucketsCounter[b.index].Inc(1)
|
||||
}
|
||||
tab.nodeRemovedHook = func(b *bucket, n *Node) {
|
||||
bucketsCounter[b.index].Dec(1)
|
||||
}
|
||||
}
|
||||
return tab, nil
|
||||
}
|
||||
|
||||
func (tab *Table) seedRand() {
|
||||
var b [8]byte
|
||||
crand.Read(b[:])
|
||||
|
|
@ -814,14 +834,31 @@ func (tab *Table) bumpOrAdd(b *bucket, n *Node) bool {
|
|||
b.replacements = deleteNode(b.replacements, n)
|
||||
n.addedAt = time.Now()
|
||||
if tab.nodeAddedHook != nil {
|
||||
tab.nodeAddedHook(n)
|
||||
tab.nodeAddedHook(b, n)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (tab *Table) deleteInBucket(b *bucket, n *Node) {
|
||||
// Check if the node is actually in the bucket so the removed hook
|
||||
// isn't called multiple times for the same node.
|
||||
if !contains(b.entries, n.ID) {
|
||||
return
|
||||
}
|
||||
b.entries = deleteNode(b.entries, n)
|
||||
tab.removeIP(b, n.IP)
|
||||
if tab.nodeRemovedHook != nil {
|
||||
tab.nodeRemovedHook(b, n)
|
||||
}
|
||||
}
|
||||
|
||||
func contains(ns []*Node, id NodeID) bool {
|
||||
for _, n := range ns {
|
||||
if n.ID == id {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// pushNode adds n to the front of list, keeping at most max items.
|
||||
|
|
|
|||
|
|
@ -641,15 +641,6 @@ func sortedByDistanceTo(distbase common.Hash, slice []*Node) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func contains(ns []*Node, id NodeID) bool {
|
||||
for _, n := range ns {
|
||||
if n.ID == id {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// gen wraps quick.Value so it's easier to use.
|
||||
// it generates a random value of the given value's type.
|
||||
func gen(typ interface{}, rand *rand.Rand) interface{} {
|
||||
|
|
|
|||
|
|
@ -258,7 +258,7 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) {
|
|||
}
|
||||
// TODO: separate TCP port
|
||||
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
|
||||
tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
|
||||
tab, err := newMeteredTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -325,7 +325,7 @@ func TestUDP_findnodeMultiReply(t *testing.T) {
|
|||
func TestUDP_successfulPing(t *testing.T) {
|
||||
test := newUDPTest(t)
|
||||
added := make(chan *Node, 1)
|
||||
test.table.nodeAddedHook = func(n *Node) { added <- n }
|
||||
test.table.nodeAddedHook = func(b *bucket, n *Node) { added <- n }
|
||||
defer test.table.Close()
|
||||
|
||||
// The remote side sends a ping packet to initiate the exchange.
|
||||
|
|
|
|||
|
|
@ -19,18 +19,61 @@
|
|||
package p2p
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
ingressConnectMeter = metrics.NewRegisteredMeter("p2p/InboundConnects", nil)
|
||||
ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil)
|
||||
egressConnectMeter = metrics.NewRegisteredMeter("p2p/OutboundConnects", nil)
|
||||
egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
|
||||
)
|
||||
|
||||
var (
|
||||
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)
|
||||
|
||||
serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
|
||||
serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil)
|
||||
dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
|
||||
dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil)
|
||||
dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil)
|
||||
|
||||
// handshake error meters
|
||||
dialTooManyPeers = metrics.NewRegisteredMeter("p2p/dials/error/saturated", nil)
|
||||
dialAlreadyConnected = metrics.NewRegisteredMeter("p2p/dials/error/known", nil)
|
||||
dialSelf = metrics.NewRegisteredMeter("p2p/dials/error/self", nil)
|
||||
dialUselessPeer = metrics.NewRegisteredMeter("p2p/dials/error/useless", nil)
|
||||
dialUnexpectedIdentity = metrics.NewRegisteredMeter("p2p/dials/error/id/unexpected", nil)
|
||||
dialEncHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/enc", nil)
|
||||
dialProtoHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/proto", nil)
|
||||
)
|
||||
|
||||
func markDialError(err error) {
|
||||
if !metrics.Enabled() {
|
||||
return
|
||||
}
|
||||
if err2 := errors.Unwrap(err); err2 != nil {
|
||||
err = err2
|
||||
}
|
||||
switch err {
|
||||
case DiscTooManyPeers:
|
||||
dialTooManyPeers.Mark(1)
|
||||
case DiscAlreadyConnected:
|
||||
dialAlreadyConnected.Mark(1)
|
||||
case DiscSelf:
|
||||
dialSelf.Mark(1)
|
||||
case DiscUselessPeer:
|
||||
dialUselessPeer.Mark(1)
|
||||
case DiscUnexpectedIdentity:
|
||||
dialUnexpectedIdentity.Mark(1)
|
||||
case errEncHandshakeError:
|
||||
dialEncHandshakeError.Mark(1)
|
||||
case errProtoHandshakeError:
|
||||
dialProtoHandshakeError.Mark(1)
|
||||
}
|
||||
}
|
||||
|
||||
// meteredConn is a wrapper around a network TCP connection that meters both the
|
||||
// inbound and outbound network traffic.
|
||||
type meteredConn struct {
|
||||
|
|
@ -40,17 +83,11 @@ type meteredConn struct {
|
|||
// newMeteredConn creates a new metered connection, also bumping the ingress or
|
||||
// egress connection meter. If the metrics system is disabled, this function
|
||||
// returns the original object.
|
||||
func newMeteredConn(conn net.Conn, ingress bool) net.Conn {
|
||||
func newMeteredConn(conn net.Conn) net.Conn {
|
||||
// Short circuit if metrics are disabled
|
||||
if !metrics.Enabled() {
|
||||
return conn
|
||||
}
|
||||
// Otherwise bump the connection counters and wrap the connection
|
||||
if ingress {
|
||||
ingressConnectMeter.Mark(1)
|
||||
} else {
|
||||
egressConnectMeter.Mark(1)
|
||||
}
|
||||
return &meteredConn{conn.(*net.TCPConn)}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,11 @@ const (
|
|||
frameWriteTimeout = 20 * time.Second
|
||||
)
|
||||
|
||||
var errServerStopped = errors.New("server stopped")
|
||||
var (
|
||||
errServerStopped = errors.New("server stopped")
|
||||
errEncHandshakeError = errors.New("rlpx enc error")
|
||||
errProtoHandshakeError = errors.New("rlpx proto error")
|
||||
)
|
||||
|
||||
// Config holds Server options.
|
||||
type Config struct {
|
||||
|
|
@ -702,7 +706,11 @@ running:
|
|||
}
|
||||
if p.Inbound() {
|
||||
inboundCount++
|
||||
serveSuccessMeter.Mark(1)
|
||||
} else {
|
||||
dialSuccessMeter.Mark(1)
|
||||
}
|
||||
activePeerGauge.Inc(1)
|
||||
}
|
||||
// The dialer logic relies on the assumption that
|
||||
// dial tasks complete after the peer has been added or
|
||||
|
|
@ -720,6 +728,7 @@ running:
|
|||
if pd.Inbound() {
|
||||
inboundCount--
|
||||
}
|
||||
activePeerGauge.Dec(1)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -839,7 +848,8 @@ func (srv *Server) listenLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
fd = newMeteredConn(fd, true)
|
||||
fd = newMeteredConn(fd)
|
||||
serveMeter.Mark(1)
|
||||
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
|
||||
go func() {
|
||||
srv.SetupConn(fd, inboundConn, nil)
|
||||
|
|
@ -859,6 +869,9 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Nod
|
|||
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
|
||||
err := srv.setupConn(c, flags, dialDest)
|
||||
if err != nil {
|
||||
if !c.is(inboundConn) {
|
||||
markDialError(err)
|
||||
}
|
||||
c.close(err)
|
||||
srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package p2p
|
|||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"math/rand"
|
||||
"net"
|
||||
"reflect"
|
||||
|
|
@ -506,10 +505,10 @@ func TestServerSetupConn(t *testing.T) {
|
|||
wantCloseErr: errServerStopped,
|
||||
},
|
||||
{
|
||||
tt: &setupTransport{id: id, encHandshakeErr: errors.New("read error")},
|
||||
tt: &setupTransport{id: id, encHandshakeErr: errEncHandshakeError},
|
||||
flags: inboundConn,
|
||||
wantCalls: "doEncHandshake,close,",
|
||||
wantCloseErr: errors.New("read error"),
|
||||
wantCloseErr: errEncHandshakeError,
|
||||
},
|
||||
{
|
||||
tt: &setupTransport{id: id},
|
||||
|
|
@ -526,11 +525,11 @@ func TestServerSetupConn(t *testing.T) {
|
|||
wantCloseErr: DiscUnexpectedIdentity,
|
||||
},
|
||||
{
|
||||
tt: &setupTransport{id: id, protoHandshakeErr: errors.New("foo")},
|
||||
tt: &setupTransport{id: id, protoHandshakeErr: errProtoHandshakeError},
|
||||
dialDest: &discover.Node{ID: id},
|
||||
flags: dynDialedConn,
|
||||
wantCalls: "doEncHandshake,doProtoHandshake,close,",
|
||||
wantCloseErr: errors.New("foo"),
|
||||
wantCloseErr: errProtoHandshakeError,
|
||||
},
|
||||
{
|
||||
tt: &setupTransport{id: srvid, phs: &protoHandshake{ID: srvid}},
|
||||
|
|
|
|||
Loading…
Reference in a new issue