eth/connmanager: move Connection Manager to package eth

Better positioned in package eth to access relevant data about
connection quality.
This commit is contained in:
Csaba Kiraly 2025-03-25 18:24:18 +01:00
parent ea8d05a1c9
commit e0b0189d05
4 changed files with 62 additions and 56 deletions

View file

@ -76,6 +76,7 @@ type Ethereum struct {
handler *handler handler *handler
discmix *enode.FairMix discmix *enode.FairMix
connman *connManager
// DB interfaces // DB interfaces
chainDb ethdb.Database // Block chain database chainDb ethdb.Database // Block chain database
@ -289,6 +290,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err return nil, err
} }
eth.connman = newConnManager(&connmanConfig{
maxDialPeers: eth.p2pServer.MaxDialedConns(),
})
eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner = miner.New(eth, config.Miner, eth.engine)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.miner.SetPrioAddresses(config.TxPool.Locals) eth.miner.SetPrioAddresses(config.TxPool.Locals)
@ -399,6 +404,9 @@ func (s *Ethereum) Start() error {
// Start the networking layer // Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers) s.handler.Start(s.p2pServer.MaxPeers)
// Start the connection manager
s.connman.Start(s.p2pServer)
// start log indexer // start log indexer
s.filterMaps.Start() s.filterMaps.Start()
go s.updateFilterMapsHeads() go s.updateFilterMapsHeads()
@ -500,6 +508,7 @@ func (s *Ethereum) setupDiscovery() error {
func (s *Ethereum) Stop() error { func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first. // Stop all the peer-related stuff first.
s.discmix.Close() s.discmix.Close()
s.connman.Stop()
s.handler.Stop() s.handler.Stop()
// Then stop everything else. // Then stop everything else.

View file

@ -14,17 +14,20 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package p2p package eth
import ( import (
crand "crypto/rand" crand "crypto/rand"
"encoding/binary" "encoding/binary"
mrand "math/rand" mrand "math/rand"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
) )
const ( const (
@ -43,16 +46,20 @@ const (
// peerDropInterval to make space for new peers // peerDropInterval to make space for new peers
type connManager struct { type connManager struct {
connmanConfig connmanConfig
srv *p2p.Server
peersFunc getPeersFunc peersFunc getPeersFunc
// the peerDrop timer introduces churn if we are close to limit capacity // the peerDrop timer introduces churn if we are close to limit capacity
peerDropTimer *mclock.Alarm peerDropTimer *mclock.Alarm
addPeerCh chan *conn peerEventCh chan *p2p.PeerEvent
remPeerCh chan *conn sub event.Subscription
wg sync.WaitGroup
shutdownCh chan struct{}
} }
// callback type to get the list of connected peers. // callback type to get the list of connected peers.
type getPeersFunc func() []*Peer type getPeersFunc func() []*p2p.Peer
type connmanConfig struct { type connmanConfig struct {
maxDialPeers int // maximum number of dialed peers maxDialPeers int // maximum number of dialed peers
@ -77,33 +84,32 @@ func (cfg connmanConfig) withDefaults() connmanConfig {
return cfg return cfg
} }
func newConnManager(config connmanConfig, peersFunc getPeersFunc) *connManager { func newConnManager(config *connmanConfig) *connManager {
cfg := config.withDefaults() cfg := config.withDefaults()
cm := &connManager{ cm := &connManager{
connmanConfig: cfg, connmanConfig: cfg,
peerDropTimer: mclock.NewAlarm(cfg.clock), peerDropTimer: mclock.NewAlarm(cfg.clock),
peersFunc: peersFunc, peerEventCh: make(chan *p2p.PeerEvent),
addPeerCh: make(chan *conn), shutdownCh: make(chan struct{}),
remPeerCh: make(chan *conn),
} }
cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "interval", peerDropInterval) cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "interval", peerDropInterval)
go cm.loop()
return cm return cm
} }
func (cm *connManager) Start(srv *p2p.Server) {
cm.wg.Add(1)
cm.srv = srv
cm.peersFunc = srv.Peers
cm.sub = srv.SubscribeEvents(cm.peerEventCh)
go cm.loop()
}
// stop the connection manager. // stop the connection manager.
func (cm *connManager) stop() { func (cm *connManager) Stop() {
cm.sub.Unsubscribe()
cm.peerDropTimer.Stop() cm.peerDropTimer.Stop()
} close(cm.shutdownCh)
cm.wg.Wait()
// peerAdded notifies about peerset change.
func (cm *connManager) peerAdded(c *conn) {
cm.addPeerCh <- c
}
// peerRemoved notifies about peerset change.
func (cm *connManager) peerRemoved(c *conn) {
cm.remPeerCh <- c
} }
// filter is a helper function to filter the peerset. // filter is a helper function to filter the peerset.
@ -118,13 +124,13 @@ func filter[T any](s []T, test func(T) bool) (filtered []T) {
// numDialPeers returns the current number of peers dialed (not inbound). // numDialPeers returns the current number of peers dialed (not inbound).
func (cm *connManager) numDialPeers() int { func (cm *connManager) numDialPeers() int {
selectDialed := func(p *Peer) bool { return !p.rw.is(inboundConn) } selectDialed := func(p *p2p.Peer) bool { return !p.Inbound() }
dialed := filter(cm.peersFunc(), selectDialed) dialed := filter(cm.peersFunc(), selectDialed)
return len(dialed) return len(dialed)
} }
func (cm *connManager) numPeers() (int, int, int) { func (cm *connManager) numPeers() (int, int, int) {
selectDialed := func(p *Peer) bool { return !p.rw.is(inboundConn) } selectDialed := func(p *p2p.Peer) bool { return !p.Inbound() }
peers := cm.peersFunc() peers := cm.peersFunc()
dialed := filter(peers, selectDialed) dialed := filter(peers, selectDialed)
return len(peers), len(dialed), len(peers) - len(dialed) return len(peers), len(dialed), len(peers) - len(dialed)
@ -136,15 +142,15 @@ func (cm *connManager) dropRandomPeer() bool {
// Only drop from dyndialed peers. Avoid dropping trusted peers. // Only drop from dyndialed peers. Avoid dropping trusted peers.
// Give some time to peers before considering them for a drop. // Give some time to peers before considering them for a drop.
selectDroppable := func(p *Peer) bool { selectDroppable := func(p *p2p.Peer) bool {
return p.rw.is(dynDialedConn) && !p.rw.is(trustedConn) && return p.DynDialed() && !p.Trusted() &&
mclock.Now()-p.created >= mclock.AbsTime(doNotDropBefore) p.Lifetime() >= mclock.AbsTime(doNotDropBefore)
} }
droppable := filter(peers, selectDroppable) droppable := filter(peers, selectDroppable)
if len(droppable) > 0 { if len(droppable) > 0 {
p := droppable[cm.rand.Intn(len(droppable))] p := droppable[cm.rand.Intn(len(droppable))]
cm.log.Trace("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(mclock.Now()-p.created), "peercountbefore", len(peers)) cm.log.Info("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers))
p.Disconnect(DiscDropped) p.Disconnect(p2p.DiscTooManyPeers)
return true return true
} }
return false return false
@ -152,31 +158,34 @@ func (cm *connManager) dropRandomPeer() bool {
// loop is the main loop of the connection manager. // loop is the main loop of the connection manager.
func (cm *connManager) loop() { func (cm *connManager) loop() {
defer cm.wg.Done()
for { for {
select { select {
case ev := <-cm.peerEventCh:
switch ev.Type {
case p2p.PeerEventTypeAdd:
// check and start timer for peer drop
// If a drop was already scheduled, Schedule does nothing.
numpeers, out, in := cm.numPeers()
cm.log.Info("addPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers)
if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold {
cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval))
}
case <-cm.addPeerCh: case p2p.PeerEventTypeDrop:
// check and start timer for peer drop // check and stop timer for peer drop
// If a drop was already scheduled, Schedule does nothing. numpeers, out, in := cm.numPeers()
numpeers, out, in := cm.numPeers() cm.log.Info("remPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers)
cm.log.Trace("addPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) if cm.maxDialPeers-cm.numDialPeers() > peerDropThreshold {
if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { cm.peerDropTimer.Stop()
cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) }
}
case <-cm.remPeerCh:
// check and stop timer for peer drop
numpeers, out, in := cm.numPeers()
cm.log.Trace("remPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers)
if cm.maxDialPeers-cm.numDialPeers() > peerDropThreshold {
cm.peerDropTimer.Stop()
} }
case <-cm.peerDropTimer.C(): case <-cm.peerDropTimer.C():
cm.dropRandomPeer() cm.dropRandomPeer()
case <-cm.shutdownCh:
return
} }
} }
cm.log.Warn("Exiting connmanager loop")
} }

View file

@ -69,7 +69,6 @@ const (
DiscUnexpectedIdentity DiscUnexpectedIdentity
DiscSelf DiscSelf
DiscReadTimeout DiscReadTimeout
DiscDropped
DiscSubprotocolError = DiscReason(0x10) DiscSubprotocolError = DiscReason(0x10)
DiscInvalid = 0xff DiscInvalid = 0xff
@ -88,7 +87,6 @@ var discReasonToString = [...]string{
DiscUnexpectedIdentity: "unexpected identity", DiscUnexpectedIdentity: "unexpected identity",
DiscSelf: "connected to self", DiscSelf: "connected to self",
DiscReadTimeout: "read timeout", DiscReadTimeout: "read timeout",
DiscDropped: "dropped to make space for others",
DiscSubprotocolError: "subprotocol error", DiscSubprotocolError: "subprotocol error",
DiscInvalid: "invalid disconnect reason", DiscInvalid: "invalid disconnect reason",
} }

View file

@ -97,7 +97,6 @@ type Server struct {
discv5 *discover.UDPv5 discv5 *discover.UDPv5
discmix *enode.FairMix discmix *enode.FairMix
dialsched *dialScheduler dialsched *dialScheduler
connman *connManager
// This is read by the NAT port mapping loop. // This is read by the NAT port mapping loop.
portMappingRegister chan *portMapping portMappingRegister chan *portMapping
@ -412,8 +411,6 @@ func (srv *Server) Start() (err error) {
} }
srv.setupDialScheduler() srv.setupDialScheduler()
srv.setupConnManager()
srv.loopWG.Add(1) srv.loopWG.Add(1)
go srv.run() go srv.run()
return nil return nil
@ -530,11 +527,6 @@ func (srv *Server) setupDialScheduler() {
} }
} }
func (srv *Server) setupConnManager() {
config := connmanConfig{maxDialPeers: srv.maxDialedConns()}
srv.connman = newConnManager(config, srv.Peers)
}
func (srv *Server) MaxInboundConns() int { func (srv *Server) MaxInboundConns() int {
return srv.MaxPeers - srv.MaxDialedConns() return srv.MaxPeers - srv.MaxDialedConns()
} }
@ -689,7 +681,6 @@ running:
peers[c.node.ID()] = p peers[c.node.ID()] = p
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name()) srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
srv.dialsched.peerAdded(c) srv.dialsched.peerAdded(c)
srv.connman.peerAdded(c)
if p.Inbound() { if p.Inbound() {
inboundCount++ inboundCount++
serveSuccessMeter.Mark(1) serveSuccessMeter.Mark(1)
@ -708,7 +699,6 @@ running:
delete(peers, pd.ID()) delete(peers, pd.ID())
srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err) srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err)
srv.dialsched.peerRemoved(pd.rw) srv.dialsched.peerRemoved(pd.rw)
srv.connman.peerRemoved(pd.rw)
if pd.Inbound() { if pd.Inbound() {
inboundCount-- inboundCount--
activeInboundPeerGauge.Dec(1) activeInboundPeerGauge.Dec(1)