eth/protocols/snap: add stateless peer cooldown for partial state mode

The statelessPeers map permanently blacklists peers that return empty
responses for the entire Sync() cycle. In partial state mode, the faster
account advancement (due to skipping storage/code for non-tracked
contracts) creates bursty request patterns that can trigger transient
empty responses. Combined with the permanent blacklist, this causes a
cascade where all peers get banned and sync stalls permanently.

Replace the permanent map[string]struct{} with map[string]time.Time to
track when each peer was marked. For partial state mode, peers are given
a 30-second cooldown instead of permanent banishment. After the cooldown
expires, the peer is eligible for task assignment again. Full sync mode
behavior is unchanged (permanent blacklist preserved).
This commit is contained in:
CPerezz 2026-02-08 00:50:55 +01:00
parent bcb2a1bcd5
commit c6f49c4708
No known key found for this signature in database
GPG key ID: 62045F34B97177DD

View file

@ -97,6 +97,11 @@ const (
// batchSizeThreshold is the maximum size allowed for gentrie batch. // batchSizeThreshold is the maximum size allowed for gentrie batch.
batchSizeThreshold = 8 * 1024 * 1024 batchSizeThreshold = 8 * 1024 * 1024
// statelessCooldown is how long a peer that returned empty responses is
// excluded from task assignment in partial state mode. In full sync mode,
// stateless marking remains permanent (cooldown is not checked).
statelessCooldown = 30 * time.Second
) )
var ( var (
@ -463,7 +468,7 @@ type Syncer struct {
rates *msgrate.Trackers // Message throughput rates for peers rates *msgrate.Trackers // Message throughput rates for peers
// Request tracking during syncing phase // Request tracking during syncing phase
statelessPeers map[string]struct{} // Peers that failed to deliver state data statelessPeers map[string]time.Time // Peers that failed to deliver state data (value = when marked)
accountIdlers map[string]struct{} // Peers that aren't serving account requests accountIdlers map[string]struct{} // Peers that aren't serving account requests
bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests
storageIdlers map[string]struct{} // Peers that aren't serving storage requests storageIdlers map[string]struct{} // Peers that aren't serving storage requests
@ -645,7 +650,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
trieTasks: make(map[string]common.Hash), trieTasks: make(map[string]common.Hash),
codeTasks: make(map[common.Hash]struct{}), codeTasks: make(map[common.Hash]struct{}),
} }
s.statelessPeers = make(map[string]struct{}) s.statelessPeers = make(map[string]time.Time)
s.lock.Unlock() s.lock.Unlock()
if s.startTime.IsZero() { if s.startTime.IsZero() {
@ -1061,8 +1066,11 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
} }
targetTTL := s.rates.TargetTimeout() targetTTL := s.rates.TargetTimeout()
for id := range s.accountIdlers { for id := range s.accountIdlers {
if _, ok := s.statelessPeers[id]; ok { if markedAt, ok := s.statelessPeers[id]; ok {
continue if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown {
continue
}
delete(s.statelessPeers, id)
} }
idlers.ids = append(idlers.ids, id) idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL)) idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL))
@ -1159,8 +1167,11 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *
} }
targetTTL := s.rates.TargetTimeout() targetTTL := s.rates.TargetTimeout()
for id := range s.bytecodeIdlers { for id := range s.bytecodeIdlers {
if _, ok := s.statelessPeers[id]; ok { if markedAt, ok := s.statelessPeers[id]; ok {
continue if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown {
continue
}
delete(s.statelessPeers, id)
} }
idlers.ids = append(idlers.ids, id) idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL)) idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
@ -1262,8 +1273,11 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
} }
targetTTL := s.rates.TargetTimeout() targetTTL := s.rates.TargetTimeout()
for id := range s.storageIdlers { for id := range s.storageIdlers {
if _, ok := s.statelessPeers[id]; ok { if markedAt, ok := s.statelessPeers[id]; ok {
continue if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown {
continue
}
delete(s.statelessPeers, id)
} }
idlers.ids = append(idlers.ids, id) idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL)) idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL))
@ -1419,8 +1433,11 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
} }
targetTTL := s.rates.TargetTimeout() targetTTL := s.rates.TargetTimeout()
for id := range s.trienodeHealIdlers { for id := range s.trienodeHealIdlers {
if _, ok := s.statelessPeers[id]; ok { if markedAt, ok := s.statelessPeers[id]; ok {
continue if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown {
continue
}
delete(s.statelessPeers, id)
} }
idlers.ids = append(idlers.ids, id) idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL)) idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL))
@ -1547,8 +1564,11 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai
} }
targetTTL := s.rates.TargetTimeout() targetTTL := s.rates.TargetTimeout()
for id := range s.bytecodeHealIdlers { for id := range s.bytecodeHealIdlers {
if _, ok := s.statelessPeers[id]; ok { if markedAt, ok := s.statelessPeers[id]; ok {
continue if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown {
continue
}
delete(s.statelessPeers, id)
} }
idlers.ids = append(idlers.ids, id) idlers.ids = append(idlers.ids, id)
idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL)) idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
@ -2627,7 +2647,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// synced to our head. // synced to our head.
if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 { if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 {
logger.Debug("Peer rejected account range request", "root", s.root) logger.Debug("Peer rejected account range request", "root", s.root)
s.statelessPeers[peer.ID()] = struct{}{} s.statelessPeers[peer.ID()] = time.Now()
s.lock.Unlock() s.lock.Unlock()
// Signal this request as failed, and ready for rescheduling // Signal this request as failed, and ready for rescheduling
@ -2737,7 +2757,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// yet synced. // yet synced.
if len(bytecodes) == 0 { if len(bytecodes) == 0 {
logger.Debug("Peer rejected bytecode request") logger.Debug("Peer rejected bytecode request")
s.statelessPeers[peer.ID()] = struct{}{} s.statelessPeers[peer.ID()] = time.Now()
s.lock.Unlock() s.lock.Unlock()
// Signal this request as failed, and ready for rescheduling // Signal this request as failed, and ready for rescheduling
@ -2865,7 +2885,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// synced to our head. // synced to our head.
if len(hashes) == 0 && len(proof) == 0 { if len(hashes) == 0 && len(proof) == 0 {
logger.Debug("Peer rejected storage request") logger.Debug("Peer rejected storage request")
s.statelessPeers[peer.ID()] = struct{}{} s.statelessPeers[peer.ID()] = time.Now()
s.lock.Unlock() s.lock.Unlock()
s.scheduleRevertStorageRequest(req) // reschedule request s.scheduleRevertStorageRequest(req) // reschedule request
return nil return nil
@ -2984,7 +3004,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// yet synced. // yet synced.
if len(trienodes) == 0 { if len(trienodes) == 0 {
logger.Debug("Peer rejected trienode heal request") logger.Debug("Peer rejected trienode heal request")
s.statelessPeers[peer.ID()] = struct{}{} s.statelessPeers[peer.ID()] = time.Now()
s.lock.Unlock() s.lock.Unlock()
// Signal this request as failed, and ready for rescheduling // Signal this request as failed, and ready for rescheduling
@ -3091,7 +3111,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// yet synced. // yet synced.
if len(bytecodes) == 0 { if len(bytecodes) == 0 {
logger.Debug("Peer rejected bytecode heal request") logger.Debug("Peer rejected bytecode heal request")
s.statelessPeers[peer.ID()] = struct{}{} s.statelessPeers[peer.ID()] = time.Now()
s.lock.Unlock() s.lock.Unlock()
// Signal this request as failed, and ready for rescheduling // Signal this request as failed, and ready for rescheduling