From ff731ce79c27e090ad884f3ded9d476828dbae3e Mon Sep 17 00:00:00 2001 From: healthykim Date: Wed, 25 Mar 2026 15:07:29 +0900 Subject: [PATCH] add fallback mechanism for availability failure --- eth/fetcher/blob_fetcher.go | 28 ++++++++++++++++++++++------ eth/fetcher/blob_fetcher_test.go | 14 +++++++++++--- eth/fetcher/metrics.go | 2 +- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index d8a2f205ea..f928a99517 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -364,24 +364,40 @@ func (f *BlobFetcher) loop() { } case <-waitTrigger: - // At least one transaction's waiting time ran out, pop all expired ones - // and update the blobpool according to availability + // At least one transaction's waiting time ran out. Instead of dropping, + // convert timed-out partial fetches to full fetches so we don't lose + // the transaction. All peers in the waitlist announced full custody + // (that was the entry condition), so they can serve as full fetch sources. + reschedule := make(map[string]struct{}) for hash, instance := range f.waittime { if time.Duration(f.clock.Now()-instance)+txGatherSlack > blobAvailabilityTimeout { - // No need to check availability count (transactions that passed - // the threshold are already promoted to the announces map on notification) + // partial -> full conversion + delete(f.partial, hash) + f.full[hash] = struct{}{} + blobAnnounceTimeoutMeter.Mark(1) + for peer := range f.waitlist[hash] { + if f.announces[peer] == nil { + f.announces[peer] = make(map[common.Hash]*cellWithSeq) + } + f.announces[peer][hash] = &cellWithSeq{ + cells: types.CustodyBitmapData, + seq: f.txSeq, + } + f.txSeq++ delete(f.waitslots[peer], hash) if len(f.waitslots[peer]) == 0 { delete(f.waitslots, peer) } + reschedule[peer] = struct{}{} } delete(f.waittime, hash) delete(f.waitlist, hash) - blobAnnounceTimeoutMeter.Mark(1) } } - + if len(reschedule) > 0 { + f.scheduleFetches(timeoutTimer, timeoutTrigger, reschedule) + } // If transactions are still waiting for availability, reschedule the wait timer if len(f.waittime) > 0 { f.rescheduleWait(waitTimer, waitTrigger) diff --git a/eth/fetcher/blob_fetcher_test.go b/eth/fetcher/blob_fetcher_test.go index c6e4400772..de11b30da3 100644 --- a/eth/fetcher/blob_fetcher_test.go +++ b/eth/fetcher/blob_fetcher_test.go @@ -527,12 +527,20 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) { isWaitingAvailability{testBlobTxHashes[0]: map[string]struct{}{"A": {}}}, isBlobScheduled{announces: nil, fetching: nil}, - // Run clock for timeout + // Run clock for timeout → partial converts to full, peer A moves to announces doWait{time: blobAvailabilityTimeout, step: true}, - // After timeout, waitlist should be empty + // After timeout, waitlist should be empty but tx promoted to full fetch isWaitingAvailability{}, - isBlobScheduled{announces: nil, fetching: nil}, + isDecidedFull{testBlobTxHashes[0]: struct{}{}}, + isBlobScheduled{ + announces: map[string][]blobAnnounce{ + "A": {{hash: testBlobTxHashes[0], custody: halfCustody}}, + }, + fetching: map[string][]blobAnnounce{ + "A": {{hash: testBlobTxHashes[0], custody: halfCustody}}, + }, + }, }, }) } diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go index 4eb4db0a99..1126fdc382 100644 --- a/eth/fetcher/metrics.go +++ b/eth/fetcher/metrics.go @@ -60,7 +60,7 @@ var ( blobAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/in", nil) blobAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/dos", nil) - // This metric is to track the number of availability failure + // This metric tracks partial→full conversions due to availability timeout blobAnnounceTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/timeout", nil) blobRequestOutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/out", nil)