diff --git a/eth/backend.go b/eth/backend.go index e9bea59734..98aab2de00 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -460,7 +460,7 @@ func (s *Ethereum) Start() error { s.handler.Start(s.p2pServer.MaxPeers) // Start the connection manager - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, nil) // start log indexer s.filterMaps.Start() diff --git a/eth/dropper.go b/eth/dropper.go index dada5d07c0..31da9301d8 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -19,6 +19,7 @@ package eth import ( mrand "math/rand" "slices" + "sort" "sync" "time" @@ -40,6 +41,10 @@ const ( // dropping when no more peers can be added. Larger numbers result in more // aggressive drop behavior. peerDropThreshold = 0 + // Fraction of inbound/dialed peers to protect based on inclusion stats. + // The top inclusionProtectionFrac of each category (by score) are + // shielded from random dropping. 0.1 = top 10%. + inclusionProtectionFrac = 0.1 ) var ( @@ -47,8 +52,37 @@ var ( droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil) // droppedOutbound is the number of outbound peers dropped droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil) + // droppedProtected counts times a drop was skipped because all + // droppable candidates were protected by inclusion stats. + droppedProtected = metrics.NewRegisteredMeter("eth/dropper/protected", nil) ) +// PeerInclusionStats holds the per-peer inclusion data needed by the dropper +// to decide which peers to protect. Any stats provider (e.g. txtracker) can +// implement getPeerInclusionStatsFunc by returning this struct per peer ID. +type PeerInclusionStats struct { + Included int64 // Cumulative on-chain inclusions attributed to this peer + RecentIncluded float64 // EMA of per-block inclusions (0 if not tracked) +} + +// Callback type to get per-peer inclusion statistics. +type getPeerInclusionStatsFunc func() map[string]PeerInclusionStats + +// protectionCategory defines a peer scoring function and the fraction of peers +// to protect per inbound/dialed category. Multiple categories are unioned. +type protectionCategory struct { + name string + score func(PeerInclusionStats) float64 + frac float64 // fraction of max peers to protect (0.0–1.0) +} + +// protectionCategories is the list of protection criteria. Each category +// independently selects its top-N peers per pool; the union is protected. +var protectionCategories = []protectionCategory{ + {"total-included", func(s PeerInclusionStats) float64 { return float64(s.Included) }, inclusionProtectionFrac}, + {"recent-included", func(s PeerInclusionStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, +} + // dropper monitors the state of the peer pool and makes changes as follows: // - during sync the Downloader handles peer connections, so dropper is disabled // - if not syncing and the peer count is close to the limit, it drops peers @@ -59,6 +93,7 @@ type dropper struct { maxInboundPeers int // maximum number of inbound peers peersFunc getPeersFunc syncingFunc getSyncingFunc + peerStatsFunc getPeerInclusionStatsFunc // optional: inclusion stats for protection // peerDropTimer introduces churn if we are close to limit capacity. // We handle Dialed and Inbound connections separately @@ -88,10 +123,12 @@ func newDropper(maxDialPeers, maxInboundPeers int) *dropper { return cm } -// Start the dropper. -func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { +// Start the dropper. peerStatsFunc is optional (nil disables inclusion +// protection). +func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerInclusionStatsFunc) { cm.peersFunc = srv.Peers cm.syncingFunc = syncingFunc + cm.peerStatsFunc = peerStatsFunc cm.wg.Add(1) go cm.loop() } @@ -125,19 +162,90 @@ func (cm *dropper) dropRandomPeer() bool { } droppable := slices.DeleteFunc(peers, selectDoNotDrop) - if len(droppable) > 0 { - p := droppable[mrand.Intn(len(droppable))] - log.Debug("Dropping random peer", "inbound", p.Inbound(), - "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) - p.Disconnect(p2p.DiscUselessPeer) - if p.Inbound() { - droppedInbound.Mark(1) - } else { - droppedOutbound.Mark(1) - } - return true + if len(droppable) == 0 { + return false } - return false + // Protect peers with the highest inclusion stats. + if cm.peerStatsFunc != nil { + droppable = cm.filterProtectedPeers(droppable) + if len(droppable) == 0 { + droppedProtected.Mark(1) + return false + } + } + p := droppable[mrand.Intn(len(droppable))] + log.Debug("Dropping random peer", "inbound", p.Inbound(), + "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + p.Disconnect(p2p.DiscUselessPeer) + if p.Inbound() { + droppedInbound.Mark(1) + } else { + droppedOutbound.Mark(1) + } + return true +} + +// filterProtectedPeers removes peers from the droppable list that are +// protected by any of the protection categories. Each category independently +// selects the top-N peers per inbound/dialed pool by score; the union of all +// selections is protected. +func (cm *dropper) filterProtectedPeers(droppable []*p2p.Peer) []*p2p.Peer { + stats := cm.peerStatsFunc() + if len(stats) == 0 { + return droppable + } + type peerWithStats struct { + peer *p2p.Peer + s PeerInclusionStats + } + var inbound, dialed []peerWithStats + for _, p := range droppable { + id := p.ID().String() + entry := peerWithStats{p, stats[id]} + if p.Inbound() { + inbound = append(inbound, entry) + } else { + dialed = append(dialed, entry) + } + } + protectedSet := make(map[*p2p.Peer]struct{}) + + protectTopN := func(entries []peerWithStats, maxPeers int, cat protectionCategory) { + n := int(float64(maxPeers) * cat.frac) + if n == 0 || len(entries) == 0 { + return + } + sort.Slice(entries, func(i, j int) bool { + return cat.score(entries[i].s) > cat.score(entries[j].s) + }) + for i := 0; i < n && i < len(entries); i++ { + if cat.score(entries[i].s) > 0 { + protectedSet[entries[i].peer] = struct{}{} + } + } + } + for _, cat := range protectionCategories { + inCopy := make([]peerWithStats, len(inbound)) + copy(inCopy, inbound) + dialCopy := make([]peerWithStats, len(dialed)) + copy(dialCopy, dialed) + + protectTopN(inCopy, cm.maxInboundPeers, cat) + protectTopN(dialCopy, cm.maxDialPeers, cat) + } + if len(protectedSet) == 0 { + return droppable + } + log.Debug("Protecting high-value peers from drop", + "protected", len(protectedSet), "droppable", len(droppable)) + + result := make([]*p2p.Peer, 0, len(droppable)) + for _, p := range droppable { + if _, ok := protectedSet[p]; !ok { + result = append(result, p) + } + } + return result } // randomDuration generates a random duration between min and max.