add metrics

This commit is contained in:
healthykim 2026-03-19 15:07:57 +09:00
parent cc1fde37a4
commit 5e9f1a019a
2 changed files with 19 additions and 5 deletions

View file

@ -164,6 +164,7 @@ func NewBlobFetcher(
// Notify is called when a Type 3 transaction is observed on the network. (TransactionPacket / NewPooledTransactionHashesPacket) // Notify is called when a Type 3 transaction is observed on the network. (TransactionPacket / NewPooledTransactionHashesPacket)
func (f *BlobFetcher) Notify(peer string, txs []common.Hash, cells types.CustodyBitmap) error { func (f *BlobFetcher) Notify(peer string, txs []common.Hash, cells types.CustodyBitmap) error {
blobAnnounceInMeter.Mark(int64(len(txs)))
blobAnnounce := &blobTxAnnounce{origin: peer, txs: txs, cells: cells} blobAnnounce := &blobTxAnnounce{origin: peer, txs: txs, cells: cells}
select { select {
case f.notify <- blobAnnounce: case f.notify <- blobAnnounce:
@ -196,6 +197,7 @@ func (f *BlobFetcher) Drop(peer string) error {
} }
func (f *BlobFetcher) UpdateCustody(cells types.CustodyBitmap) { func (f *BlobFetcher) UpdateCustody(cells types.CustodyBitmap) {
// todo use lock or process inside of loop
f.custody = &cells f.custody = &cells
} }
@ -221,13 +223,13 @@ func (f *BlobFetcher) loop() {
// This prevents a peer from dominating the queue with txs without responding to the request // This prevents a peer from dominating the queue with txs without responding to the request
used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin]) used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
if used >= maxPayloadAnnounces { if used >= maxPayloadAnnounces {
// Already full blobAnnounceDOSMeter.Mark(int64(len(ann.txs)))
break break
} }
want := used + len(ann.txs) want := used + len(ann.txs)
if want >= maxPayloadAnnounces { if want >= maxPayloadAnnounces {
// drop part of announcements blobAnnounceDOSMeter.Mark(int64(want - maxPayloadAnnounces))
ann.txs = ann.txs[:maxPayloadAnnounces-used] ann.txs = ann.txs[:maxPayloadAnnounces-used]
} }
@ -303,6 +305,7 @@ func (f *BlobFetcher) loop() {
} }
if len(f.waitlist[hash]) >= availabilityThreshold { if len(f.waitlist[hash]) >= availabilityThreshold {
// Passed availability check, move to fetching stage // Passed availability check, move to fetching stage
blobFetcherWaitTime.Update(int64(time.Duration(f.clock.Now() - f.waittime[hash])))
for peer := range f.waitlist[hash] { for peer := range f.waitlist[hash] {
if f.announces[peer] == nil { if f.announces[peer] == nil {
f.announces[peer] = make(map[common.Hash]*cellWithSeq) f.announces[peer] = make(map[common.Hash]*cellWithSeq)
@ -381,7 +384,7 @@ func (f *BlobFetcher) loop() {
newRequests := make([]*cellRequest, 0) newRequests := make([]*cellRequest, 0)
for _, req := range requests { for _, req := range requests {
if time.Duration(f.clock.Now()-req.time)+txGatherSlack > blobFetchTimeout { if time.Duration(f.clock.Now()-req.time)+txGatherSlack > blobFetchTimeout {
// Reschedule all timeout cells to alternate peers blobRequestTimeoutMeter.Mark(int64(len(req.txs)))
for _, hash := range req.txs { for _, hash := range req.txs {
// Do not request the same tx from this peer // Do not request the same tx from this peer
delete(f.announces[peer], hash) delete(f.announces[peer], hash)
@ -472,6 +475,7 @@ func (f *BlobFetcher) loop() {
} }
if completed { if completed {
blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time)))
addedHashes = append(addedHashes, hash) addedHashes = append(addedHashes, hash)
fetchStatus := f.fetches[hash] fetchStatus := f.fetches[hash]
sort.Slice(fetchStatus.cells, func(i, j int) bool { sort.Slice(fetchStatus.cells, func(i, j int) bool {
@ -491,6 +495,7 @@ func (f *BlobFetcher) loop() {
} }
} }
// Update mempool status for arrived hashes // Update mempool status for arrived hashes
blobRequestDoneMeter.Mark(int64(len(delivery.txs)))
if len(addedHashes) > 0 { if len(addedHashes) > 0 {
f.addPayload(addedHashes, addedCells, delivery.cellBitmap) f.addPayload(addedHashes, addedCells, delivery.cellBitmap)
} }
@ -585,6 +590,14 @@ func (f *BlobFetcher) loop() {
case <-f.quit: case <-f.quit:
return return
} }
// Update metrics gauges
blobFetcherWaitingPeers.Update(int64(len(f.waitslots)))
blobFetcherWaitingHashes.Update(int64(len(f.waitlist)))
blobFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
blobFetcherQueueingHashes.Update(int64(len(f.announces)))
blobFetcherFetchingPeers.Update(int64(len(f.requests)))
blobFetcherFetchingHashes.Update(int64(len(f.fetches)))
// Loop did something, ping the step notifier if needed (tests) // Loop did something, ping the step notifier if needed (tests)
if f.step != nil { if f.step != nil {
f.step <- struct{}{} f.step <- struct{}{}
@ -722,7 +735,9 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}
f.requests[peer] = request f.requests[peer] = request
go func(peer string, request []*cellRequest) { go func(peer string, request []*cellRequest) {
for _, req := range request { for _, req := range request {
blobRequestOutMeter.Mark(int64(len(req.txs)))
if err := f.fetchPayloads(peer, req.txs, req.cells); err != nil { if err := f.fetchPayloads(peer, req.txs, req.cells); err != nil {
blobRequestFailMeter.Mark(int64(len(req.txs)))
f.Drop(peer) f.Drop(peer)
break break
} }

View file

@ -66,8 +66,7 @@ var (
blobRequestDoneMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/done", nil) blobRequestDoneMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/done", nil)
blobRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/timeout", nil) blobRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/timeout", nil)
blobReplyInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/replies/in", nil) blobReplyInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/replies/in", nil)
blobReplyRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/replies/reject", nil)
blobFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/blob/waiting/peers", nil) blobFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/blob/waiting/peers", nil)
blobFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/blob/waiting/hashes", nil) blobFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/blob/waiting/hashes", nil)