diff --git a/p2p/message.go b/p2p/message.go index 58c159ee79..47194e3b52 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -38,9 +38,13 @@ import ( // separate Msg with a bytes.Reader as Payload for each send. type Msg struct { Code uint64 - Size uint32 // size of the paylod + Size uint32 // Size of the raw payload Payload io.Reader ReceivedAt time.Time + + meterCap Cap // Protocol name and version for egress metering + meterCode uint64 // Message within protocol for egress metering + meterSize uint32 // Compressed message size for ingress metering } // Decode parses the RLP content of a message into diff --git a/p2p/metrics.go b/p2p/metrics.go index 290a215659..8105e26567 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -26,8 +26,10 @@ import ( ) var ( - ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil) - egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil) + MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter + MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter + ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil) + egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil) ) var ( diff --git a/p2p/peer.go b/p2p/peer.go index ac6c464e7e..730c62a6b4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -28,6 +28,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/common/mclock" "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/p2p/discover" "github.com/XinFinOrg/XDPoSChain/rlp" ) @@ -301,6 +302,9 @@ func (p *Peer) handle(msg Msg) error { if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } + if metrics.Enabled() { + metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize)) + } select { case proto.in <- msg: return nil @@ -400,6 +404,9 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } + msg.meterCap = rw.cap() + msg.meterCode = msg.Code + msg.Code += rw.offset select { case <-rw.wstart: diff --git a/p2p/rlpx.go b/p2p/rlpx.go index ad422d5ebd..bf966fe2ff 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -36,6 +36,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/crypto/ecies" + "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/p2p/discover" "github.com/XinFinOrg/XDPoSChain/rlp" "github.com/golang/snappy" @@ -610,6 +611,10 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error { msg.Payload = bytes.NewReader(payload) msg.Size = uint32(len(payload)) } + msg.meterSize = msg.Size + if metrics.Enabled() && msg.meterCap.Name != "" { // don't meter non-subprotocol messages + metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize)) + } // write header headbuf := make([]byte, 32) fsize := uint32(len(ptype)) + msg.Size @@ -694,6 +699,7 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) { return msg, err } msg.Size = uint32(content.Len()) + msg.meterSize = msg.Size msg.Payload = content // if snappy is enabled, verify and decompress message