diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index 05a05fd3fb..097bc5aa66 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -164,6 +164,7 @@ func NewBlobFetcher( // Notify is called when a Type 3 transaction is observed on the network. (TransactionPacket / NewPooledTransactionHashesPacket) func (f *BlobFetcher) Notify(peer string, txs []common.Hash, cells types.CustodyBitmap) error { + blobAnnounceInMeter.Mark(int64(len(txs))) blobAnnounce := &blobTxAnnounce{origin: peer, txs: txs, cells: cells} select { case f.notify <- blobAnnounce: @@ -196,6 +197,7 @@ func (f *BlobFetcher) Drop(peer string) error { } func (f *BlobFetcher) UpdateCustody(cells types.CustodyBitmap) { + // todo use lock or process inside of loop f.custody = &cells } @@ -221,13 +223,13 @@ func (f *BlobFetcher) loop() { // This prevents a peer from dominating the queue with txs without responding to the request used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin]) if used >= maxPayloadAnnounces { - // Already full + blobAnnounceDOSMeter.Mark(int64(len(ann.txs))) break } want := used + len(ann.txs) if want >= maxPayloadAnnounces { - // drop part of announcements + blobAnnounceDOSMeter.Mark(int64(want - maxPayloadAnnounces)) ann.txs = ann.txs[:maxPayloadAnnounces-used] } @@ -303,6 +305,7 @@ func (f *BlobFetcher) loop() { } if len(f.waitlist[hash]) >= availabilityThreshold { // Passed availability check, move to fetching stage + blobFetcherWaitTime.Update(int64(time.Duration(f.clock.Now() - f.waittime[hash]))) for peer := range f.waitlist[hash] { if f.announces[peer] == nil { f.announces[peer] = make(map[common.Hash]*cellWithSeq) @@ -381,7 +384,7 @@ func (f *BlobFetcher) loop() { newRequests := make([]*cellRequest, 0) for _, req := range requests { if time.Duration(f.clock.Now()-req.time)+txGatherSlack > blobFetchTimeout { - // Reschedule all timeout cells to alternate peers + blobRequestTimeoutMeter.Mark(int64(len(req.txs))) for _, hash := range req.txs { // Do not request the same tx from this peer delete(f.announces[peer], hash) @@ -472,6 +475,7 @@ func (f *BlobFetcher) loop() { } if completed { + blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time))) addedHashes = append(addedHashes, hash) fetchStatus := f.fetches[hash] sort.Slice(fetchStatus.cells, func(i, j int) bool { @@ -491,6 +495,7 @@ func (f *BlobFetcher) loop() { } } // Update mempool status for arrived hashes + blobRequestDoneMeter.Mark(int64(len(delivery.txs))) if len(addedHashes) > 0 { f.addPayload(addedHashes, addedCells, delivery.cellBitmap) } @@ -585,6 +590,14 @@ func (f *BlobFetcher) loop() { case <-f.quit: return } + // Update metrics gauges + blobFetcherWaitingPeers.Update(int64(len(f.waitslots))) + blobFetcherWaitingHashes.Update(int64(len(f.waitlist))) + blobFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests))) + blobFetcherQueueingHashes.Update(int64(len(f.announces))) + blobFetcherFetchingPeers.Update(int64(len(f.requests))) + blobFetcherFetchingHashes.Update(int64(len(f.fetches))) + // Loop did something, ping the step notifier if needed (tests) if f.step != nil { f.step <- struct{}{} @@ -722,7 +735,9 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{} f.requests[peer] = request go func(peer string, request []*cellRequest) { for _, req := range request { + blobRequestOutMeter.Mark(int64(len(req.txs))) if err := f.fetchPayloads(peer, req.txs, req.cells); err != nil { + blobRequestFailMeter.Mark(int64(len(req.txs))) f.Drop(peer) break } diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go index 306690c64b..a2317e7c64 100644 --- a/eth/fetcher/metrics.go +++ b/eth/fetcher/metrics.go @@ -66,8 +66,7 @@ var ( blobRequestDoneMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/done", nil) blobRequestTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/timeout", nil) - blobReplyInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/replies/in", nil) - blobReplyRejectMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/replies/reject", nil) + blobReplyInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/replies/in", nil) blobFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/blob/waiting/peers", nil) blobFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/blob/waiting/hashes", nil)