diff --git a/eth/backend.go b/eth/backend.go index f5e00f927a..61a0582e08 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -290,10 +290,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } - eth.dropper = newDropper(&dropperConfig{ - maxDialPeers: eth.p2pServer.MaxDialedConns(), - maxInboundPeers: eth.p2pServer.MaxInboundConns(), - }) + eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns()) eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) diff --git a/eth/dropper.go b/eth/dropper.go index ba6ae5726e..826b563f55 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -24,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" ) @@ -50,17 +49,16 @@ const ( // randomly every peerDropInterval to make space for new peers // - peers are dropped separately from the inboud pool and from the dialed pool type dropper struct { - cfg dropperConfig - peersFunc getPeersFunc - syncingFunc getSyncingFunc + maxDialPeers int // maximum number of dialed peers + maxInboundPeers int // maximum number of inbound peers + peersFunc getPeersFunc + syncingFunc getSyncingFunc - // The peerDrop timers introduce churn if we are close to limit capacity. + // peerDropTimer introduces churn if we are close to limit capacity. // We handle Dialed and Inbound connections separately - peerDropDialedTimer *mclock.Alarm - peerDropInboundTimer *mclock.Alarm + peerDropTimer *time.Timer peerEventCh chan *p2p.PeerEvent // channel for peer event changes - sub event.Subscription // subscription to peerEventCh wg sync.WaitGroup // wg for graceful shutdown shutdownCh chan struct{} @@ -73,85 +71,62 @@ type getPeersFunc func() []*p2p.Peer // Returns true while syncing, false when synced. type getSyncingFunc func() bool -type dropperConfig struct { - maxDialPeers int // maximum number of dialed peers - maxInboundPeers int // maximum number of inbound peers - clock mclock.Clock -} - -func (cfg dropperConfig) withDefaults() dropperConfig { - if cfg.clock == nil { - cfg.clock = mclock.System{} - } - return cfg -} - -func newDropper(config *dropperConfig) *dropper { - cfg := config.withDefaults() +func newDropper(maxDialPeers, maxInboundPeers int) *dropper { cm := &dropper{ - cfg: cfg, - peerDropDialedTimer: mclock.NewAlarm(cfg.clock), - peerDropInboundTimer: mclock.NewAlarm(cfg.clock), - peerEventCh: make(chan *p2p.PeerEvent), - shutdownCh: make(chan struct{}), + maxDialPeers: maxDialPeers, + maxInboundPeers: maxInboundPeers, + peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)), + peerEventCh: make(chan *p2p.PeerEvent), + shutdownCh: make(chan struct{}), } if peerDropIntervalMin > peerDropIntervalMax { panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration") } - log.Info("New Dropper", "maxDialPeers", cm.cfg.maxDialPeers, "threshold", peerDropThreshold, + log.Info("New Dropper", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "intervalMin", peerDropIntervalMin, "intervalMax", peerDropIntervalMax) return cm } // Start the dropper. func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { - cm.wg.Add(1) cm.peersFunc = srv.Peers cm.syncingFunc = syncingFunc - cm.sub = srv.SubscribeEvents(cm.peerEventCh) + cm.wg.Add(1) go cm.loop() } // Stop the dropper. func (cm *dropper) Stop() { - cm.sub.Unsubscribe() - cm.peerDropInboundTimer.Stop() - cm.peerDropDialedTimer.Stop() + cm.peerDropTimer.Stop() close(cm.shutdownCh) cm.wg.Wait() } -// numPeers returns the current number of peers and its breakdown (dialed or inbound). -func (cm *dropper) numPeers() (numPeers int, numDialed int, numInbound int) { - peers := cm.peersFunc() - dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound) - return len(peers), len(dialed), len(peers) - len(dialed) -} - // dropRandomPeer selects one of the peers randomly and drops it from the peer pool. -func (cm *dropper) dropRandomPeer(dialed bool) bool { +func (cm *dropper) dropRandomPeer() bool { peers := cm.peersFunc() - - selectDoNotDrop := func(p *p2p.Peer) bool { - if dialed { - // Only drop from dyndialed peers. Avoid dropping trusted peers. - // Give some time to peers before considering them for a drop. - return !p.DynDialed() || - p.Trusted() || - p.Lifetime() < mclock.AbsTime(doNotDropBefore) - } else { - // Only drop from inbound peers. Avoid dropping trusted peers. - // Give some time to peers before considering them for a drop. - return p.DynDialed() || p.StaticDialed() || - p.Trusted() || - p.Lifetime() < mclock.AbsTime(doNotDropBefore) + var numInbound int + for _, p := range peers { + if p.Inbound() { + numInbound++ } } + numDialed := len(peers) - numInbound + + selectDoNotDrop := func(p *p2p.Peer) bool { + // Avoid dropping trusted and static peers, or recent peers. + // Only drop peers if their respective category (dialed/inbound) + // is close to limit capacity. + return p.Trusted() || p.StaticDialed() || + p.Lifetime() < mclock.AbsTime(doNotDropBefore) || + (p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) || + (p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) + } + droppable := slices.DeleteFunc(peers, selectDoNotDrop) if len(droppable) > 0 { p := droppable[mrand.Intn(len(droppable))] - log.Debug("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), - "dialed", dialed, "peercountbefore", len(peers)) + log.Debug("Dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "inbound", p.Inbound(), "peercountbefore", len(peers)) p.Disconnect(p2p.DiscTooManyPeers) return true } @@ -166,35 +141,6 @@ func randomDuration(min, max time.Duration) time.Duration { return time.Duration(mrand.Int63n(int64(max-min)) + int64(min)) } -// updatePeerDropTimers checks and starts/stops the timer for peer drop. -func (cm *dropper) updatePeerDropTimers(syncing bool) { - numPeers, numDialed, numInbound := cm.numPeers() - log.Trace("Dropper status", "syncing", syncing, - "peers", numPeers, "out", numDialed, "in", numInbound, - "maxout", cm.cfg.maxDialPeers, "maxin", cm.cfg.maxInboundPeers) - - if !syncing { - // If a drop was already scheduled, Schedule does nothing. - if cm.cfg.maxDialPeers-numDialed <= peerDropThreshold { - interval := randomDuration(peerDropIntervalMin, peerDropIntervalMax) - cm.peerDropDialedTimer.Schedule(cm.cfg.clock.Now().Add(interval)) - } else { - cm.peerDropDialedTimer.Stop() - } - - if cm.cfg.maxInboundPeers-numInbound <= peerDropThreshold { - interval := randomDuration(peerDropIntervalMin, peerDropIntervalMax) - cm.peerDropInboundTimer.Schedule(cm.cfg.clock.Now().Add(interval)) - } else { - cm.peerDropInboundTimer.Stop() - } - } else { - // Downloader is managing connections while syncing. - cm.peerDropDialedTimer.Stop() - cm.peerDropInboundTimer.Stop() - } -} - // loop is the main loop of the connection dropper. func (cm *dropper) loop() { defer cm.wg.Done() @@ -205,32 +151,24 @@ func (cm *dropper) loop() { // - subscribe to Downloader.mux // - subscribe to DownloaderAPI (which itself polls the sync status) syncing := cm.syncingFunc() - log.Trace("Sync status", "syncing", syncing) - syncCheckTimer := mclock.NewAlarm(cm.cfg.clock) - syncCheckTimer.Schedule(cm.cfg.clock.Now().Add(syncCheckInterval)) + syncCheckTimer := time.NewTimer(syncCheckInterval) defer syncCheckTimer.Stop() for { select { - case <-syncCheckTimer.C(): + case <-syncCheckTimer.C: // Update info about syncing status, and rearm the timers. syncingNew := cm.syncingFunc() if syncing != syncingNew { // Syncing status changed, we might need to update the timers. - log.Trace("Sync status changed", "syncing", syncingNew) syncing = syncingNew - cm.updatePeerDropTimers(syncing) } - syncCheckTimer.Schedule(cm.cfg.clock.Now().Add(syncCheckInterval)) - case ev := <-cm.peerEventCh: - if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop { - // Number of peers changed, we might need to start the timers. - cm.updatePeerDropTimers(syncing) + syncCheckTimer.Reset(syncCheckInterval) + case <-cm.peerDropTimer.C: + if !syncing { + cm.dropRandomPeer() } - case <-cm.peerDropDialedTimer.C(): - cm.dropRandomPeer(true) - case <-cm.peerDropInboundTimer.C(): - cm.dropRandomPeer(false) + cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax)) case <-cm.shutdownCh: return }