diff --git a/eth/backend.go b/eth/backend.go index 909d153a2b..39b1d8e6bc 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -76,6 +76,7 @@ type Ethereum struct { handler *handler discmix *enode.FairMix + connman *connManager // DB interfaces chainDb ethdb.Database // Block chain database @@ -289,6 +290,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } + eth.connman = newConnManager(&connmanConfig{ + maxDialPeers: eth.p2pServer.MaxDialedConns(), + }) + eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.miner.SetPrioAddresses(config.TxPool.Locals) @@ -399,6 +404,9 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) + // Start the connection manager + s.connman.Start(s.p2pServer) + // start log indexer s.filterMaps.Start() go s.updateFilterMapsHeads() @@ -500,6 +508,7 @@ func (s *Ethereum) setupDiscovery() error { func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.discmix.Close() + s.connman.Stop() s.handler.Stop() // Then stop everything else. diff --git a/p2p/connmanager.go b/eth/connmanager.go similarity index 67% rename from p2p/connmanager.go rename to eth/connmanager.go index cc43f6e321..0c32887bf3 100644 --- a/p2p/connmanager.go +++ b/eth/connmanager.go @@ -14,17 +14,20 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package p2p +package eth import ( crand "crypto/rand" "encoding/binary" mrand "math/rand" + "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" ) const ( @@ -43,16 +46,20 @@ const ( // peerDropInterval to make space for new peers type connManager struct { connmanConfig + srv *p2p.Server peersFunc getPeersFunc // the peerDrop timer introduces churn if we are close to limit capacity peerDropTimer *mclock.Alarm - addPeerCh chan *conn - remPeerCh chan *conn + peerEventCh chan *p2p.PeerEvent + sub event.Subscription + + wg sync.WaitGroup + shutdownCh chan struct{} } // callback type to get the list of connected peers. -type getPeersFunc func() []*Peer +type getPeersFunc func() []*p2p.Peer type connmanConfig struct { maxDialPeers int // maximum number of dialed peers @@ -77,33 +84,32 @@ func (cfg connmanConfig) withDefaults() connmanConfig { return cfg } -func newConnManager(config connmanConfig, peersFunc getPeersFunc) *connManager { +func newConnManager(config *connmanConfig) *connManager { cfg := config.withDefaults() cm := &connManager{ connmanConfig: cfg, peerDropTimer: mclock.NewAlarm(cfg.clock), - peersFunc: peersFunc, - addPeerCh: make(chan *conn), - remPeerCh: make(chan *conn), + peerEventCh: make(chan *p2p.PeerEvent), + shutdownCh: make(chan struct{}), } cm.log.Info("New Connection Manager", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold, "interval", peerDropInterval) - go cm.loop() return cm } +func (cm *connManager) Start(srv *p2p.Server) { + cm.wg.Add(1) + cm.srv = srv + cm.peersFunc = srv.Peers + cm.sub = srv.SubscribeEvents(cm.peerEventCh) + go cm.loop() +} + // stop the connection manager. -func (cm *connManager) stop() { +func (cm *connManager) Stop() { + cm.sub.Unsubscribe() cm.peerDropTimer.Stop() -} - -// peerAdded notifies about peerset change. -func (cm *connManager) peerAdded(c *conn) { - cm.addPeerCh <- c -} - -// peerRemoved notifies about peerset change. -func (cm *connManager) peerRemoved(c *conn) { - cm.remPeerCh <- c + close(cm.shutdownCh) + cm.wg.Wait() } // filter is a helper function to filter the peerset. @@ -118,13 +124,13 @@ func filter[T any](s []T, test func(T) bool) (filtered []T) { // numDialPeers returns the current number of peers dialed (not inbound). func (cm *connManager) numDialPeers() int { - selectDialed := func(p *Peer) bool { return !p.rw.is(inboundConn) } + selectDialed := func(p *p2p.Peer) bool { return !p.Inbound() } dialed := filter(cm.peersFunc(), selectDialed) return len(dialed) } func (cm *connManager) numPeers() (int, int, int) { - selectDialed := func(p *Peer) bool { return !p.rw.is(inboundConn) } + selectDialed := func(p *p2p.Peer) bool { return !p.Inbound() } peers := cm.peersFunc() dialed := filter(peers, selectDialed) return len(peers), len(dialed), len(peers) - len(dialed) @@ -136,15 +142,15 @@ func (cm *connManager) dropRandomPeer() bool { // Only drop from dyndialed peers. Avoid dropping trusted peers. // Give some time to peers before considering them for a drop. - selectDroppable := func(p *Peer) bool { - return p.rw.is(dynDialedConn) && !p.rw.is(trustedConn) && - mclock.Now()-p.created >= mclock.AbsTime(doNotDropBefore) + selectDroppable := func(p *p2p.Peer) bool { + return p.DynDialed() && !p.Trusted() && + p.Lifetime() >= mclock.AbsTime(doNotDropBefore) } droppable := filter(peers, selectDroppable) if len(droppable) > 0 { p := droppable[cm.rand.Intn(len(droppable))] - cm.log.Trace("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(mclock.Now()-p.created), "peercountbefore", len(peers)) - p.Disconnect(DiscDropped) + cm.log.Info("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + p.Disconnect(p2p.DiscTooManyPeers) return true } return false @@ -152,31 +158,34 @@ func (cm *connManager) dropRandomPeer() bool { // loop is the main loop of the connection manager. func (cm *connManager) loop() { + defer cm.wg.Done() for { - select { + case ev := <-cm.peerEventCh: + switch ev.Type { + case p2p.PeerEventTypeAdd: + // check and start timer for peer drop + // If a drop was already scheduled, Schedule does nothing. + numpeers, out, in := cm.numPeers() + cm.log.Info("addPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) + if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { + cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) + } - case <-cm.addPeerCh: - // check and start timer for peer drop - // If a drop was already scheduled, Schedule does nothing. - numpeers, out, in := cm.numPeers() - cm.log.Trace("addPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) - if cm.maxDialPeers-cm.numDialPeers() <= peerDropThreshold { - cm.peerDropTimer.Schedule(cm.clock.Now().Add(peerDropInterval)) - } - - case <-cm.remPeerCh: - // check and stop timer for peer drop - numpeers, out, in := cm.numPeers() - cm.log.Trace("remPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) - if cm.maxDialPeers-cm.numDialPeers() > peerDropThreshold { - cm.peerDropTimer.Stop() + case p2p.PeerEventTypeDrop: + // check and stop timer for peer drop + numpeers, out, in := cm.numPeers() + cm.log.Info("remPeerCh", "peers", numpeers, "out", out, "in", in, "maxout", cm.maxDialPeers) + if cm.maxDialPeers-cm.numDialPeers() > peerDropThreshold { + cm.peerDropTimer.Stop() + } } case <-cm.peerDropTimer.C(): cm.dropRandomPeer() + case <-cm.shutdownCh: + return } } - cm.log.Warn("Exiting connmanager loop") } diff --git a/p2p/peer_error.go b/p2p/peer_error.go index 768bcdf607..dcdadf7fe3 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -69,7 +69,6 @@ const ( DiscUnexpectedIdentity DiscSelf DiscReadTimeout - DiscDropped DiscSubprotocolError = DiscReason(0x10) DiscInvalid = 0xff @@ -88,7 +87,6 @@ var discReasonToString = [...]string{ DiscUnexpectedIdentity: "unexpected identity", DiscSelf: "connected to self", DiscReadTimeout: "read timeout", - DiscDropped: "dropped to make space for others", DiscSubprotocolError: "subprotocol error", DiscInvalid: "invalid disconnect reason", } diff --git a/p2p/server.go b/p2p/server.go index 70aea81e99..4e72e29fa0 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -97,7 +97,6 @@ type Server struct { discv5 *discover.UDPv5 discmix *enode.FairMix dialsched *dialScheduler - connman *connManager // This is read by the NAT port mapping loop. portMappingRegister chan *portMapping @@ -412,8 +411,6 @@ func (srv *Server) Start() (err error) { } srv.setupDialScheduler() - srv.setupConnManager() - srv.loopWG.Add(1) go srv.run() return nil @@ -530,11 +527,6 @@ func (srv *Server) setupDialScheduler() { } } -func (srv *Server) setupConnManager() { - config := connmanConfig{maxDialPeers: srv.maxDialedConns()} - srv.connman = newConnManager(config, srv.Peers) -} - func (srv *Server) MaxInboundConns() int { return srv.MaxPeers - srv.MaxDialedConns() } @@ -689,7 +681,6 @@ running: peers[c.node.ID()] = p srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name()) srv.dialsched.peerAdded(c) - srv.connman.peerAdded(c) if p.Inbound() { inboundCount++ serveSuccessMeter.Mark(1) @@ -708,7 +699,6 @@ running: delete(peers, pd.ID()) srv.log.Debug("Removing p2p peer", "peercount", len(peers), "id", pd.ID(), "duration", d, "req", pd.requested, "err", pd.err) srv.dialsched.peerRemoved(pd.rw) - srv.connman.peerRemoved(pd.rw) if pd.Inbound() { inboundCount-- activeInboundPeerGauge.Dec(1)