eth/fetcher: add per peer token bucket

This commit is contained in:
healthykim 2026-05-21 19:53:17 +02:00
parent 8f2f286ae0
commit c841457587

View file

@ -50,6 +50,14 @@ const (
maxPayloadAnnounces = 4096
fetchProbability = 15
MAX_CELLS_PER_PARTIAL_REQUEST = 8
// maxCellRequests caps the burst of cell requests we can issue at once
// to a single peer. Worst case 256 * 6 = 1536 cells (~3 MB)
maxCellRequests = 256
// refillInterval is the gap between token refill. Combined with
// maxCellRequests and 2-minute buffer timeout, a peer's worst case
// buffered cells is about 16 MB
refillInterval = time.Second / 9
)
type blobTxAnnounce struct {
@ -137,12 +145,21 @@ type BlobFetcher struct {
fn BlobFetcherFunctions // callbacks
// peerTokens tracks each peer's remaining cell request token.
peerTokens map[string]*token
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests
realTime func() time.Time // Real system time or simulated time for tests
rand random // Randomizer
}
// token is a per peer token bucket for outgoing cell requests.
type token struct {
amount int64
last mclock.AbsTime
}
func NewBlobFetcher(fn BlobFetcherFunctions, custody *types.CustodyBitmap, rand random) *BlobFetcher {
return &BlobFetcher{
notify: make(chan *blobTxAnnounce),
@ -158,6 +175,7 @@ func NewBlobFetcher(fn BlobFetcherFunctions, custody *types.CustodyBitmap, rand
fetches: make(map[common.Hash]*fetchStatus),
requests: make(map[string][]*cellRequest),
alternates: make(map[common.Hash]map[string]*types.CustodyBitmap),
peerTokens: make(map[string]*token),
fn: fn,
custody: custody,
clock: mclock.System{},
@ -574,6 +592,7 @@ func (f *BlobFetcher) loop() {
f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) // Partial delivery may enable others to deliver too
case drop := <-f.drop:
// A peer was dropped, remove all traces of it
delete(f.peerTokens, drop.peer)
if _, ok := f.waitslots[drop.peer]; ok {
for hash := range f.waitslots[drop.peer] {
delete(f.waitlist[hash], drop.peer)
@ -682,6 +701,34 @@ func (f *BlobFetcher) rescheduleTimeout(timer *mclock.Timer, trigger chan struct
trigger <- struct{}{}
})
}
// consumeToken consumes n tokens from peer's cell-request budget.
// It returns false if the remaining tokens cannot cover n.
func (f *BlobFetcher) consumeToken(peer string, n int) bool {
now := f.clock.Now()
b, ok := f.peerTokens[peer]
if !ok {
b = &token{amount: maxCellRequests, last: now}
f.peerTokens[peer] = b
} else {
// Here, fractional remaining elapsed time is left in b.last
// so that it can be carried over to the next call
elapsed := time.Duration(now - b.last)
if add := int64(elapsed / refillInterval); add > 0 {
b.amount += add
if b.amount > maxCellRequests {
b.amount = maxCellRequests
}
b.last += mclock.AbsTime(time.Duration(add) * refillInterval)
}
}
if b.amount < int64(n) {
return false
}
b.amount -= int64(n)
return true
}
func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, whitelist map[string]struct{}) {
// Gather the set of peers we want to retrieve from (default to all)
actives := whitelist
@ -715,8 +762,10 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}
unfetched = cells.Difference(f.fetches[hash].fetching)
}
// Mark fetching for unfetched cells
if unfetched.OneCount() > 0 {
// Mark fetching for unfetched cells if the peer has enough token.
// Otherwise, the next peer who announced the hash and has token will be selected
// in the next loop
if unfetched.OneCount() > 0 && f.consumeToken(peer, int(unfetched.OneCount())) {
if f.fetches[hash] == nil {
f.fetches[hash] = &fetchStatus{
fetching: unfetched,