diff --git a/eth/backend.go b/eth/backend.go index bf2ac1d5ed..e9e20c40e1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -405,7 +405,7 @@ func (s *Ethereum) Start() error { s.handler.Start(s.p2pServer.MaxPeers) // Start the connection manager - s.connman.Start(s.p2pServer, s.Synced) + s.connman.Start(s.p2pServer, func() bool { return !s.Synced() }) // start log indexer s.filterMaps.Start() diff --git a/eth/connmanager.go b/eth/connmanager.go index 92a86dbe13..947eb54751 100644 --- a/eth/connmanager.go +++ b/eth/connmanager.go @@ -40,15 +40,18 @@ const ( // dropping when no more peers can be added. Larger numbers result in more // aggressive drop behavior. peerDropThreshold = 0 + // Sync status poll interval (no need to be too reactive here) + syncCheckInterval = 60 * time.Second ) // connManager monitors the state of the peer pool and makes changes as follows: -// - if the peer count is close to the limit, it drops peers randomly every -// peerDropInterval to make space for new peers +// - during sync the Downloader handles peer connections co connManager is disabled +// - if not syncing and the peer count is close to the limit, it drops peers +// randomly every peerDropInterval to make space for new peers type connManager struct { connmanConfig - peersFunc getPeersFunc - syncFunc getSyncFunc + peersFunc getPeersFunc + syncingFunc getSyncingFunc // the peerDrop timer introduces churn if we are close to limit capacity peerDropTimer *mclock.Alarm @@ -62,8 +65,9 @@ type connManager struct { // callback type to get the list of connected peers. type getPeersFunc func() []*p2p.Peer -// callback type to get sync status. -type getSyncFunc func() bool +// Callback type to get syncing status. +// Returns true while syncing, false when synced. +type getSyncingFunc func() bool type connmanConfig struct { maxDialPeers int // maximum number of dialed peers @@ -100,15 +104,16 @@ func newConnManager(config *connmanConfig) *connManager { return cm } -func (cm *connManager) Start(srv *p2p.Server, syncFunc getSyncFunc) { +// Start the connection manager. +func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { cm.wg.Add(1) cm.peersFunc = srv.Peers - cm.syncFunc = syncFunc + cm.syncingFunc = syncingFunc cm.sub = srv.SubscribeEvents(cm.peerEventCh) go cm.loop() } -// stop the connection manager. +// Stop the connection manager. func (cm *connManager) Stop() { cm.sub.Unsubscribe() cm.peerDropTimer.Stop() @@ -149,32 +154,59 @@ func (cm *connManager) dropRandomPeer() bool { return false } +// updatePeerDropTimer checks and starts/stops the timer for peer drop. +func (cm *connManager) updatePeerDropTimer(syncing bool) { + + numpeers, out, in := cm.numPeers() + cm.log.Trace("ConnManager status", "syncing", syncing, + "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) + + if !syncing { + // If a drop was already scheduled, Schedule does nothing. + if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { + cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) + } else { + cm.peerDropTimer.Stop() + } + } else { + // Downloader is managing connections while syncing. + cm.peerDropTimer.Stop() + } +} + // loop is the main loop of the connection manager. func (cm *connManager) loop() { + defer cm.wg.Done() + // Set up periodic timer to pull syncing status. + // We could get syncing status in a few ways: + // - poll the sync status (we use this for now) + // - subscribe to Downloader.mux + // - subscribe to DownloaderAPI (which itself polls the sync status) + syncing := cm.syncingFunc() + cm.log.Trace("Sync status", "syncing", syncing) + syncCheckTimer := mclock.NewAlarm(cm.connmanConfig.clock) + syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval)) + defer syncCheckTimer.Stop() + for { select { - case ev := <-cm.peerEventCh: - switch ev.Type { - case p2p.PeerEventTypeAdd: - // check and start timer for peer drop - // If a drop was already scheduled, Schedule does nothing. - numpeers, out, in := cm.numPeers() - cm.log.Trace("addPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) - if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { - cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) - } - - case p2p.PeerEventTypeDrop: - // check and stop timer for peer drop - numpeers, out, in := cm.numPeers() - cm.log.Trace("remPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) - if cm.maxDialPeers-cm.numDialPeers() > peerDropThreshold { - cm.peerDropTimer.Stop() - } + case <-syncCheckTimer.C(): + // Update info about syncing status, and rearm the timer. + syncingNew := cm.syncingFunc() + if syncing != syncingNew { + // Syncing status changed, we might need to update the timer. + cm.log.Trace("Sync status changed", "syncing", syncingNew) + syncing = syncingNew + cm.updatePeerDropTimer(syncing) + } + syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval)) + case ev := <-cm.peerEventCh: + if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop { + // Number of peers changed, we might need to start the timer. + cm.updatePeerDropTimer(syncing) } - case <-cm.peerDropTimer.C(): cm.dropRandomPeer() case <-cm.shutdownCh: