From 77d634cd9b09fcf89c38c196ac83362bebc03250 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Wed, 26 Mar 2025 14:05:02 +0100 Subject: [PATCH] eth/connmanager: handle inbound and dialed peers separately Signed-off-by: Csaba Kiraly --- eth/backend.go | 3 +- eth/connmanager.go | 109 +++++++++++++++++++++++++++------------------ 2 files changed, 67 insertions(+), 45 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index e9e20c40e1..1e2b58ed18 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -291,7 +291,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } eth.connman = newConnManager(&connmanConfig{ - maxDialPeers: eth.p2pServer.MaxDialedConns(), + maxDialPeers: eth.p2pServer.MaxDialedConns(), + maxInboundPeers: eth.p2pServer.MaxInboundConns(), }) eth.miner = miner.New(eth, config.Miner, eth.engine) diff --git a/eth/connmanager.go b/eth/connmanager.go index 947eb54751..e48d5e6af5 100644 --- a/eth/connmanager.go +++ b/eth/connmanager.go @@ -48,21 +48,25 @@ const ( // - during sync the Downloader handles peer connections co connManager is disabled // - if not syncing and the peer count is close to the limit, it drops peers // randomly every peerDropInterval to make space for new peers +// - peers are dropped separately from the inbound pool and from the dialed pool type connManager struct { connmanConfig peersFunc getPeersFunc syncingFunc getSyncingFunc - // the peerDrop timer introduces churn if we are close to limit capacity - peerDropTimer *mclock.Alarm - peerEventCh chan *p2p.PeerEvent - sub event.Subscription + // The peerDrop timers introduce churn if we are close to limit capacity. 
+ // We handle Dialed and Inbound connections separately + peerDropDialedTimer *mclock.Alarm + peerDropInboundTimer *mclock.Alarm - wg sync.WaitGroup + peerEventCh chan *p2p.PeerEvent // channel for peer event changes + sub event.Subscription // subscription to peerEventCh + + wg sync.WaitGroup // wg for graceful shutdown shutdownCh chan struct{} } -// callback type to get the list of connected peers. +// Callback type to get the list of connected peers. type getPeersFunc func() []*p2p.Peer // Callback type to get syncing status. @@ -70,10 +74,11 @@ type getPeersFunc func() []*p2p.Peer type getSyncingFunc func() bool type connmanConfig struct { - maxDialPeers int // maximum number of dialed peers - log log.Logger - clock mclock.Clock - rand *mrand.Rand + maxDialPeers int // maximum number of dialed peers + maxInboundPeers int // maximum number of inbound peers + log log.Logger + clock mclock.Clock + rand *mrand.Rand } func (cfg connmanConfig) withDefaults() connmanConfig { @@ -95,10 +100,11 @@ func (cfg connmanConfig) withDefaults() connmanConfig { func newConnManager(config *connmanConfig) *connManager { cfg := config.withDefaults() cm := &connManager{ - connmanConfig: cfg, - peerDropTimer: mclock.NewAlarm(cfg.clock), - peerEventCh: make(chan *p2p.PeerEvent), - shutdownCh: make(chan struct{}), + connmanConfig: cfg, + peerDropDialedTimer: mclock.NewAlarm(cfg.clock), + peerDropInboundTimer: mclock.NewAlarm(cfg.clock), + peerEventCh: make(chan *p2p.PeerEvent), + shutdownCh: make(chan struct{}), } cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "interval", peerDropInterval) return cm @@ -116,61 +122,74 @@ func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { // Stop the connection manager. 
func (cm *connManager) Stop() { cm.sub.Unsubscribe() - cm.peerDropTimer.Stop() + cm.peerDropInboundTimer.Stop() + cm.peerDropDialedTimer.Stop() close(cm.shutdownCh) cm.wg.Wait() } -// numDialPeers returns the current number of peers dialed (not inbound). -func (cm *connManager) numDialPeers() int { - dialed := slices.DeleteFunc(cm.peersFunc(), (*p2p.Peer).Inbound) - return len(dialed) -} - -func (cm *connManager) numPeers() (int, int, int) { +// numPeers returns the current number of peers and its breakdown (dialed or inbound). +func (cm *connManager) numPeers() (numPeers int, numDialed int, numInbound int) { peers := cm.peersFunc() dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound) return len(peers), len(dialed), len(peers) - len(dialed) } // dropRandomPeer selects one of the peers randomly and drops it from the peer pool. -func (cm *connManager) dropRandomPeer() bool { +func (cm *connManager) dropRandomPeer(dialed bool) bool { peers := cm.peersFunc() - // Only drop from dyndialed peers. Avoid dropping trusted peers. - // Give some time to peers before considering them for a drop. selectDoNotDrop := func(p *p2p.Peer) bool { - return !p.DynDialed() || - p.Trusted() || - p.Lifetime() < mclock.AbsTime(doNotDropBefore) + if dialed { + // Only drop from dyndialed peers. Avoid dropping trusted peers. + // Give some time to peers before considering them for a drop. + return !p.DynDialed() || + p.Trusted() || + p.Lifetime() < mclock.AbsTime(doNotDropBefore) + } else { + // Only drop from inbound peers. Avoid dropping trusted peers. + // Give some time to peers before considering them for a drop. 
+ return p.DynDialed() || p.StaticDialed() || + p.Trusted() || + p.Lifetime() < mclock.AbsTime(doNotDropBefore) + } } droppable := slices.DeleteFunc(peers, selectDoNotDrop) if len(droppable) > 0 { p := droppable[cm.rand.Intn(len(droppable))] - cm.log.Debug("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + cm.log.Debug("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), + "dialed", dialed, "peercountbefore", len(peers)) p.Disconnect(p2p.DiscTooManyPeers) return true } return false } -// updatePeerDropTimer checks and starts/stops the timer for peer drop. -func (cm *connManager) updatePeerDropTimer(syncing bool) { +// updatePeerDropTimers checks and starts/stops the timer for peer drop. +func (cm *connManager) updatePeerDropTimers(syncing bool) { - numpeers, out, in := cm.numPeers() + numPeers, numDialed, numInbound := cm.numPeers() cm.log.Trace("ConnManager status", "syncing", syncing, - "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) + "peers", numPeers, "out", numDialed, "in", numInbound, + "maxout", cm.maxDialPeers, "maxin", cm.maxInboundPeers) if !syncing { // If a drop was already scheduled, Schedule does nothing. - if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { - cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) + if cm.maxDialPeers-numDialed <= peerDropThreshold { + cm.peerDropDialedTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) } else { - cm.peerDropTimer.Stop() + cm.peerDropDialedTimer.Stop() + } + + if cm.maxInboundPeers-numInbound <= peerDropThreshold { + cm.peerDropInboundTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) + } else { + cm.peerDropInboundTimer.Stop() } } else { // Downloader is managing connections while syncing. 
- cm.peerDropTimer.Stop() + cm.peerDropDialedTimer.Stop() + cm.peerDropInboundTimer.Stop() } } @@ -193,22 +212,24 @@ func (cm *connManager) loop() { for { select { case <-syncCheckTimer.C(): - // Update info about syncing status, and rearm the timer. + // Update info about syncing status, and rearm the timers. syncingNew := cm.syncingFunc() if syncing != syncingNew { - // Syncing status changed, we might need to update the timer. + // Syncing status changed, we might need to update the timers. cm.log.Trace("Sync status changed", "syncing", syncingNew) syncing = syncingNew - cm.updatePeerDropTimer(syncing) + cm.updatePeerDropTimers(syncing) } syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval)) case ev := <-cm.peerEventCh: if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop { - // Number of peers changed, we might need to start the timer. - cm.updatePeerDropTimer(syncing) + // Number of peers changed, we might need to start the timers. + cm.updatePeerDropTimers(syncing) } - case <-cm.peerDropTimer.C(): - cm.dropRandomPeer() + case <-cm.peerDropDialedTimer.C(): + cm.dropRandomPeer(true) + case <-cm.peerDropInboundTimer.C(): + cm.dropRandomPeer(false) case <-cm.shutdownCh: return }