diff --git a/eth/backend.go b/eth/backend.go index 6716a77562..c5dec77962 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -76,6 +76,7 @@ type Ethereum struct { handler *handler discmix *enode.FairMix + dropper *dropper // DB interfaces chainDb ethdb.Database // Block chain database @@ -300,6 +301,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } + eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns()) + eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.miner.SetPrioAddresses(config.TxPool.Locals) @@ -410,6 +413,9 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) + // Start the connection manager + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) + // start log indexer s.filterMaps.Start() go s.updateFilterMapsHeads() @@ -511,6 +517,7 @@ func (s *Ethereum) setupDiscovery() error { func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.discmix.Close() + s.dropper.Stop() s.handler.Stop() // Then stop everything else. diff --git a/eth/dropper.go b/eth/dropper.go new file mode 100644 index 0000000000..51f2a7a95a --- /dev/null +++ b/eth/dropper.go @@ -0,0 +1,167 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package eth + +import ( + mrand "math/rand" + "slices" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" +) + +const ( + // Interval between peer drop events (uniform between min and max) + peerDropIntervalMin = 3 * time.Minute + // Interval between peer drop events (uniform between min and max) + peerDropIntervalMax = 7 * time.Minute + // Avoid dropping peers for some time after connection + doNotDropBefore = 10 * time.Minute + // How close to max should we initiate the drop timer. O should be fine, + // dropping when no more peers can be added. Larger numbers result in more + // aggressive drop behavior. + peerDropThreshold = 0 +) + +var ( + // droppedInbound is the number of inbound peers dropped + droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil) + // droppedOutbound is the number of outbound peers dropped + droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil) +) + +// dropper monitors the state of the peer pool and makes changes as follows: +// - during sync the Downloader handles peer connections, so dropper is disabled +// - if not syncing and the peer count is close to the limit, it drops peers +// randomly every peerDropInterval to make space for new peers +// - peers are dropped separately from the inboud pool and from the dialed pool +type dropper struct { + maxDialPeers int // maximum number of dialed peers + maxInboundPeers int // maximum number of inbound peers + peersFunc getPeersFunc + syncingFunc getSyncingFunc + + // peerDropTimer introduces churn if we are close to limit capacity. + // We handle Dialed and Inbound connections separately + peerDropTimer *time.Timer + + wg sync.WaitGroup // wg for graceful shutdown + shutdownCh chan struct{} +} + +// Callback type to get the list of connected peers. +type getPeersFunc func() []*p2p.Peer + +// Callback type to get syncing status. +// Returns true while syncing, false when synced. +type getSyncingFunc func() bool + +func newDropper(maxDialPeers, maxInboundPeers int) *dropper { + cm := &dropper{ + maxDialPeers: maxDialPeers, + maxInboundPeers: maxInboundPeers, + peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)), + shutdownCh: make(chan struct{}), + } + if peerDropIntervalMin > peerDropIntervalMax { + panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration") + } + return cm +} + +// Start the dropper. +func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { + cm.peersFunc = srv.Peers + cm.syncingFunc = syncingFunc + cm.wg.Add(1) + go cm.loop() +} + +// Stop the dropper. +func (cm *dropper) Stop() { + cm.peerDropTimer.Stop() + close(cm.shutdownCh) + cm.wg.Wait() +} + +// dropRandomPeer selects one of the peers randomly and drops it from the peer pool. +func (cm *dropper) dropRandomPeer() bool { + peers := cm.peersFunc() + var numInbound int + for _, p := range peers { + if p.Inbound() { + numInbound++ + } + } + numDialed := len(peers) - numInbound + + selectDoNotDrop := func(p *p2p.Peer) bool { + // Avoid dropping trusted and static peers, or recent peers. + // Only drop peers if their respective category (dialed/inbound) + // is close to limit capacity. + return p.Trusted() || p.StaticDialed() || + p.Lifetime() < mclock.AbsTime(doNotDropBefore) || + (p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) || + (p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) + } + + droppable := slices.DeleteFunc(peers, selectDoNotDrop) + if len(droppable) > 0 { + p := droppable[mrand.Intn(len(droppable))] + log.Debug("Dropping random peer", "inbound", p.Inbound(), + "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + p.Disconnect(p2p.DiscUselessPeer) + if p.Inbound() { + droppedInbound.Mark(1) + } else { + droppedOutbound.Mark(1) + } + return true + } + return false +} + +// randomDuration generates a random duration between min and max. +func randomDuration(min, max time.Duration) time.Duration { + if min > max { + panic("min duration must be less than or equal to max duration") + } + return time.Duration(mrand.Int63n(int64(max-min)) + int64(min)) +} + +// loop is the main loop of the connection dropper. +func (cm *dropper) loop() { + defer cm.wg.Done() + + for { + select { + case <-cm.peerDropTimer.C: + // Drop a random peer if we are not syncing and the peer count is close to the limit. + if !cm.syncingFunc() { + cm.dropRandomPeer() + } + cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax)) + case <-cm.shutdownCh: + return + } + } +} diff --git a/p2p/peer.go b/p2p/peer.go index 9ffb94e5a8..9a0a750ac8 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -220,11 +220,35 @@ func (p *Peer) String() string { return fmt.Sprintf("Peer %x %v", id[:8], p.RemoteAddr()) } -// Inbound returns true if the peer is an inbound connection +// Inbound returns true if the peer is an inbound (not dialed) connection. func (p *Peer) Inbound() bool { return p.rw.is(inboundConn) } +// Trusted returns true if the peer is configured as trusted. +// Trusted peers are accepted in above the MaxInboundConns limit. +// The peer can be either inbound or dialed. +func (p *Peer) Trusted() bool { + return p.rw.is(trustedConn) +} + +// DynDialed returns true if the peer was dialed successfully (passed handshake) and +// it is not configured as static. +func (p *Peer) DynDialed() bool { + return p.rw.is(dynDialedConn) +} + +// StaticDialed returns true if the peer was dialed successfully (passed handshake) and +// it is configured as static. +func (p *Peer) StaticDialed() bool { + return p.rw.is(staticDialedConn) +} + +// Lifetime returns the time since peer creation. +func (p *Peer) Lifetime() mclock.AbsTime { + return mclock.Now() - p.created +} + func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer { protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ diff --git a/p2p/server.go b/p2p/server.go index c1564352e5..4e72e29fa0 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -508,7 +508,7 @@ func (srv *Server) setupDiscovery() error { func (srv *Server) setupDialScheduler() { config := dialConfig{ self: srv.localnode.ID(), - maxDialPeers: srv.maxDialedConns(), + maxDialPeers: srv.MaxDialedConns(), maxActiveDials: srv.MaxPendingPeers, log: srv.Logger, netRestrict: srv.NetRestrict, @@ -527,11 +527,11 @@ func (srv *Server) setupDialScheduler() { } } -func (srv *Server) maxInboundConns() int { - return srv.MaxPeers - srv.maxDialedConns() +func (srv *Server) MaxInboundConns() int { + return srv.MaxPeers - srv.MaxDialedConns() } -func (srv *Server) maxDialedConns() (limit int) { +func (srv *Server) MaxDialedConns() (limit int) { if srv.NoDial || srv.MaxPeers == 0 { return 0 } @@ -736,7 +736,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in switch { case !c.is(trustedConn) && len(peers) >= srv.MaxPeers: return DiscTooManyPeers - case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns(): + case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.MaxInboundConns(): return DiscTooManyPeers case peers[c.node.ID()] != nil: return DiscAlreadyConnected