diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 8b4e4de074..e9533f5b7a 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -97,6 +97,11 @@ const ( // batchSizeThreshold is the maximum size allowed for gentrie batch. batchSizeThreshold = 8 * 1024 * 1024 + + // statelessCooldown is how long a peer that returned empty responses is + // excluded from task assignment in partial state mode. In full sync mode, + // stateless marking remains permanent (cooldown is not checked). + statelessCooldown = 30 * time.Second ) var ( @@ -463,7 +468,7 @@ type Syncer struct { rates *msgrate.Trackers // Message throughput rates for peers // Request tracking during syncing phase - statelessPeers map[string]struct{} // Peers that failed to deliver state data + statelessPeers map[string]time.Time // Peers that failed to deliver state data (value = when marked) accountIdlers map[string]struct{} // Peers that aren't serving account requests bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests storageIdlers map[string]struct{} // Peers that aren't serving storage requests @@ -645,7 +650,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { trieTasks: make(map[string]common.Hash), codeTasks: make(map[common.Hash]struct{}), } - s.statelessPeers = make(map[string]struct{}) + s.statelessPeers = make(map[string]time.Time) s.lock.Unlock() if s.startTime.IsZero() { @@ -1061,8 +1066,11 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac } targetTTL := s.rates.TargetTimeout() for id := range s.accountIdlers { - if _, ok := s.statelessPeers[id]; ok { - continue + if markedAt, ok := s.statelessPeers[id]; ok { + if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown { + continue + } + delete(s.statelessPeers, id) } idlers.ids = append(idlers.ids, id) idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL)) @@ -1159,8 +1167,11 @@ func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan * } targetTTL := s.rates.TargetTimeout() for id := range s.bytecodeIdlers { - if _, ok := s.statelessPeers[id]; ok { - continue + if markedAt, ok := s.statelessPeers[id]; ok { + if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown { + continue + } + delete(s.statelessPeers, id) } idlers.ids = append(idlers.ids, id) idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL)) @@ -1262,8 +1273,11 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st } targetTTL := s.rates.TargetTimeout() for id := range s.storageIdlers { - if _, ok := s.statelessPeers[id]; ok { - continue + if markedAt, ok := s.statelessPeers[id]; ok { + if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown { + continue + } + delete(s.statelessPeers, id) } idlers.ids = append(idlers.ids, id) idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL)) @@ -1419,8 +1433,11 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai } targetTTL := s.rates.TargetTimeout() for id := range s.trienodeHealIdlers { - if _, ok := s.statelessPeers[id]; ok { - continue + if markedAt, ok := s.statelessPeers[id]; ok { + if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown { + continue + } + delete(s.statelessPeers, id) } idlers.ids = append(idlers.ids, id) idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL)) @@ -1547,8 +1564,11 @@ func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fai } targetTTL := s.rates.TargetTimeout() for id := range s.bytecodeHealIdlers { - if _, ok := s.statelessPeers[id]; ok { - continue + if markedAt, ok := s.statelessPeers[id]; ok { + if !s.isPartialSync() || time.Since(markedAt) < statelessCooldown { + continue + } + delete(s.statelessPeers, id) } idlers.ids = append(idlers.ids, id) idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL)) @@ -2627,7 +2647,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco // synced to our head. if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 { logger.Debug("Peer rejected account range request", "root", s.root) - s.statelessPeers[peer.ID()] = struct{}{} + s.statelessPeers[peer.ID()] = time.Now() s.lock.Unlock() // Signal this request as failed, and ready for rescheduling @@ -2737,7 +2757,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error // yet synced. if len(bytecodes) == 0 { logger.Debug("Peer rejected bytecode request") - s.statelessPeers[peer.ID()] = struct{}{} + s.statelessPeers[peer.ID()] = time.Now() s.lock.Unlock() // Signal this request as failed, and ready for rescheduling @@ -2865,7 +2885,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo // synced to our head. if len(hashes) == 0 && len(proof) == 0 { logger.Debug("Peer rejected storage request") - s.statelessPeers[peer.ID()] = struct{}{} + s.statelessPeers[peer.ID()] = time.Now() s.lock.Unlock() s.scheduleRevertStorageRequest(req) // reschedule request return nil @@ -2984,7 +3004,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error // yet synced. if len(trienodes) == 0 { logger.Debug("Peer rejected trienode heal request") - s.statelessPeers[peer.ID()] = struct{}{} + s.statelessPeers[peer.ID()] = time.Now() s.lock.Unlock() // Signal this request as failed, and ready for rescheduling @@ -3091,7 +3111,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e // yet synced. if len(bytecodes) == 0 { logger.Debug("Peer rejected bytecode heal request") - s.statelessPeers[peer.ID()] = struct{}{} + s.statelessPeers[peer.ID()] = time.Now() s.lock.Unlock() // Signal this request as failed, and ready for rescheduling