mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-01 12:38:40 +00:00
add fallback mechanism for availability failure
This commit is contained in:
parent
226fbf6d44
commit
ff731ce79c
3 changed files with 34 additions and 10 deletions
|
|
@ -364,24 +364,40 @@ func (f *BlobFetcher) loop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-waitTrigger:
|
case <-waitTrigger:
|
||||||
// At least one transaction's waiting time ran out, pop all expired ones
|
// At least one transaction's waiting time ran out. Instead of dropping,
|
||||||
// and update the blobpool according to availability
|
// convert timed-out partial fetches to full fetches so we don't lose
|
||||||
|
// the transaction. All peers in the waitlist announced full custody
|
||||||
|
// (that was the entry condition), so they can serve as full fetch sources.
|
||||||
|
reschedule := make(map[string]struct{})
|
||||||
for hash, instance := range f.waittime {
|
for hash, instance := range f.waittime {
|
||||||
if time.Duration(f.clock.Now()-instance)+txGatherSlack > blobAvailabilityTimeout {
|
if time.Duration(f.clock.Now()-instance)+txGatherSlack > blobAvailabilityTimeout {
|
||||||
// No need to check availability count (transactions that passed
|
// partial -> full conversion
|
||||||
// the threshold are already promoted to the announces map on notification)
|
delete(f.partial, hash)
|
||||||
|
f.full[hash] = struct{}{}
|
||||||
|
blobAnnounceTimeoutMeter.Mark(1)
|
||||||
|
|
||||||
for peer := range f.waitlist[hash] {
|
for peer := range f.waitlist[hash] {
|
||||||
|
if f.announces[peer] == nil {
|
||||||
|
f.announces[peer] = make(map[common.Hash]*cellWithSeq)
|
||||||
|
}
|
||||||
|
f.announces[peer][hash] = &cellWithSeq{
|
||||||
|
cells: types.CustodyBitmapData,
|
||||||
|
seq: f.txSeq,
|
||||||
|
}
|
||||||
|
f.txSeq++
|
||||||
delete(f.waitslots[peer], hash)
|
delete(f.waitslots[peer], hash)
|
||||||
if len(f.waitslots[peer]) == 0 {
|
if len(f.waitslots[peer]) == 0 {
|
||||||
delete(f.waitslots, peer)
|
delete(f.waitslots, peer)
|
||||||
}
|
}
|
||||||
|
reschedule[peer] = struct{}{}
|
||||||
}
|
}
|
||||||
delete(f.waittime, hash)
|
delete(f.waittime, hash)
|
||||||
delete(f.waitlist, hash)
|
delete(f.waitlist, hash)
|
||||||
blobAnnounceTimeoutMeter.Mark(1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(reschedule) > 0 {
|
||||||
|
f.scheduleFetches(timeoutTimer, timeoutTrigger, reschedule)
|
||||||
|
}
|
||||||
// If transactions are still waiting for availability, reschedule the wait timer
|
// If transactions are still waiting for availability, reschedule the wait timer
|
||||||
if len(f.waittime) > 0 {
|
if len(f.waittime) > 0 {
|
||||||
f.rescheduleWait(waitTimer, waitTrigger)
|
f.rescheduleWait(waitTimer, waitTrigger)
|
||||||
|
|
|
||||||
|
|
@ -527,12 +527,20 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) {
|
||||||
isWaitingAvailability{testBlobTxHashes[0]: map[string]struct{}{"A": {}}},
|
isWaitingAvailability{testBlobTxHashes[0]: map[string]struct{}{"A": {}}},
|
||||||
isBlobScheduled{announces: nil, fetching: nil},
|
isBlobScheduled{announces: nil, fetching: nil},
|
||||||
|
|
||||||
// Run clock for timeout
|
// Run clock for timeout → partial converts to full, peer A moves to announces
|
||||||
doWait{time: blobAvailabilityTimeout, step: true},
|
doWait{time: blobAvailabilityTimeout, step: true},
|
||||||
|
|
||||||
// After timeout, waitlist should be empty
|
// After timeout, waitlist should be empty but tx promoted to full fetch
|
||||||
isWaitingAvailability{},
|
isWaitingAvailability{},
|
||||||
isBlobScheduled{announces: nil, fetching: nil},
|
isDecidedFull{testBlobTxHashes[0]: struct{}{}},
|
||||||
|
isBlobScheduled{
|
||||||
|
announces: map[string][]blobAnnounce{
|
||||||
|
"A": {{hash: testBlobTxHashes[0], custody: halfCustody}},
|
||||||
|
},
|
||||||
|
fetching: map[string][]blobAnnounce{
|
||||||
|
"A": {{hash: testBlobTxHashes[0], custody: halfCustody}},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ var (
|
||||||
|
|
||||||
blobAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/in", nil)
|
blobAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/in", nil)
|
||||||
blobAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/dos", nil)
|
blobAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/dos", nil)
|
||||||
// This metric is to track the number of availability failure
|
// This metric tracks partial→full conversions due to availability timeout
|
||||||
blobAnnounceTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/timeout", nil)
|
blobAnnounceTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/announces/timeout", nil)
|
||||||
|
|
||||||
blobRequestOutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/out", nil)
|
blobRequestOutMeter = metrics.NewRegisteredMeter("eth/fetcher/blob/request/out", nil)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue