better comments for blob fetcher

This commit is contained in:
healthykim 2026-03-19 13:00:42 +09:00
parent e84b6eb568
commit c4ea820d27

View file

@ -103,18 +103,18 @@ type BlobFetcher struct {
full map[common.Hash]struct{} full map[common.Hash]struct{}
partial map[common.Hash]struct{} partial map[common.Hash]struct{}
// Buffer 1: Set of blob txs whose blob data is waiting for availability confirmation (not pull decision) // Buffer 1: Set of blob txs whose blob data is waiting for availability confirmation (partial fetch)
waitlist map[common.Hash]map[string]struct{} // Peer set that announced blob availability waitlist map[common.Hash]map[string]struct{} // Peer set that announced blob availability
waittime map[common.Hash]mclock.AbsTime // Timestamp when added to waitlist waittime map[common.Hash]mclock.AbsTime // Timestamp when added to waitlist
waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection) waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)
// waitSlots should also include announcements with partial cells // waitSlots should also include announcements with partial cells
// Buffer 2: Transactions queued for fetching (pull decision + not pull decision) // Buffer 2: Transactions queued for fetching (full fetch + partial fetch)
// "announces" is shared with stage 3, for DoS protection // "announces" is shared with stage 3, for DoS protection
announces map[string]map[common.Hash]*cellWithSeq // Set of announced transactions, grouped by origin peer announces map[string]map[common.Hash]*cellWithSeq // Set of announced transactions, grouped by origin peer
// Buffer 2 // Buffer 2
// Stage 3: Transactions whose payloads/cells are currently being fetched (pull decision + not pull decision) // Stage 3: Transactions whose payloads/cells are currently being fetched (full fetch + partial fetch)
fetches map[common.Hash]*fetchStatus // Hash -> Bitmap, in-flight transaction cells fetches map[common.Hash]*fetchStatus // Hash -> Bitmap, in-flight transaction cells
requests map[string][]*cellRequest // In-flight transaction retrievals requests map[string][]*cellRequest // In-flight transaction retrievals
// todo simplify // todo simplify
@ -219,7 +219,6 @@ func (f *BlobFetcher) loop() {
case ann := <-f.notify: case ann := <-f.notify:
// Drop part of the announcements if too many have accumulated from that peer // Drop part of the announcements if too many have accumulated from that peer
// This prevents a peer from dominating the queue with txs without responding to the request // This prevents a peer from dominating the queue with txs without responding to the request
// todo maxPayloadAnnounces -> according to the number of blobs
used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin]) used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin])
if used >= maxPayloadAnnounces { if used >= maxPayloadAnnounces {
// Already full // Already full
@ -289,6 +288,7 @@ func (f *BlobFetcher) loop() {
// 2) Decided to send partial request of the tx // 2) Decided to send partial request of the tx
if f.waitlist[hash] != nil { if f.waitlist[hash] != nil {
if ann.cells != *types.CustodyBitmapAll { if ann.cells != *types.CustodyBitmapAll {
// Availability check is only meaningful with full availability announcements
continue continue
} }
// Transaction is at the stage of availability check // Transaction is at the stage of availability check
@ -302,6 +302,7 @@ func (f *BlobFetcher) loop() {
} }
} }
if len(f.waitlist[hash]) >= availabilityThreshold { if len(f.waitlist[hash]) >= availabilityThreshold {
// Passed availability check, move to fetching stage
for peer := range f.waitlist[hash] { for peer := range f.waitlist[hash] {
if f.announces[peer] == nil { if f.announces[peer] == nil {
f.announces[peer] = make(map[common.Hash]*cellWithSeq) f.announces[peer] = make(map[common.Hash]*cellWithSeq)
@ -317,14 +318,16 @@ func (f *BlobFetcher) loop() {
reschedule[peer] = struct{}{} reschedule[peer] = struct{}{}
} }
delete(f.waitlist, hash) delete(f.waitlist, hash)
//todo delete(f.waittime, hash)
} }
continue continue
} }
if ann.cells.Intersection(f.custody).OneCount() == 0 { if ann.cells.Intersection(f.custody).OneCount() == 0 {
// No custody overlapping // If there's no custody overlapping in ann, it can be ignored
continue continue
} }
// Add this peer as a possible fetch source
// todo: Did we remove fetch from partial
if f.announces[ann.origin] == nil { if f.announces[ann.origin] == nil {
f.announces[ann.origin] = make(map[common.Hash]*cellWithSeq) f.announces[ann.origin] = make(map[common.Hash]*cellWithSeq)
} }
@ -343,7 +346,6 @@ func (f *BlobFetcher) loop() {
// If this is a new peer and that peer sent transaction with payload flag, // If this is a new peer and that peer sent transaction with payload flag,
// schedule transaction fetches from it // schedule transaction fetches from it
//todo
if !oldPeer && len(f.announces[ann.origin]) > 0 { if !oldPeer && len(f.announces[ann.origin]) > 0 {
f.scheduleFetches(timeoutTimer, timeoutTrigger, reschedule) f.scheduleFetches(timeoutTimer, timeoutTrigger, reschedule)
} }
@ -351,10 +353,10 @@ func (f *BlobFetcher) loop() {
case <-waitTrigger: case <-waitTrigger:
// At least one transaction's waiting time ran out, pop all expired ones // At least one transaction's waiting time ran out, pop all expired ones
// and update the blobpool according to availability // and update the blobpool according to availability
// Availability failure case
for hash, instance := range f.waittime { for hash, instance := range f.waittime {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > blobAvailabilityTimeout { if time.Duration(f.clock.Now()-instance)+txGatherSlack > blobAvailabilityTimeout {
// Check if enough peers have announced availability // No need to check availability count (transactions that passed
// the threshold are already promoted to the announces map on notification)
for peer := range f.waitlist[hash] { for peer := range f.waitlist[hash] {
delete(f.waitslots[peer], hash) delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 { if len(f.waitslots[peer]) == 0 {
@ -660,29 +662,29 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}
custodies = make([]*types.CustodyBitmap, 0, maxTxRetrievals) custodies = make([]*types.CustodyBitmap, 0, maxTxRetrievals)
) )
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, cells *types.CustodyBitmap) bool { f.forEachAnnounce(f.announces[peer], func(hash common.Hash, cells *types.CustodyBitmap) bool {
var difference *types.CustodyBitmap var unfetched *types.CustodyBitmap
if f.fetches[hash] == nil { if f.fetches[hash] == nil {
// tx is not being fetched // tx is not being fetched
difference = cells unfetched = cells
} else { } else {
difference = cells.Difference(f.fetches[hash].fetching) unfetched = cells.Difference(f.fetches[hash].fetching)
} }
// Mark fetching for differences // Mark fetching for unfetched cells
if difference.OneCount() != 0 { if unfetched.OneCount() > 0 {
if f.fetches[hash] == nil { if f.fetches[hash] == nil {
f.fetches[hash] = &fetchStatus{ f.fetches[hash] = &fetchStatus{
fetching: difference, fetching: unfetched,
fetched: make([]uint64, 0), fetched: make([]uint64, 0),
cells: make([]kzg4844.Cell, 0), cells: make([]kzg4844.Cell, 0),
} }
} else { } else {
f.fetches[hash].fetching = f.fetches[hash].fetching.Union(difference) f.fetches[hash].fetching = f.fetches[hash].fetching.Union(unfetched)
} }
// Accumulate the hash and stop if the limit was reached // Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash) hashes = append(hashes, hash)
custodies = append(custodies, difference) custodies = append(custodies, unfetched)
} }
// Mark alternatives // Mark alternatives