eth/dropper: simplify code

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2025-04-10 20:13:28 +02:00
parent 5da26a98d0
commit 75c8ee1439
No known key found for this signature in database
GPG key ID: 0FE274EE8C95166E
2 changed files with 41 additions and 106 deletions

View file

@ -290,10 +290,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
return nil, err
}
eth.dropper = newDropper(&dropperConfig{
maxDialPeers: eth.p2pServer.MaxDialedConns(),
maxInboundPeers: eth.p2pServer.MaxInboundConns(),
})
eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns())
eth.miner = miner.New(eth, config.Miner, eth.engine)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))

View file

@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)
@ -50,17 +49,16 @@ const (
// randomly every peerDropInterval to make space for new peers
// - peers are dropped separately from the inboud pool and from the dialed pool
type dropper struct {
cfg dropperConfig
peersFunc getPeersFunc
syncingFunc getSyncingFunc
maxDialPeers int // maximum number of dialed peers
maxInboundPeers int // maximum number of inbound peers
peersFunc getPeersFunc
syncingFunc getSyncingFunc
// The peerDrop timers introduce churn if we are close to limit capacity.
// peerDropTimer introduces churn if we are close to limit capacity.
// We handle Dialed and Inbound connections separately
peerDropDialedTimer *mclock.Alarm
peerDropInboundTimer *mclock.Alarm
peerDropTimer *time.Timer
peerEventCh chan *p2p.PeerEvent // channel for peer event changes
sub event.Subscription // subscription to peerEventCh
wg sync.WaitGroup // wg for graceful shutdown
shutdownCh chan struct{}
@ -73,85 +71,62 @@ type getPeersFunc func() []*p2p.Peer
// Returns true while syncing, false when synced.
type getSyncingFunc func() bool
type dropperConfig struct {
maxDialPeers int // maximum number of dialed peers
maxInboundPeers int // maximum number of inbound peers
clock mclock.Clock
}
func (cfg dropperConfig) withDefaults() dropperConfig {
if cfg.clock == nil {
cfg.clock = mclock.System{}
}
return cfg
}
func newDropper(config *dropperConfig) *dropper {
cfg := config.withDefaults()
func newDropper(maxDialPeers, maxInboundPeers int) *dropper {
cm := &dropper{
cfg: cfg,
peerDropDialedTimer: mclock.NewAlarm(cfg.clock),
peerDropInboundTimer: mclock.NewAlarm(cfg.clock),
peerEventCh: make(chan *p2p.PeerEvent),
shutdownCh: make(chan struct{}),
maxDialPeers: maxDialPeers,
maxInboundPeers: maxInboundPeers,
peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)),
peerEventCh: make(chan *p2p.PeerEvent),
shutdownCh: make(chan struct{}),
}
if peerDropIntervalMin > peerDropIntervalMax {
panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration")
}
log.Info("New Dropper", "maxDialPeers", cm.cfg.maxDialPeers, "threshold", peerDropThreshold,
log.Info("New Dropper", "maxDialPeers", cm.maxDialPeers, "threshold", peerDropThreshold,
"intervalMin", peerDropIntervalMin, "intervalMax", peerDropIntervalMax)
return cm
}
// Start the dropper.
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
cm.wg.Add(1)
cm.peersFunc = srv.Peers
cm.syncingFunc = syncingFunc
cm.sub = srv.SubscribeEvents(cm.peerEventCh)
cm.wg.Add(1)
go cm.loop()
}
// Stop the dropper.
func (cm *dropper) Stop() {
cm.sub.Unsubscribe()
cm.peerDropInboundTimer.Stop()
cm.peerDropDialedTimer.Stop()
cm.peerDropTimer.Stop()
close(cm.shutdownCh)
cm.wg.Wait()
}
// numPeers returns the current number of peers and its breakdown (dialed or inbound).
func (cm *dropper) numPeers() (numPeers int, numDialed int, numInbound int) {
peers := cm.peersFunc()
dialed := slices.DeleteFunc(peers, (*p2p.Peer).Inbound)
return len(peers), len(dialed), len(peers) - len(dialed)
}
// dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
func (cm *dropper) dropRandomPeer(dialed bool) bool {
func (cm *dropper) dropRandomPeer() bool {
peers := cm.peersFunc()
selectDoNotDrop := func(p *p2p.Peer) bool {
if dialed {
// Only drop from dyndialed peers. Avoid dropping trusted peers.
// Give some time to peers before considering them for a drop.
return !p.DynDialed() ||
p.Trusted() ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore)
} else {
// Only drop from inbound peers. Avoid dropping trusted peers.
// Give some time to peers before considering them for a drop.
return p.DynDialed() || p.StaticDialed() ||
p.Trusted() ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore)
var numInbound int
for _, p := range peers {
if p.Inbound() {
numInbound++
}
}
numDialed := len(peers) - numInbound
selectDoNotDrop := func(p *p2p.Peer) bool {
// Avoid dropping trusted and static peers, or recent peers.
// Only drop peers if their respective category (dialed/inbound)
// is close to limit capacity.
return p.Trusted() || p.StaticDialed() ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore) ||
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) ||
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold)
}
droppable := slices.DeleteFunc(peers, selectDoNotDrop)
if len(droppable) > 0 {
p := droppable[mrand.Intn(len(droppable))]
log.Debug("dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()),
"dialed", dialed, "peercountbefore", len(peers))
log.Debug("Dropping random peer", "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "inbound", p.Inbound(), "peercountbefore", len(peers))
p.Disconnect(p2p.DiscTooManyPeers)
return true
}
@ -166,35 +141,6 @@ func randomDuration(min, max time.Duration) time.Duration {
return time.Duration(mrand.Int63n(int64(max-min)) + int64(min))
}
// updatePeerDropTimers checks and starts/stops the timer for peer drop.
func (cm *dropper) updatePeerDropTimers(syncing bool) {
numPeers, numDialed, numInbound := cm.numPeers()
log.Trace("Dropper status", "syncing", syncing,
"peers", numPeers, "out", numDialed, "in", numInbound,
"maxout", cm.cfg.maxDialPeers, "maxin", cm.cfg.maxInboundPeers)
if !syncing {
// If a drop was already scheduled, Schedule does nothing.
if cm.cfg.maxDialPeers-numDialed <= peerDropThreshold {
interval := randomDuration(peerDropIntervalMin, peerDropIntervalMax)
cm.peerDropDialedTimer.Schedule(cm.cfg.clock.Now().Add(interval))
} else {
cm.peerDropDialedTimer.Stop()
}
if cm.cfg.maxInboundPeers-numInbound <= peerDropThreshold {
interval := randomDuration(peerDropIntervalMin, peerDropIntervalMax)
cm.peerDropInboundTimer.Schedule(cm.cfg.clock.Now().Add(interval))
} else {
cm.peerDropInboundTimer.Stop()
}
} else {
// Downloader is managing connections while syncing.
cm.peerDropDialedTimer.Stop()
cm.peerDropInboundTimer.Stop()
}
}
// loop is the main loop of the connection dropper.
func (cm *dropper) loop() {
defer cm.wg.Done()
@ -205,32 +151,24 @@ func (cm *dropper) loop() {
// - subscribe to Downloader.mux
// - subscribe to DownloaderAPI (which itself polls the sync status)
syncing := cm.syncingFunc()
log.Trace("Sync status", "syncing", syncing)
syncCheckTimer := mclock.NewAlarm(cm.cfg.clock)
syncCheckTimer.Schedule(cm.cfg.clock.Now().Add(syncCheckInterval))
syncCheckTimer := time.NewTimer(syncCheckInterval)
defer syncCheckTimer.Stop()
for {
select {
case <-syncCheckTimer.C():
case <-syncCheckTimer.C:
// Update info about syncing status, and rearm the timers.
syncingNew := cm.syncingFunc()
if syncing != syncingNew {
// Syncing status changed, we might need to update the timers.
log.Trace("Sync status changed", "syncing", syncingNew)
syncing = syncingNew
cm.updatePeerDropTimers(syncing)
}
syncCheckTimer.Schedule(cm.cfg.clock.Now().Add(syncCheckInterval))
case ev := <-cm.peerEventCh:
if ev.Type == p2p.PeerEventTypeAdd || ev.Type == p2p.PeerEventTypeDrop {
// Number of peers changed, we might need to start the timers.
cm.updatePeerDropTimers(syncing)
syncCheckTimer.Reset(syncCheckInterval)
case <-cm.peerDropTimer.C:
if !syncing {
cm.dropRandomPeer()
}
case <-cm.peerDropDialedTimer.C():
cm.dropRandomPeer(true)
case <-cm.peerDropInboundTimer.C():
cm.dropRandomPeer(false)
cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax))
case <-cm.shutdownCh:
return
}