From 5a918be50d8917196158514e8f688459e526dc10 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 10 Apr 2026 08:23:30 +0200 Subject: [PATCH] eth: protect high-value peers from random dropping based on inclusion stats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dropper periodically disconnects random peers to create churn. This was blind to peer quality. Add inclusion-based peer protection using two categories: 1. Total inclusions: protects peers with the highest cumulative count of delivered txs that were included on chain 2. Recent inclusions (EMA): protects peers with the best recent inclusion rate, giving newly productive peers faster protection Each category independently protects the top 10% of inbound and top 10% of dialed peers. The union of both sets is protected. Only peers with positive scores qualify. The dropper defines its own PeerInclusionStats struct and callback type (getPeerInclusionStatsFunc) so any stats provider (e.g. a transaction tracker) can plug in without a package dependency. The callback is nil by default (protection disabled until wired). The protectionCategories slice is designed for easy extension — adding a new category requires only appending a struct with a name, scoring function, and protection fraction. --- eth/backend.go | 2 +- eth/dropper.go | 136 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 123 insertions(+), 15 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index e9bea59734..98aab2de00 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -460,7 +460,7 @@ func (s *Ethereum) Start() error { s.handler.Start(s.p2pServer.MaxPeers) // Start the connection manager - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, nil) // start log indexer s.filterMaps.Start() diff --git a/eth/dropper.go b/eth/dropper.go index dada5d07c0..31da9301d8 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -19,6 +19,7 @@ package eth import ( mrand "math/rand" "slices" + "sort" "sync" "time" @@ -40,6 +41,10 @@ const ( // dropping when no more peers can be added. Larger numbers result in more // aggressive drop behavior. peerDropThreshold = 0 + // Fraction of inbound/dialed peers to protect based on inclusion stats. + // The top inclusionProtectionFrac of each category (by score) are + // shielded from random dropping. 0.1 = top 10%. + inclusionProtectionFrac = 0.1 ) var ( @@ -47,8 +52,37 @@ var ( droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil) // droppedOutbound is the number of outbound peers dropped droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil) + // droppedProtected counts times a drop was skipped because all + // droppable candidates were protected by inclusion stats. + droppedProtected = metrics.NewRegisteredMeter("eth/dropper/protected", nil) ) +// PeerInclusionStats holds the per-peer inclusion data needed by the dropper +// to decide which peers to protect. Any stats provider (e.g. txtracker) can +// implement getPeerInclusionStatsFunc by returning this struct per peer ID. +type PeerInclusionStats struct { + Included int64 // Cumulative on-chain inclusions attributed to this peer + RecentIncluded float64 // EMA of per-block inclusions (0 if not tracked) +} + +// Callback type to get per-peer inclusion statistics. +type getPeerInclusionStatsFunc func() map[string]PeerInclusionStats + +// protectionCategory defines a peer scoring function and the fraction of peers +// to protect per inbound/dialed category. Multiple categories are unioned. +type protectionCategory struct { + name string + score func(PeerInclusionStats) float64 + frac float64 // fraction of max peers to protect (0.0–1.0) +} + +// protectionCategories is the list of protection criteria. Each category +// independently selects its top-N peers per pool; the union is protected. +var protectionCategories = []protectionCategory{ + {"total-included", func(s PeerInclusionStats) float64 { return float64(s.Included) }, inclusionProtectionFrac}, + {"recent-included", func(s PeerInclusionStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, +} + // dropper monitors the state of the peer pool and makes changes as follows: // - during sync the Downloader handles peer connections, so dropper is disabled // - if not syncing and the peer count is close to the limit, it drops peers @@ -59,6 +93,7 @@ type dropper struct { maxInboundPeers int // maximum number of inbound peers peersFunc getPeersFunc syncingFunc getSyncingFunc + peerStatsFunc getPeerInclusionStatsFunc // optional: inclusion stats for protection // peerDropTimer introduces churn if we are close to limit capacity. // We handle Dialed and Inbound connections separately @@ -88,10 +123,12 @@ func newDropper(maxDialPeers, maxInboundPeers int) *dropper { return cm } -// Start the dropper. -func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { +// Start the dropper. peerStatsFunc is optional (nil disables inclusion +// protection). +func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerInclusionStatsFunc) { cm.peersFunc = srv.Peers cm.syncingFunc = syncingFunc + cm.peerStatsFunc = peerStatsFunc cm.wg.Add(1) go cm.loop() } @@ -125,19 +162,90 @@ func (cm *dropper) dropRandomPeer() bool { } droppable := slices.DeleteFunc(peers, selectDoNotDrop) - if len(droppable) > 0 { - p := droppable[mrand.Intn(len(droppable))] - log.Debug("Dropping random peer", "inbound", p.Inbound(), - "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) - p.Disconnect(p2p.DiscUselessPeer) - if p.Inbound() { - droppedInbound.Mark(1) - } else { - droppedOutbound.Mark(1) - } - return true + if len(droppable) == 0 { + return false } - return false + // Protect peers with the highest inclusion stats. + if cm.peerStatsFunc != nil { + droppable = cm.filterProtectedPeers(droppable) + if len(droppable) == 0 { + droppedProtected.Mark(1) + return false + } + } + p := droppable[mrand.Intn(len(droppable))] + log.Debug("Dropping random peer", "inbound", p.Inbound(), + "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + p.Disconnect(p2p.DiscUselessPeer) + if p.Inbound() { + droppedInbound.Mark(1) + } else { + droppedOutbound.Mark(1) + } + return true +} + +// filterProtectedPeers removes peers from the droppable list that are +// protected by any of the protection categories. Each category independently +// selects the top-N peers per inbound/dialed pool by score; the union of all +// selections is protected. +func (cm *dropper) filterProtectedPeers(droppable []*p2p.Peer) []*p2p.Peer { + stats := cm.peerStatsFunc() + if len(stats) == 0 { + return droppable + } + type peerWithStats struct { + peer *p2p.Peer + s PeerInclusionStats + } + var inbound, dialed []peerWithStats + for _, p := range droppable { + id := p.ID().String() + entry := peerWithStats{p, stats[id]} + if p.Inbound() { + inbound = append(inbound, entry) + } else { + dialed = append(dialed, entry) + } + } + protectedSet := make(map[*p2p.Peer]struct{}) + + protectTopN := func(entries []peerWithStats, maxPeers int, cat protectionCategory) { + n := int(float64(maxPeers) * cat.frac) + if n == 0 || len(entries) == 0 { + return + } + sort.Slice(entries, func(i, j int) bool { + return cat.score(entries[i].s) > cat.score(entries[j].s) + }) + for i := 0; i < n && i < len(entries); i++ { + if cat.score(entries[i].s) > 0 { + protectedSet[entries[i].peer] = struct{}{} + } + } + } + for _, cat := range protectionCategories { + inCopy := make([]peerWithStats, len(inbound)) + copy(inCopy, inbound) + dialCopy := make([]peerWithStats, len(dialed)) + copy(dialCopy, dialed) + + protectTopN(inCopy, cm.maxInboundPeers, cat) + protectTopN(dialCopy, cm.maxDialPeers, cat) + } + if len(protectedSet) == 0 { + return droppable + } + log.Debug("Protecting high-value peers from drop", + "protected", len(protectedSet), "droppable", len(droppable)) + + result := make([]*p2p.Peer, 0, len(droppable)) + for _, p := range droppable { + if _, ok := protectedSet[p]; !ok { + result = append(result, p) + } + } + return result } // randomDuration generates a random duration between min and max.