eth/connmanager: handle inbound and dialed peers separately

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2025-03-26 14:05:02 +01:00
parent 301b396939
commit 77d634cd9b
2 changed files with 67 additions and 45 deletions

View file

@ -291,7 +291,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
} }
eth.connman = newConnManager(&connmanConfig{ eth.connman = newConnManager(&connmanConfig{
maxDialPeers: eth.p2pServer.MaxDialedConns(), maxDialPeers: eth.p2pServer.MaxDialedConns(),
maxInboundPeers: eth.p2pServer.MaxInboundConns(),
}) })
eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner = miner.New(eth, config.Miner, eth.engine)

View file

@ -48,21 +48,25 @@ const (
// - during sync the Downloader handles peer connections so connManager is disabled // - during sync the Downloader handles peer connections so connManager is disabled
// - if not syncing and the peer count is close to the limit, it drops peers // - if not syncing and the peer count is close to the limit, it drops peers
// randomly every peerDropInterval to make space for new peers // randomly every peerDropInterval to make space for new peers
// - peers are dropped separately from the inbound pool and from the dialed pool
type connManager struct { type connManager struct {
connmanConfig connmanConfig
peersFunc getPeersFunc peersFunc getPeersFunc
syncingFunc getSyncingFunc syncingFunc getSyncingFunc
// the peerDrop timer introduces churn if we are close to limit capacity // The peerDrop timers introduce churn if we are close to limit capacity.
peerDropTimer *mclock.Alarm // We handle Dialed and Inbound connections separately
peerEventCh chan *p2p.PeerEvent peerDropDialedTimer *mclock.Alarm
sub event.Subscription peerDropInboundTimer *mclock.Alarm
wg sync.WaitGroup peerEventCh chan *p2p.PeerEvent // channel for peer event changes
sub event.Subscription // subscription to peerEventCh
wg sync.WaitGroup // wg for graceful shutdown
shutdownCh chan struct{} shutdownCh chan struct{}
} }
// callback type to get the list of connected peers. // Callback type to get the list of connected peers.
type getPeersFunc func() []*p2p.Peer type getPeersFunc func() []*p2p.Peer
// Callback type to get syncing status. // Callback type to get syncing status.
@ -70,10 +74,11 @@ type getPeersFunc func() []*p2p.Peer
type getSyncingFunc func() bool type getSyncingFunc func() bool
type connmanConfig struct { type connmanConfig struct {
maxDialPeers int // maximum number of dialed peers maxDialPeers int // maximum number of dialed peers
log log.Logger maxInboundPeers int // maximum number of inbound peers
clock mclock.Clock log log.Logger
rand *mrand.Rand clock mclock.Clock
rand *mrand.Rand
} }
func (cfg connmanConfig) withDefaults() connmanConfig { func (cfg connmanConfig) withDefaults() connmanConfig {
@ -95,10 +100,11 @@ func (cfg connmanConfig) withDefaults() connmanConfig {
func newConnManager(config *connmanConfig) *connManager { func newConnManager(config *connmanConfig) *connManager {
cfg := config.withDefaults() cfg := config.withDefaults()
cm := &connManager{ cm := &connManager{
connmanConfig: cfg, connmanConfig: cfg,
peerDropTimer: mclock.NewAlarm(cfg.clock), peerDropDialedTimer: mclock.NewAlarm(cfg.clock),
peerEventCh: make(chan *p2p.PeerEvent), peerDropInboundTimer: mclock.NewAlarm(cfg.clock),
shutdownCh: make(chan struct{}), peerEventCh: make(chan *p2p.PeerEvent),
shutdownCh: make(chan struct{}),
} }
cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "interval", peerDropInterval) cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "interval", peerDropInterval)
return cm return cm
@ -116,61 +122,74 @@ func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
// Stop the connection manager. // Stop the connection manager.
func (cm *connManager) Stop() { func (cm *connManager) Stop() {
cm.sub.Unsubscribe() cm.sub.Unsubscribe()
cm.peerDropTimer.Stop() cm.peerDropInboundTimer.Stop()
cm.peerDropDialedTimer.Stop()
close(cm.shutdownCh) close(cm.shutdownCh)
cm.wg.Wait() cm.wg.Wait()
} }
// numDialPeers returns the current number of peers dialed (not inbound). // numPeers returns the current number of peers and its breakdown (dialed or inbound).
func (cm *connManager) numDialPeers() int { func (cm *connManager) numPeers() (numPeers int, numDialed int, numInbound int) {
dialed := slices.DeleteFunc(cm.peersFunc(), (*p2p.Peer).Inbound)
return len(dialed)
}
func (cm *connManager) numPeers() (int, int, int) {
peers := cm.peersFunc() peers := cm.peersFunc()
dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound) dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound)
return len(peers), len(dialed), len(peers) - len(dialed) return len(peers), len(dialed), len(peers) - len(dialed)
} }
// dropRandomPeer selects one of the peers randomly and drops it from the peer pool. // dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
func (cm *connManager) dropRandomPeer() bool { func (cm *connManager) dropRandomPeer(dialed bool) bool {
peers := cm.peersFunc() peers := cm.peersFunc()
// Only drop from dyndialed peers. Avoid dropping trusted peers.
// Give some time to peers before considering them for a drop.
selectDoNotDrop := func(p *p2p.Peer) bool { selectDoNotDrop := func(p *p2p.Peer) bool {
return !p.DynDialed() || if dialed {
p.Trusted() || // Only drop from dyndialed peers. Avoid dropping trusted peers.
p.Lifetime() < mclock.AbsTime(doNotDropBefore) // Give some time to peers before considering them for a drop.
return !p.DynDialed() ||
p.Trusted() ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore)
} else {
// Only drop from inbound peers. Avoid dropping trusted peers.
// Give some time to peers before considering them for a drop.
return p.DynDialed() || p.StaticDialed() ||
p.Trusted() ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore)
}
} }
droppable := slices.DeleteFunc(peers, selectDoNotDrop) droppable := slices.DeleteFunc(peers, selectDoNotDrop)
if len(droppable) > 0 { if len(droppable) > 0 {
p := droppable[cm.rand.Intn(len(droppable))] p := droppable[cm.rand.Intn(len(droppable))]
cm.log.Debug("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) cm.log.Debug("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()),
"dialed", dialed, "peercountbefore", len(peers))
p.Disconnect(p2p.DiscTooManyPeers) p.Disconnect(p2p.DiscTooManyPeers)
return true return true
} }
return false return false
} }
// updatePeerDropTimer checks and starts/stops the timer for peer drop. // updatePeerDropTimers checks and starts/stops the timer for peer drop.
func (cm *connManager) updatePeerDropTimer(syncing bool) { func (cm *connManager) updatePeerDropTimers(syncing bool) {
numpeers, out, in := cm.numPeers() numPeers, numDialed, numInbound := cm.numPeers()
cm.log.Trace("ConnManager status", "syncing", syncing, cm.log.Trace("ConnManager status", "syncing", syncing,
"peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) "peers", numPeers, "out", numDialed, "in", numInbound,
"maxout", cm.maxDialPeers, "maxin", cm.maxInboundPeers)
if !syncing { if !syncing {
// If a drop was already scheduled, Schedule does nothing. // If a drop was already scheduled, Schedule does nothing.
if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { if cm.maxDialPeers-numDialed <= peerDropThreshold {
cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) cm.peerDropDialedTimer.Schedule(cm.clock.Now().Add(peerDropInterval))
} else { } else {
cm.peerDropTimer.Stop() cm.peerDropDialedTimer.Stop()
}
if cm.maxInboundPeers-numInbound <= peerDropThreshold {
cm.peerDropInboundTimer.Schedule(cm.clock.Now().Add(peerDropInterval))
} else {
cm.peerDropInboundTimer.Stop()
} }
} else { } else {
// Downloader is managing connections while syncing. // Downloader is managing connections while syncing.
cm.peerDropTimer.Stop() cm.peerDropDialedTimer.Stop()
cm.peerDropInboundTimer.Stop()
} }
} }
@ -193,22 +212,24 @@ func (cm *connManager) loop() {
for { for {
select { select {
case <-syncCheckTimer.C(): case <-syncCheckTimer.C():
// Update info about syncing status, and rearm the timer. // Update info about syncing status, and rearm the timers.
syncingNew := cm.syncingFunc() syncingNew := cm.syncingFunc()
if syncing != syncingNew { if syncing != syncingNew {
// Syncing status changed, we might need to update the timer. // Syncing status changed, we might need to update the timers.
cm.log.Trace("Sync status changed", "syncing", syncingNew) cm.log.Trace("Sync status changed", "syncing", syncingNew)
syncing = syncingNew syncing = syncingNew
cm.updatePeerDropTimer(syncing) cm.updatePeerDropTimers(syncing)
} }
syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval)) syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval))
case ev := <-cm.peerEventCh: case ev := <-cm.peerEventCh:
if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop { if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop {
// Number of peers changed, we might need to start the timer. // Number of peers changed, we might need to start the timers.
cm.updatePeerDropTimer(syncing) cm.updatePeerDropTimers(syncing)
} }
case <-cm.peerDropTimer.C(): case <-cm.peerDropDialedTimer.C():
cm.dropRandomPeer() cm.dropRandomPeer(true)
case <-cm.peerDropInboundTimer.C():
cm.dropRandomPeer(false)
case <-cm.shutdownCh: case <-cm.shutdownCh:
return return
} }