eth: renaming Connection Manager to Dropper

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2025-04-07 16:41:12 +02:00
parent 8bb7f1ed11
commit 42d2c9b588
No known key found for this signature in database
GPG key ID: 0FE274EE8C95166E
2 changed files with 25 additions and 25 deletions

View file

@ -76,7 +76,7 @@ type Ethereum struct {
handler *handler handler *handler
discmix *enode.FairMix discmix *enode.FairMix
connman *connManager dropper *dropper
// DB interfaces // DB interfaces
chainDb ethdb.Database // Block chain database chainDb ethdb.Database // Block chain database
@ -290,7 +290,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err return nil, err
} }
eth.connman = newConnManager(&connmanConfig{ eth.dropper = newDropper(&dropperConfig{
maxDialPeers: eth.p2pServer.MaxDialedConns(), maxDialPeers: eth.p2pServer.MaxDialedConns(),
maxInboundPeers: eth.p2pServer.MaxInboundConns(), maxInboundPeers: eth.p2pServer.MaxInboundConns(),
}) })
@ -406,7 +406,7 @@ func (s *Ethereum) Start() error {
s.handler.Start(s.p2pServer.MaxPeers) s.handler.Start(s.p2pServer.MaxPeers)
// Start the connection manager // Start the connection manager
s.connman.Start(s.p2pServer, func() bool { return !s.Synced() }) s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() })
// start log indexer // start log indexer
s.filterMaps.Start() s.filterMaps.Start()
@ -509,7 +509,7 @@ func (s *Ethereum) setupDiscovery() error {
func (s *Ethereum) Stop() error { func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first. // Stop all the peer-related stuff first.
s.discmix.Close() s.discmix.Close()
s.connman.Stop() s.dropper.Stop()
s.handler.Stop() s.handler.Stop()
// Then stop everything else. // Then stop everything else.

View file

@ -46,13 +46,13 @@ const (
syncCheckInterval = 60 * time.Second syncCheckInterval = 60 * time.Second
) )
// connManager monitors the state of the peer pool and makes changes as follows: // dropper monitors the state of the peer pool and makes changes as follows:
// - during sync the Downloader handles peer connections co connManager is disabled // - during sync the Downloader handles peer connections, so dropper is disabled
// - if not syncing and the peer count is close to the limit, it drops peers // - if not syncing and the peer count is close to the limit, it drops peers
// randomly every peerDropInterval to make space for new peers // randomly every peerDropInterval to make space for new peers
// - peers are dropped separately from the inboud pool and from the dialed pool // - peers are dropped separately from the inboud pool and from the dialed pool
type connManager struct { type dropper struct {
connmanConfig dropperConfig
peersFunc getPeersFunc peersFunc getPeersFunc
syncingFunc getSyncingFunc syncingFunc getSyncingFunc
@ -75,7 +75,7 @@ type getPeersFunc func() []*p2p.Peer
// Returns true while syncing, false when synced. // Returns true while syncing, false when synced.
type getSyncingFunc func() bool type getSyncingFunc func() bool
type connmanConfig struct { type dropperConfig struct {
maxDialPeers int // maximum number of dialed peers maxDialPeers int // maximum number of dialed peers
maxInboundPeers int // maximum number of inbound peers maxInboundPeers int // maximum number of inbound peers
log log.Logger log log.Logger
@ -83,7 +83,7 @@ type connmanConfig struct {
rand *mrand.Rand rand *mrand.Rand
} }
func (cfg connmanConfig) withDefaults() connmanConfig { func (cfg dropperConfig) withDefaults() dropperConfig {
if cfg.log == nil { if cfg.log == nil {
cfg.log = log.Root() cfg.log = log.Root()
} }
@ -99,10 +99,10 @@ func (cfg connmanConfig) withDefaults() connmanConfig {
return cfg return cfg
} }
func newConnManager(config *connmanConfig) *connManager { func newDropper(config *dropperConfig) *dropper {
cfg := config.withDefaults() cfg := config.withDefaults()
cm := &connManager{ cm := &dropper{
connmanConfig: cfg, dropperConfig: cfg,
peerDropDialedTimer: mclock.NewAlarm(cfg.clock), peerDropDialedTimer: mclock.NewAlarm(cfg.clock),
peerDropInboundTimer: mclock.NewAlarm(cfg.clock), peerDropInboundTimer: mclock.NewAlarm(cfg.clock),
peerEventCh: make(chan *p2p.PeerEvent), peerEventCh: make(chan *p2p.PeerEvent),
@ -111,13 +111,13 @@ func newConnManager(config *connmanConfig) *connManager {
if peerDropIntervalMin > peerDropIntervalMax { if peerDropIntervalMin > peerDropIntervalMax {
panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration") panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration")
} }
cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, cm.log.Info("New Dropper", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold,
"intervalMin", peerDropIntervalMin, "intervalMax", peerDropIntervalMax) "intervalMin", peerDropIntervalMin, "intervalMax", peerDropIntervalMax)
return cm return cm
} }
// Start the connection manager. // Start the dropper.
func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
cm.wg.Add(1) cm.wg.Add(1)
cm.peersFunc = srv.Peers cm.peersFunc = srv.Peers
cm.syncingFunc = syncingFunc cm.syncingFunc = syncingFunc
@ -125,8 +125,8 @@ func (cm *connManager) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
go cm.loop() go cm.loop()
} }
// Stop the connection manager. // Stop the dropper.
func (cm *connManager) Stop() { func (cm *dropper) Stop() {
cm.sub.Unsubscribe() cm.sub.Unsubscribe()
cm.peerDropInboundTimer.Stop() cm.peerDropInboundTimer.Stop()
cm.peerDropDialedTimer.Stop() cm.peerDropDialedTimer.Stop()
@ -135,14 +135,14 @@ func (cm *connManager) Stop() {
} }
// numPeers returns the current number of peers and its breakdown (dialed or inbound). // numPeers returns the current number of peers and its breakdown (dialed or inbound).
func (cm *connManager) numPeers() (numPeers int, numDialed int, numInbound int) { func (cm *dropper) numPeers() (numPeers int, numDialed int, numInbound int) {
peers := cm.peersFunc() peers := cm.peersFunc()
dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound) dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound)
return len(peers), len(dialed), len(peers) - len(dialed) return len(peers), len(dialed), len(peers) - len(dialed)
} }
// dropRandomPeer selects one of the peers randomly and drops it from the peer pool. // dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
func (cm *connManager) dropRandomPeer(dialed bool) bool { func (cm *dropper) dropRandomPeer(dialed bool) bool {
peers := cm.peersFunc() peers := cm.peersFunc()
selectDoNotDrop := func(p *p2p.Peer) bool { selectDoNotDrop := func(p *p2p.Peer) bool {
@ -183,9 +183,9 @@ func randomDuration(rand *mrand.Rand, min, max time.Duration) time.Duration {
} }
// updatePeerDropTimers checks and starts/stops the timer for peer drop. // updatePeerDropTimers checks and starts/stops the timer for peer drop.
func (cm *connManager) updatePeerDropTimers(syncing bool) { func (cm *dropper) updatePeerDropTimers(syncing bool) {
numPeers, numDialed, numInbound := cm.numPeers() numPeers, numDialed, numInbound := cm.numPeers()
cm.log.Trace("ConnManager status", "syncing", syncing, cm.log.Trace("Dropper status", "syncing", syncing,
"peers", numPeers, "out", numDialed, "in", numInbound, "peers", numPeers, "out", numDialed, "in", numInbound,
"maxout", cm.maxDialPeers, "maxin", cm.maxInboundPeers) "maxout", cm.maxDialPeers, "maxin", cm.maxInboundPeers)
@ -211,8 +211,8 @@ func (cm *connManager) updatePeerDropTimers(syncing bool) {
} }
} }
// loop is the main loop of the connection manager. // loop is the main loop of the connection dropper.
func (cm *connManager) loop() { func (cm *dropper) loop() {
defer cm.wg.Done() defer cm.wg.Done()
// Set up periodic timer to pull syncing status. // Set up periodic timer to pull syncing status.
@ -222,7 +222,7 @@ func (cm *connManager) loop() {
// - subscribe to DownloaderAPI (which itself polls the sync status) // - subscribe to DownloaderAPI (which itself polls the sync status)
syncing := cm.syncingFunc() syncing := cm.syncingFunc()
cm.log.Trace("Sync status", "syncing", syncing) cm.log.Trace("Sync status", "syncing", syncing)
syncCheckTimer := mclock.NewAlarm(cm.connmanConfig.clock) syncCheckTimer := mclock.NewAlarm(cm.dropperConfig.clock)
syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval)) syncCheckTimer.Schedule(cm.clock.Now().Add(syncCheckInterval))
defer syncCheckTimer.Stop() defer syncCheckTimer.Stop()