mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
metrics/influxdb: reuse code between v1 and v2 reporters (#26963)
This commit is contained in:
parent
9eae1243cd
commit
6e055a601d
5 changed files with 253 additions and 371 deletions
2
go.mod
2
go.mod
|
|
@ -18,7 +18,6 @@ require (
|
|||
github.com/hashicorp/golang-lru v0.5.4
|
||||
github.com/holiman/uint256 v1.2.4
|
||||
github.com/huin/goupnp v1.3.0
|
||||
github.com/influxdata/influxdb v1.7.9
|
||||
github.com/jackpal/go-nat-pmp v1.0.2
|
||||
github.com/julienschmidt/httprouter v1.3.0
|
||||
github.com/karalabe/hid v1.0.0
|
||||
|
|
@ -51,6 +50,7 @@ require (
|
|||
github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498
|
||||
github.com/ethereum/c-kzg-4844 v0.4.0
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.4.0
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
|
||||
github.com/kylelemons/godebug v1.1.0
|
||||
github.com/mattn/go-isatty v0.0.17
|
||||
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -99,10 +99,10 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei
|
|||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
|
||||
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
|
||||
github.com/influxdata/influxdb v1.7.9 h1:uSeBTNO4rBkbp1Be5FKRsAmglM9nlx25TzVQRQt1An4=
|
||||
github.com/influxdata/influxdb v1.7.9/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.4.0 h1:HGBfZYStlx3Kqvsv1h2pJixbCl/jhnFtxpKFAv9Tu5k=
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs=
|
||||
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
|
||||
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
|
||||
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
|
||||
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
|
||||
|
|
|
|||
|
|
@ -2,242 +2,112 @@ package influxdb
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
uurl "net/url"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
"github.com/influxdata/influxdb/client"
|
||||
)
|
||||
|
||||
type reporter struct {
|
||||
reg metrics.Registry
|
||||
interval time.Duration
|
||||
|
||||
url uurl.URL
|
||||
database string
|
||||
username string
|
||||
password string
|
||||
namespace string
|
||||
tags map[string]string
|
||||
|
||||
client *client.Client
|
||||
|
||||
cache map[string]int64
|
||||
}
|
||||
|
||||
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
|
||||
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
|
||||
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
|
||||
}
|
||||
|
||||
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
|
||||
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
|
||||
u, err := uurl.Parse(url)
|
||||
if err != nil {
|
||||
log.Warn("unable to parse InfluxDB url %s. err=%v", url, err)
|
||||
return
|
||||
}
|
||||
|
||||
rep := &reporter{
|
||||
reg: r,
|
||||
interval: d,
|
||||
url: *u,
|
||||
database: database,
|
||||
username: username,
|
||||
password: password,
|
||||
namespace: namespace,
|
||||
tags: tags,
|
||||
cache: make(map[string]int64),
|
||||
}
|
||||
if err := rep.makeClient(); err != nil {
|
||||
log.Warn("unable to make InfluxDB client. err=%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
rep.run()
|
||||
}
|
||||
|
||||
func (r *reporter) makeClient() (err error) {
|
||||
r.client, err = client.NewClient(client.Config{
|
||||
URL: r.url,
|
||||
Username: r.username,
|
||||
Password: r.password,
|
||||
Timeout: 10 * time.Second,
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *reporter) run() {
|
||||
intervalTicker := time.NewTicker(r.interval)
|
||||
pingTicker := time.NewTicker(time.Second * 5)
|
||||
|
||||
defer intervalTicker.Stop()
|
||||
defer pingTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-intervalTicker.C:
|
||||
if err := r.send(); err != nil {
|
||||
log.Warn("unable to send to InfluxDB. err=%v", err)
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
_, _, err := r.client.Ping()
|
||||
if err != nil {
|
||||
log.Warn("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err)
|
||||
|
||||
if err = r.makeClient(); err != nil {
|
||||
log.Warn("unable to make InfluxDB client. err=%v", err)
|
||||
}
|
||||
}
|
||||
func readMeter(namespace, name string, i interface{}) (string, map[string]interface{}) {
|
||||
switch metric := i.(type) {
|
||||
case metrics.Counter:
|
||||
measurement := fmt.Sprintf("%s%s.count", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": metric.Count(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *reporter) send() error {
|
||||
var pts []client.Point
|
||||
|
||||
r.reg.Each(func(name string, i interface{}) {
|
||||
now := time.Now()
|
||||
namespace := r.namespace
|
||||
|
||||
switch metric := i.(type) {
|
||||
case metrics.Counter:
|
||||
count := metric.Count()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": count,
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.CounterFloat64:
|
||||
count := metric.Count()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.count", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": count,
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.Gauge:
|
||||
ms := metric.Snapshot()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": ms.Value(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.GaugeFloat64:
|
||||
ms := metric.Snapshot()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.gauge", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"value": ms.Value(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.Histogram:
|
||||
ms := metric.Snapshot()
|
||||
if ms.Count() > 0 {
|
||||
ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p25": ps[0],
|
||||
"p50": ps[1],
|
||||
"p75": ps[2],
|
||||
"p95": ps[3],
|
||||
"p99": ps[4],
|
||||
"p999": ps[5],
|
||||
"p9999": ps[6],
|
||||
}
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.histogram", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: fields,
|
||||
Time: now,
|
||||
})
|
||||
}
|
||||
case metrics.Meter:
|
||||
ms := metric.Snapshot()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.meter", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"mean": ms.RateMean(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.Timer:
|
||||
ms := metric.Snapshot()
|
||||
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.timer", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p50": ps[0],
|
||||
"p75": ps[1],
|
||||
"p95": ps[2],
|
||||
"p99": ps[3],
|
||||
"p999": ps[4],
|
||||
"p9999": ps[5],
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"meanrate": ms.RateMean(),
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
case metrics.ResettingTimer:
|
||||
t := metric.Snapshot()
|
||||
|
||||
if len(t.Values()) > 0 {
|
||||
ps := t.Percentiles([]float64{50, 95, 99})
|
||||
val := t.Values()
|
||||
pts = append(pts, client.Point{
|
||||
Measurement: fmt.Sprintf("%s%s.span", namespace, name),
|
||||
Tags: r.tags,
|
||||
Fields: map[string]interface{}{
|
||||
"count": len(val),
|
||||
"max": val[len(val)-1],
|
||||
"mean": t.Mean(),
|
||||
"min": val[0],
|
||||
"p50": ps[0],
|
||||
"p95": ps[1],
|
||||
"p99": ps[2],
|
||||
},
|
||||
Time: now,
|
||||
})
|
||||
}
|
||||
return measurement, fields
|
||||
case metrics.CounterFloat64:
|
||||
measurement := fmt.Sprintf("%s%s.count", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": metric.Count(),
|
||||
}
|
||||
})
|
||||
return measurement, fields
|
||||
case metrics.Gauge:
|
||||
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": metric.Snapshot().Value(),
|
||||
}
|
||||
return measurement, fields
|
||||
case metrics.GaugeFloat64:
|
||||
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": metric.Snapshot().Value(),
|
||||
}
|
||||
return measurement, fields
|
||||
case metrics.Histogram:
|
||||
ms := metric.Snapshot()
|
||||
if ms.Count() <= 0 {
|
||||
break
|
||||
}
|
||||
ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p25": ps[0],
|
||||
"p50": ps[1],
|
||||
"p75": ps[2],
|
||||
"p95": ps[3],
|
||||
"p99": ps[4],
|
||||
"p999": ps[5],
|
||||
"p9999": ps[6],
|
||||
}
|
||||
return measurement, fields
|
||||
case metrics.Meter:
|
||||
ms := metric.Snapshot()
|
||||
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"mean": ms.RateMean(),
|
||||
}
|
||||
return measurement, fields
|
||||
case metrics.Timer:
|
||||
ms := metric.Snapshot()
|
||||
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
|
||||
bps := client.BatchPoints{
|
||||
Points: pts,
|
||||
Database: r.database,
|
||||
measurement := fmt.Sprintf("%s%s.timer", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p50": ps[0],
|
||||
"p75": ps[1],
|
||||
"p95": ps[2],
|
||||
"p99": ps[3],
|
||||
"p999": ps[4],
|
||||
"p9999": ps[5],
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"meanrate": ms.RateMean(),
|
||||
}
|
||||
return measurement, fields
|
||||
case metrics.ResettingTimer:
|
||||
t := metric.Snapshot()
|
||||
if len(t.Values()) == 0 {
|
||||
break
|
||||
}
|
||||
ps := t.Percentiles([]float64{50, 95, 99})
|
||||
val := t.Values()
|
||||
measurement := fmt.Sprintf("%s%s.span", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": len(val),
|
||||
"max": val[len(val)-1],
|
||||
"mean": t.Mean(),
|
||||
"min": val[0],
|
||||
"p50": ps[0],
|
||||
"p95": ps[1],
|
||||
"p99": ps[2],
|
||||
}
|
||||
return measurement, fields
|
||||
}
|
||||
|
||||
_, err := r.client.Write(bps)
|
||||
return err
|
||||
return "", nil
|
||||
}
|
||||
|
|
|
|||
145
metrics/influxdb/influxdbv1.go
Normal file
145
metrics/influxdb/influxdbv1.go
Normal file
|
|
@ -0,0 +1,145 @@
|
|||
package influxdb
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
uurl "net/url"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
"github.com/XinFinOrg/XDPoSChain/metrics"
|
||||
client "github.com/influxdata/influxdb1-client/v2"
|
||||
)
|
||||
|
||||
type reporter struct {
|
||||
reg metrics.Registry
|
||||
interval time.Duration
|
||||
|
||||
url uurl.URL
|
||||
database string
|
||||
username string
|
||||
password string
|
||||
namespace string
|
||||
tags map[string]string
|
||||
|
||||
client client.Client
|
||||
|
||||
cache map[string]int64
|
||||
}
|
||||
|
||||
// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval.
|
||||
func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) {
|
||||
InfluxDBWithTags(r, d, url, database, username, password, namespace, nil)
|
||||
}
|
||||
|
||||
// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags
|
||||
func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) {
|
||||
u, err := uurl.Parse(url)
|
||||
if err != nil {
|
||||
log.Warn("Unable to parse InfluxDB", "url", url, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
rep := &reporter{
|
||||
reg: r,
|
||||
interval: d,
|
||||
url: *u,
|
||||
database: database,
|
||||
username: username,
|
||||
password: password,
|
||||
namespace: namespace,
|
||||
tags: tags,
|
||||
cache: make(map[string]int64),
|
||||
}
|
||||
if err := rep.makeClient(); err != nil {
|
||||
log.Warn("Unable to make InfluxDB client", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
rep.run()
|
||||
}
|
||||
|
||||
// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags
|
||||
func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error {
|
||||
u, err := uurl.Parse(url)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err)
|
||||
}
|
||||
|
||||
rep := &reporter{
|
||||
reg: r,
|
||||
url: *u,
|
||||
database: database,
|
||||
username: username,
|
||||
password: password,
|
||||
namespace: namespace,
|
||||
tags: tags,
|
||||
cache: make(map[string]int64),
|
||||
}
|
||||
if err := rep.makeClient(); err != nil {
|
||||
return fmt.Errorf("unable to make InfluxDB client. err: %v", err)
|
||||
}
|
||||
|
||||
if err := rep.send(); err != nil {
|
||||
return fmt.Errorf("unable to send to InfluxDB. err: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reporter) makeClient() (err error) {
|
||||
r.client, err = client.NewHTTPClient(client.HTTPConfig{
|
||||
Addr: r.url.String(),
|
||||
Username: r.username,
|
||||
Password: r.password,
|
||||
Timeout: 10 * time.Second,
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *reporter) run() {
|
||||
intervalTicker := time.NewTicker(r.interval)
|
||||
pingTicker := time.NewTicker(time.Second * 5)
|
||||
|
||||
defer intervalTicker.Stop()
|
||||
defer pingTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-intervalTicker.C:
|
||||
if err := r.send(); err != nil {
|
||||
log.Warn("Unable to send to InfluxDB", "err", err)
|
||||
}
|
||||
case <-pingTicker.C:
|
||||
_, _, err := r.client.Ping(0)
|
||||
if err != nil {
|
||||
log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err)
|
||||
|
||||
if err = r.makeClient(); err != nil {
|
||||
log.Warn("Unable to make InfluxDB client", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *reporter) send() error {
|
||||
bps, err := client.NewBatchPoints(
|
||||
client.BatchPointsConfig{
|
||||
Database: r.database,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.reg.Each(func(name string, i interface{}) {
|
||||
now := time.Now()
|
||||
measurement, fields := readMeter(r.namespace, name, i)
|
||||
if fields == nil {
|
||||
return
|
||||
}
|
||||
if p, err := client.NewPoint(measurement, r.tags, fields, now); err == nil {
|
||||
bps.AddPoint(p)
|
||||
}
|
||||
})
|
||||
return r.client.Write(bps)
|
||||
}
|
||||
|
|
@ -2,7 +2,6 @@ package influxdb
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/log"
|
||||
|
|
@ -78,145 +77,13 @@ func (r *v2Reporter) run() {
|
|||
func (r *v2Reporter) send() {
|
||||
r.reg.Each(func(name string, i interface{}) {
|
||||
now := time.Now()
|
||||
namespace := r.namespace
|
||||
|
||||
switch metric := i.(type) {
|
||||
|
||||
case metrics.Counter:
|
||||
v := metric.Count()
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.count", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": v,
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
|
||||
case metrics.CounterFloat64:
|
||||
v := metric.Count()
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.count", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": v,
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
|
||||
case metrics.Gauge:
|
||||
ms := metric.Snapshot()
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": ms.Value(),
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
|
||||
case metrics.GaugeFloat64:
|
||||
ms := metric.Snapshot()
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.gauge", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"value": ms.Value(),
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
|
||||
case metrics.Histogram:
|
||||
ms := metric.Snapshot()
|
||||
|
||||
if ms.Count() > 0 {
|
||||
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
measurement := fmt.Sprintf("%s%s.histogram", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p50": ps[0],
|
||||
"p75": ps[1],
|
||||
"p95": ps[2],
|
||||
"p99": ps[3],
|
||||
"p999": ps[4],
|
||||
"p9999": ps[5],
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
}
|
||||
|
||||
case metrics.Meter:
|
||||
ms := metric.Snapshot()
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.meter", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"mean": ms.RateMean(),
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
|
||||
case metrics.Timer:
|
||||
ms := metric.Snapshot()
|
||||
ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999})
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.timer", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": ms.Count(),
|
||||
"max": ms.Max(),
|
||||
"mean": ms.Mean(),
|
||||
"min": ms.Min(),
|
||||
"stddev": ms.StdDev(),
|
||||
"variance": ms.Variance(),
|
||||
"p50": ps[0],
|
||||
"p75": ps[1],
|
||||
"p95": ps[2],
|
||||
"p99": ps[3],
|
||||
"p999": ps[4],
|
||||
"p9999": ps[5],
|
||||
"m1": ms.Rate1(),
|
||||
"m5": ms.Rate5(),
|
||||
"m15": ms.Rate15(),
|
||||
"meanrate": ms.RateMean(),
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
|
||||
case metrics.ResettingTimer:
|
||||
t := metric.Snapshot()
|
||||
|
||||
if len(t.Values()) > 0 {
|
||||
ps := t.Percentiles([]float64{50, 95, 99})
|
||||
val := t.Values()
|
||||
|
||||
measurement := fmt.Sprintf("%s%s.span", namespace, name)
|
||||
fields := map[string]interface{}{
|
||||
"count": len(val),
|
||||
"max": val[len(val)-1],
|
||||
"mean": t.Mean(),
|
||||
"min": val[0],
|
||||
"p50": ps[0],
|
||||
"p95": ps[1],
|
||||
"p99": ps[2],
|
||||
}
|
||||
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
}
|
||||
measurement, fields := readMeter(r.namespace, name, i)
|
||||
if fields == nil {
|
||||
return
|
||||
}
|
||||
pt := influxdb2.NewPoint(measurement, r.tags, fields, now)
|
||||
r.write.WritePoint(pt)
|
||||
})
|
||||
|
||||
// Force all unwritten data to be sent
|
||||
r.write.Flush()
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue