diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index 6226b89820..1ba0c1c232 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -50,6 +50,14 @@ const ( maxPayloadAnnounces = 4096 fetchProbability = 15 MAX_CELLS_PER_PARTIAL_REQUEST = 8 + + // maxCellRequests caps the burst of cell requests we can issue at once + // to a single peer. Worst case 256 * 6 = 1536 cells (~3 MB). + maxCellRequests = 256 + // refillInterval is the interval between token refills. Combined with + // maxCellRequests and the 2-minute buffer timeout, a peer's worst-case + // buffered cells total about 16 MB. + refillInterval = time.Second / 9 ) type blobTxAnnounce struct { @@ -137,12 +145,21 @@ type BlobFetcher struct { fn BlobFetcherFunctions // callbacks + // peerTokens tracks each peer's remaining cell-request tokens. + peerTokens map[string]*token + step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests realTime func() time.Time // Real system time or simulated time for tests rand random // Randomizer } +// token is a per-peer token bucket for outgoing cell requests. 
+type token struct {
+	amount int64          // tokens currently available; starts at maxCellRequests and refill is capped there
+	last   mclock.AbsTime // clock reading up to which refill has been credited, advanced in whole refillInterval steps
+}
+
 func NewBlobFetcher(fn BlobFetcherFunctions, custody *types.CustodyBitmap, rand random) *BlobFetcher { return &BlobFetcher{ notify: make(chan *blobTxAnnounce), @@ -158,6 +175,7 @@ func NewBlobFetcher(fn BlobFetcherFunctions, custody *types.CustodyBitmap, rand fetches: make(map[common.Hash]*fetchStatus), requests: make(map[string][]*cellRequest), alternates: make(map[common.Hash]map[string]*types.CustodyBitmap), + peerTokens: make(map[string]*token), fn: fn, custody: custody, clock: mclock.System{}, @@ -574,6 +592,7 @@ func (f *BlobFetcher) loop() { f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too case drop := <-f.drop: // A peer was dropped, remove all traces of it + delete(f.peerTokens, drop.peer) if _, ok := f.waitslots[drop.peer]; ok { for hash := range f.waitslots[drop.peer] { delete(f.waitlist[hash], drop.peer) @@ -682,6 +701,34 @@ func (f *BlobFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct trigger <- struct{}{} }) } + +// consumeToken consumes n tokens from peer's cell-request budget. +// It returns false if the remaining tokens cannot cover n. 
+func (f *BlobFetcher) consumeToken(peer string, n int) bool {
+	now := f.clock.Now()
+	b, ok := f.peerTokens[peer]
+	if !ok {
+		b = &token{amount: maxCellRequests, last: now}
+		f.peerTokens[peer] = b
+	} else {
+		// Credit only whole refill intervals and advance b.last by exactly that
+		// much, so the fractional remainder carries over to the next call.
+		elapsed := time.Duration(now - b.last)
+		if add := int64(elapsed / refillInterval); add > 0 {
+			b.amount += add
+			if b.amount > maxCellRequests {
+				b.amount = maxCellRequests
+			}
+			b.last += mclock.AbsTime(time.Duration(add) * refillInterval)
+		}
+	}
+	if n < 0 || b.amount < int64(n) { // a negative n would otherwise mint tokens past the cap
+		return false
+	}
+	b.amount -= int64(n)
+	return true
+}
+
 func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[string]struct{}) { // Gather the set of peers we want to retrieve from (default to all) actives := whitelist @@ -715,8 +762,10 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{} unfetched = cells.Difference(f.fetches[hash].fetching) } - // Mark fetching for unfetched cells - if unfetched.OneCount() > 0 { + // Mark fetching for unfetched cells if the peer has enough token. + // Otherwise, the next peer who announced the hash and has token will be selected + // in the next loop + if unfetched.OneCount() > 0 && f.consumeToken(peer, int(unfetched.OneCount())) { if f.fetches[hash] == nil { f.fetches[hash] = &fetchStatus{ fetching: unfetched,