mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-13 11:36:37 +00:00
eth: add logic to drop peers randomly when saturated (#31476)
As of now, Geth disconnects peers only on protocol error or timeout, meaning once connection slots are filled, the peerset is largely fixed. As mentioned in https://github.com/ethereum/go-ethereum/issues/31321, Geth should occasionally disconnect peers to ensure some churn. What/when to disconnect could depend on: - the state of geth (e.g. sync or not) - current number of peers - peer level metrics This PR adds a very slow churn using a random drop. --------- Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
parent
ecd5c18610
commit
c5c75977ab
4 changed files with 204 additions and 6 deletions
|
|
@ -76,6 +76,7 @@ type Ethereum struct {
|
||||||
|
|
||||||
handler *handler
|
handler *handler
|
||||||
discmix *enode.FairMix
|
discmix *enode.FairMix
|
||||||
|
dropper *dropper
|
||||||
|
|
||||||
// DB interfaces
|
// DB interfaces
|
||||||
chainDb ethdb.Database // Block chain database
|
chainDb ethdb.Database // Block chain database
|
||||||
|
|
@ -300,6 +301,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns())
|
||||||
|
|
||||||
eth.miner = miner.New(eth, config.Miner, eth.engine)
|
eth.miner = miner.New(eth, config.Miner, eth.engine)
|
||||||
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
|
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
|
||||||
eth.miner.SetPrioAddresses(config.TxPool.Locals)
|
eth.miner.SetPrioAddresses(config.TxPool.Locals)
|
||||||
|
|
@ -410,6 +413,9 @@ func (s *Ethereum) Start() error {
|
||||||
// Start the networking layer
|
// Start the networking layer
|
||||||
s.handler.Start(s.p2pServer.MaxPeers)
|
s.handler.Start(s.p2pServer.MaxPeers)
|
||||||
|
|
||||||
|
// Start the connection manager
|
||||||
|
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() })
|
||||||
|
|
||||||
// start log indexer
|
// start log indexer
|
||||||
s.filterMaps.Start()
|
s.filterMaps.Start()
|
||||||
go s.updateFilterMapsHeads()
|
go s.updateFilterMapsHeads()
|
||||||
|
|
@ -511,6 +517,7 @@ func (s *Ethereum) setupDiscovery() error {
|
||||||
func (s *Ethereum) Stop() error {
|
func (s *Ethereum) Stop() error {
|
||||||
// Stop all the peer-related stuff first.
|
// Stop all the peer-related stuff first.
|
||||||
s.discmix.Close()
|
s.discmix.Close()
|
||||||
|
s.dropper.Stop()
|
||||||
s.handler.Stop()
|
s.handler.Stop()
|
||||||
|
|
||||||
// Then stop everything else.
|
// Then stop everything else.
|
||||||
|
|
|
||||||
167
eth/dropper.go
Normal file
167
eth/dropper.go
Normal file
|
|
@ -0,0 +1,167 @@
|
||||||
|
// Copyright 2025 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package eth
|
||||||
|
|
||||||
|
import (
|
||||||
|
mrand "math/rand"
|
||||||
|
"slices"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/common/mclock"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Interval between peer drop events (uniform between min and max)
|
||||||
|
peerDropIntervalMin = 3 * time.Minute
|
||||||
|
// Interval between peer drop events (uniform between min and max)
|
||||||
|
peerDropIntervalMax = 7 * time.Minute
|
||||||
|
// Avoid dropping peers for some time after connection
|
||||||
|
doNotDropBefore = 10 * time.Minute
|
||||||
|
// How close to max should we initiate the drop timer. O should be fine,
|
||||||
|
// dropping when no more peers can be added. Larger numbers result in more
|
||||||
|
// aggressive drop behavior.
|
||||||
|
peerDropThreshold = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// droppedInbound is the number of inbound peers dropped
|
||||||
|
droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil)
|
||||||
|
// droppedOutbound is the number of outbound peers dropped
|
||||||
|
droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil)
|
||||||
|
)
|
||||||
|
|
||||||
|
// dropper monitors the state of the peer pool and makes changes as follows:
|
||||||
|
// - during sync the Downloader handles peer connections, so dropper is disabled
|
||||||
|
// - if not syncing and the peer count is close to the limit, it drops peers
|
||||||
|
// randomly every peerDropInterval to make space for new peers
|
||||||
|
// - peers are dropped separately from the inboud pool and from the dialed pool
|
||||||
|
type dropper struct {
|
||||||
|
maxDialPeers int // maximum number of dialed peers
|
||||||
|
maxInboundPeers int // maximum number of inbound peers
|
||||||
|
peersFunc getPeersFunc
|
||||||
|
syncingFunc getSyncingFunc
|
||||||
|
|
||||||
|
// peerDropTimer introduces churn if we are close to limit capacity.
|
||||||
|
// We handle Dialed and Inbound connections separately
|
||||||
|
peerDropTimer *time.Timer
|
||||||
|
|
||||||
|
wg sync.WaitGroup // wg for graceful shutdown
|
||||||
|
shutdownCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Callback type to get the list of connected peers.
|
||||||
|
type getPeersFunc func() []*p2p.Peer
|
||||||
|
|
||||||
|
// Callback type to get syncing status.
|
||||||
|
// Returns true while syncing, false when synced.
|
||||||
|
type getSyncingFunc func() bool
|
||||||
|
|
||||||
|
func newDropper(maxDialPeers, maxInboundPeers int) *dropper {
|
||||||
|
cm := &dropper{
|
||||||
|
maxDialPeers: maxDialPeers,
|
||||||
|
maxInboundPeers: maxInboundPeers,
|
||||||
|
peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)),
|
||||||
|
shutdownCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
if peerDropIntervalMin > peerDropIntervalMax {
|
||||||
|
panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration")
|
||||||
|
}
|
||||||
|
return cm
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the dropper.
|
||||||
|
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
|
||||||
|
cm.peersFunc = srv.Peers
|
||||||
|
cm.syncingFunc = syncingFunc
|
||||||
|
cm.wg.Add(1)
|
||||||
|
go cm.loop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the dropper.
|
||||||
|
func (cm *dropper) Stop() {
|
||||||
|
cm.peerDropTimer.Stop()
|
||||||
|
close(cm.shutdownCh)
|
||||||
|
cm.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
|
||||||
|
func (cm *dropper) dropRandomPeer() bool {
|
||||||
|
peers := cm.peersFunc()
|
||||||
|
var numInbound int
|
||||||
|
for _, p := range peers {
|
||||||
|
if p.Inbound() {
|
||||||
|
numInbound++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
numDialed := len(peers) - numInbound
|
||||||
|
|
||||||
|
selectDoNotDrop := func(p *p2p.Peer) bool {
|
||||||
|
// Avoid dropping trusted and static peers, or recent peers.
|
||||||
|
// Only drop peers if their respective category (dialed/inbound)
|
||||||
|
// is close to limit capacity.
|
||||||
|
return p.Trusted() || p.StaticDialed() ||
|
||||||
|
p.Lifetime() < mclock.AbsTime(doNotDropBefore) ||
|
||||||
|
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) ||
|
||||||
|
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold)
|
||||||
|
}
|
||||||
|
|
||||||
|
droppable := slices.DeleteFunc(peers, selectDoNotDrop)
|
||||||
|
if len(droppable) > 0 {
|
||||||
|
p := droppable[mrand.Intn(len(droppable))]
|
||||||
|
log.Debug("Dropping random peer", "inbound", p.Inbound(),
|
||||||
|
"id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers))
|
||||||
|
p.Disconnect(p2p.DiscUselessPeer)
|
||||||
|
if p.Inbound() {
|
||||||
|
droppedInbound.Mark(1)
|
||||||
|
} else {
|
||||||
|
droppedOutbound.Mark(1)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// randomDuration generates a random duration between min and max.
|
||||||
|
func randomDuration(min, max time.Duration) time.Duration {
|
||||||
|
if min > max {
|
||||||
|
panic("min duration must be less than or equal to max duration")
|
||||||
|
}
|
||||||
|
return time.Duration(mrand.Int63n(int64(max-min)) + int64(min))
|
||||||
|
}
|
||||||
|
|
||||||
|
// loop is the main loop of the connection dropper.
|
||||||
|
func (cm *dropper) loop() {
|
||||||
|
defer cm.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-cm.peerDropTimer.C:
|
||||||
|
// Drop a random peer if we are not syncing and the peer count is close to the limit.
|
||||||
|
if !cm.syncingFunc() {
|
||||||
|
cm.dropRandomPeer()
|
||||||
|
}
|
||||||
|
cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax))
|
||||||
|
case <-cm.shutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
26
p2p/peer.go
26
p2p/peer.go
|
|
@ -220,11 +220,35 @@ func (p *Peer) String() string {
|
||||||
return fmt.Sprintf("Peer %x %v", id[:8], p.RemoteAddr())
|
return fmt.Sprintf("Peer %x %v", id[:8], p.RemoteAddr())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inbound returns true if the peer is an inbound connection
|
// Inbound returns true if the peer is an inbound (not dialed) connection.
|
||||||
func (p *Peer) Inbound() bool {
|
func (p *Peer) Inbound() bool {
|
||||||
return p.rw.is(inboundConn)
|
return p.rw.is(inboundConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Trusted returns true if the peer is configured as trusted.
|
||||||
|
// Trusted peers are accepted in above the MaxInboundConns limit.
|
||||||
|
// The peer can be either inbound or dialed.
|
||||||
|
func (p *Peer) Trusted() bool {
|
||||||
|
return p.rw.is(trustedConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DynDialed returns true if the peer was dialed successfully (passed handshake) and
|
||||||
|
// it is not configured as static.
|
||||||
|
func (p *Peer) DynDialed() bool {
|
||||||
|
return p.rw.is(dynDialedConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StaticDialed returns true if the peer was dialed successfully (passed handshake) and
|
||||||
|
// it is configured as static.
|
||||||
|
func (p *Peer) StaticDialed() bool {
|
||||||
|
return p.rw.is(staticDialedConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lifetime returns the time since peer creation.
|
||||||
|
func (p *Peer) Lifetime() mclock.AbsTime {
|
||||||
|
return mclock.Now() - p.created
|
||||||
|
}
|
||||||
|
|
||||||
func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
|
func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
|
||||||
protomap := matchProtocols(protocols, conn.caps, conn)
|
protomap := matchProtocols(protocols, conn.caps, conn)
|
||||||
p := &Peer{
|
p := &Peer{
|
||||||
|
|
|
||||||
|
|
@ -508,7 +508,7 @@ func (srv *Server) setupDiscovery() error {
|
||||||
func (srv *Server) setupDialScheduler() {
|
func (srv *Server) setupDialScheduler() {
|
||||||
config := dialConfig{
|
config := dialConfig{
|
||||||
self: srv.localnode.ID(),
|
self: srv.localnode.ID(),
|
||||||
maxDialPeers: srv.maxDialedConns(),
|
maxDialPeers: srv.MaxDialedConns(),
|
||||||
maxActiveDials: srv.MaxPendingPeers,
|
maxActiveDials: srv.MaxPendingPeers,
|
||||||
log: srv.Logger,
|
log: srv.Logger,
|
||||||
netRestrict: srv.NetRestrict,
|
netRestrict: srv.NetRestrict,
|
||||||
|
|
@ -527,11 +527,11 @@ func (srv *Server) setupDialScheduler() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) maxInboundConns() int {
|
func (srv *Server) MaxInboundConns() int {
|
||||||
return srv.MaxPeers - srv.maxDialedConns()
|
return srv.MaxPeers - srv.MaxDialedConns()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) maxDialedConns() (limit int) {
|
func (srv *Server) MaxDialedConns() (limit int) {
|
||||||
if srv.NoDial || srv.MaxPeers == 0 {
|
if srv.NoDial || srv.MaxPeers == 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
@ -736,7 +736,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in
|
||||||
switch {
|
switch {
|
||||||
case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
|
case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
|
||||||
return DiscTooManyPeers
|
return DiscTooManyPeers
|
||||||
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
|
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.MaxInboundConns():
|
||||||
return DiscTooManyPeers
|
return DiscTooManyPeers
|
||||||
case peers[c.node.ID()] != nil:
|
case peers[c.node.ID()] != nil:
|
||||||
return DiscAlreadyConnected
|
return DiscAlreadyConnected
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue