eth/connmanager: monitor sync status

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2025-03-26 01:44:53 +01:00
parent cb5d672649
commit 301b396939
2 changed files with 61 additions and 29 deletions

View file

@ -405,7 +405,7 @@ func (s *Ethereum) Start() error {
s.handler.Start(s.p2pServer.MaxPeers)
// Start the connection manager
s.connman.Start(s.p2pServer, s.Synced)
s.connman.Start(s.p2pServer, func() bool { return !s.Synced() })
// start log indexer
s.filterMaps.Start()

View file

@ -40,15 +40,18 @@ const (
// dropping when no more peers can be added. Larger numbers result in more
// aggressive drop behavior.
peerDropThreshold = 0
// Sync status poll interval (no need to be too reactive here)
syncCheckInterval = 60 * time.Second
)
// connManager monitors the state of the peer pool and makes changes as follows:
// - if the peer count is close to the limit, it drops peers randomly every
// peerDropInterval to make space for new peers
// - during sync the Downloader handles peer connections, so connManager is disabled
// - if not syncing and the peer count is close to the limit, it drops peers
// randomly every peerDropInterval to make space for new peers
type connManager struct {
connmanConfig
peersFunc getPeersFunc
syncFunc getSyncFunc
peersFunc getPeersFunc
syncingFunc getSyncingFunc
// the peerDrop timer introduces churn if we are close to limit capacity
peerDropTimer *mclock.Alarm
@ -62,8 +65,9 @@ type connManager struct {
// getPeersFunc is the callback type used to get the list of connected peers.
type getPeersFunc func() []*p2p.Peer
// callback type to get sync status.
type getSyncFunc func() bool
// getSyncingFunc is the callback type used to get the syncing status.
// It returns true while syncing, false when synced.
type getSyncingFunc func() bool
type connmanConfig struct {
maxDialPeers int // maximum number of dialed peers
@ -100,15 +104,16 @@ func newConnManager(config *connmanConfig) *connManager {
return cm
}
func (cm *connManager) Start(srv *p2p.Server, syncFunc getSyncFunc) {
// Start the connection manager.
func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
cm.wg.Add(1)
cm.peersFunc = srv.Peers
cm.syncFunc = syncFunc
cm.syncingFunc = syncingFunc
cm.sub = srv.SubscribeEvents(cm.peerEventCh)
go cm.loop()
}
// stop the connection manager.
// Stop the connection manager.
func (cm *connManager) Stop() {
cm.sub.Unsubscribe()
cm.peerDropTimer.Stop()
@ -149,32 +154,59 @@ func (cm *connManager) dropRandomPeer() bool {
return false
}
// updatePeerDropTimer arms or disarms the peer drop timer, depending on the
// given syncing status and on how many dial slots are still free.
func (cm *connManager) updatePeerDropTimer(syncing bool) {
	total, outbound, inbound := cm.numPeers()
	cm.log.Trace("ConnManager status", "syncing", syncing,
		"peers", total, "out", outbound, "in", inbound, "maxout", cm.maxDialPeers)

	// Downloader is managing connections while syncing, so no drop is scheduled.
	if syncing {
		cm.peerDropTimer.Stop()
		return
	}
	// More free dial slots than the threshold: cancel any pending drop.
	if freeDialSlots := cm.maxDialPeers - cm.numDialPeers(); freeDialSlots > peerDropThreshold {
		cm.peerDropTimer.Stop()
		return
	}
	// If a drop was already scheduled, Schedule does nothing.
	cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval))
}
// loop is the main loop of the connection manager.
func (cm *connManager) loop() {
defer cm.wg.Done()
// Set up periodic timer to poll syncing status.
// We could get syncing status in a few ways:
// - poll the sync status (we use this for now)
// - subscribe to Downloader.mux
// - subscribe to DownloaderAPI (which itself polls the sync status)
syncing := cm.syncingFunc()
cm.log.Trace("Sync status", "syncing", syncing)
syncCheckTimer := mclock.NewAlarm(cm.connmanConfig.clock)
syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval))
defer syncCheckTimer.Stop()
for {
select {
case ev := <-cm.peerEventCh:
switch ev.Type {
case p2p.PeerEventTypeAdd:
// check and start timer for peer drop
// If a drop was already scheduled, Schedule does nothing.
numpeers, out, in := cm.numPeers()
cm.log.Trace("addPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers)
if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold {
cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval))
}
case p2p.PeerEventTypeDrop:
// check and stop timer for peer drop
numpeers, out, in := cm.numPeers()
cm.log.Trace("remPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers)
if cm.maxDialPeers-cm.numDialPeers() > peerDropThreshold {
cm.peerDropTimer.Stop()
}
case <-syncCheckTimer.C():
// Update info about syncing status, and rearm the timer.
syncingNew := cm.syncingFunc()
if syncing != syncingNew {
// Syncing status changed, we might need to update the timer.
cm.log.Trace("Sync status changed", "syncing", syncingNew)
syncing = syncingNew
cm.updatePeerDropTimer(syncing)
}
syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval))
case ev := <-cm.peerEventCh:
if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop {
// Number of peers changed, we might need to start the timer.
cm.updatePeerDropTimer(syncing)
}
case <-cm.peerDropTimer.C():
cm.dropRandomPeer()
case <-cm.shutdownCh: