mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-22 06:34:32 +00:00
p2p: measure subprotocol bandwidth usage (#20133)
This commit is contained in:
parent
79e7aef66b
commit
2d33123b0a
4 changed files with 22 additions and 3 deletions
|
|
@ -38,9 +38,13 @@ import (
|
|||
// separate Msg with a bytes.Reader as Payload for each send.
|
||||
type Msg struct {
|
||||
Code uint64
|
||||
Size uint32 // size of the paylod
|
||||
Size uint32 // Size of the raw payload
|
||||
Payload io.Reader
|
||||
ReceivedAt time.Time
|
||||
|
||||
meterCap Cap // Protocol name and version for egress metering
|
||||
meterCode uint64 // Message within protocol for egress metering
|
||||
meterSize uint32 // Compressed message size for ingress metering
|
||||
}
|
||||
|
||||
// Decode parses the RLP content of a message into
|
||||
|
|
|
|||
|
|
@ -26,8 +26,10 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil)
|
||||
egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
|
||||
MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
|
||||
MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
|
||||
ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil)
|
||||
egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil)
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/XinFinOrg/XDPoSChain/common/mclock"
|
||||
"github.com/XinFinOrg/XDPoSChain/event"
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
|
||||
"github.com/XinFinOrg/XDPoSChain/rlp"
|
||||
)
|
||||
|
|
@ -301,6 +302,9 @@ func (p *Peer) handle(msg Msg) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("msg code out of range: %v", msg.Code)
|
||||
}
|
||||
if metrics.Enabled() {
|
||||
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize))
|
||||
}
|
||||
select {
|
||||
case proto.in <- msg:
|
||||
return nil
|
||||
|
|
@ -400,6 +404,9 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
|
|||
if msg.Code >= rw.Length {
|
||||
return newPeerError(errInvalidMsgCode, "not handled")
|
||||
}
|
||||
msg.meterCap = rw.cap()
|
||||
msg.meterCode = msg.Code
|
||||
|
||||
msg.Code += rw.offset
|
||||
select {
|
||||
case <-rw.wstart:
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import (
|
|||
|
||||
"github.com/XinFinOrg/XDPoSChain/crypto"
|
||||
"github.com/XinFinOrg/XDPoSChain/crypto/ecies"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
|
||||
"github.com/XinFinOrg/XDPoSChain/rlp"
|
||||
"github.com/golang/snappy"
|
||||
|
|
@ -610,6 +611,10 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
|
|||
msg.Payload = bytes.NewReader(payload)
|
||||
msg.Size = uint32(len(payload))
|
||||
}
|
||||
msg.meterSize = msg.Size
|
||||
if metrics.Enabled() && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
|
||||
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize))
|
||||
}
|
||||
// write header
|
||||
headbuf := make([]byte, 32)
|
||||
fsize := uint32(len(ptype)) + msg.Size
|
||||
|
|
@ -694,6 +699,7 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
|
|||
return msg, err
|
||||
}
|
||||
msg.Size = uint32(content.Len())
|
||||
msg.meterSize = msg.Size
|
||||
msg.Payload = content
|
||||
|
||||
// if snappy is enabled, verify and decompress message
|
||||
|
|
|
|||
Loading…
Reference in a new issue