diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index 6f030231bf..7b01d4beca 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -103,18 +103,18 @@ type BlobFetcher struct { full map[common.Hash]struct{} partial map[common.Hash]struct{} - // Buffer 1: Set of blob txs whose blob data is waiting for availability confirmation (not pull decision) + // Buffer 1: Set of blob txs whose blob data is waiting for availability confirmation (partial fetch) waitlist map[common.Hash]map[string]struct{} // Peer set that announced blob availability waittime map[common.Hash]mclock.AbsTime // Timestamp when added to waitlist waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection) // waitSlots should also include announcements with partial cells - // Buffer 2: Transactions queued for fetching (pull decision + not pull decision) + // Buffer 2: Transactions queued for fetching (full fetch + partial fetch) // "announces" is shared with stage 3, for DoS protection announces map[string]map[common.Hash]*cellWithSeq // Set of announced transactions, grouped by origin peer // Buffer 2 - // Stage 3: Transactions whose payloads/cells are currently being fetched (pull decision + not pull decision) + // Stage 3: Transactions whose payloads/cells are currently being fetched (full fetch + partial fetch) fetches map[common.Hash]*fetchStatus // Hash -> Bitmap, in-flight transaction cells requests map[string][]*cellRequest // In-flight transaction retrievals // todo simplify @@ -219,7 +219,6 @@ func (f *BlobFetcher) loop() { case ann := <-f.notify: // Drop part of the announcements if too many have accumulated from that peer // This prevents a peer from dominating the queue with txs without responding to the request - // todo maxPayloadAnnounces -> according to the number of blobs used := len(f.waitslots[ann.origin]) + len(f.announces[ann.origin]) if used >= maxPayloadAnnounces { // Already full @@ -289,6 +288,7 @@ func (f *BlobFetcher) loop() { // 2) Decided to send partial request of the tx if f.waitlist[hash] != nil { if ann.cells != *types.CustodyBitmapAll { + // Availability check is only meaningful with full availability announcements continue } // Transaction is at the stage of availability check @@ -302,6 +302,7 @@ func (f *BlobFetcher) loop() { } } if len(f.waitlist[hash]) >= availabilityThreshold { + // Passed availability check, move to fetching stage for peer := range f.waitlist[hash] { if f.announces[peer] == nil { f.announces[peer] = make(map[common.Hash]*cellWithSeq) @@ -317,14 +318,16 @@ func (f *BlobFetcher) loop() { reschedule[peer] = struct{}{} } delete(f.waitlist, hash) + //todo delete(f.waittime, hash) } continue } if ann.cells.Intersection(f.custody).OneCount() == 0 { - // No custody overlapping + // If there's no custody overlapping in ann, it can be ignored continue } - + // Add this peer as a possible fetch source + // todo: Did we remove fetch from partial if f.announces[ann.origin] == nil { f.announces[ann.origin] = make(map[common.Hash]*cellWithSeq) } @@ -343,7 +346,6 @@ func (f *BlobFetcher) loop() { // If this is a new peer and that peer sent transaction with payload flag, // schedule transaction fetches from it - //todo if !oldPeer && len(f.announces[ann.origin]) > 0 { f.scheduleFetches(timeoutTimer, timeoutTrigger, reschedule) } @@ -351,10 +353,10 @@ func (f *BlobFetcher) loop() { case <-waitTrigger: // At least one transaction's waiting time ran out, pop all expired ones // and update the blobpool according to availability - // Availability failure case for hash, instance := range f.waittime { if time.Duration(f.clock.Now()-instance)+txGatherSlack > blobAvailabilityTimeout { - // Check if enough peers have announced availability + // No need to check availability count (transactions that passed + // the threshold are already promoted to the announces map on notification) for peer := range f.waitlist[hash] { delete(f.waitslots[peer], hash) if len(f.waitslots[peer]) == 0 { @@ -660,29 +662,29 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{} custodies = make([]*types.CustodyBitmap, 0, maxTxRetrievals) ) f.forEachAnnounce(f.announces[peer], func(hash common.Hash, cells *types.CustodyBitmap) bool { - var difference *types.CustodyBitmap + var unfetched *types.CustodyBitmap if f.fetches[hash] == nil { // tx is not being fetched - difference = cells + unfetched = cells } else { - difference = cells.Difference(f.fetches[hash].fetching) + unfetched = cells.Difference(f.fetches[hash].fetching) } - // Mark fetching for differences - if difference.OneCount() != 0 { + // Mark fetching for unfetched cells + if unfetched.OneCount() > 0 { if f.fetches[hash] == nil { f.fetches[hash] = &fetchStatus{ - fetching: difference, + fetching: unfetched, fetched: make([]uint64, 0), cells: make([]kzg4844.Cell, 0), } } else { - f.fetches[hash].fetching = f.fetches[hash].fetching.Union(difference) + f.fetches[hash].fetching = f.fetches[hash].fetching.Union(unfetched) } // Accumulate the hash and stop if the limit was reached hashes = append(hashes, hash) - custodies = append(custodies, difference) + custodies = append(custodies, unfetched) } // Mark alternatives