diff --git a/eth/backend.go b/eth/backend.go index d5ec34e21d..551506f6f3 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -463,14 +463,7 @@ func (s *Ethereum) Start() error { s.handler.txTracker.Start(s.blockchain) // Start the connection manager with inclusion-based peer protection. - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, func() map[string]PeerInclusionStats { - stats := s.handler.txTracker.GetAllPeerStats() - result := make(map[string]PeerInclusionStats, len(stats)) - for id, ps := range stats { - result[id] = PeerInclusionStats{Finalized: ps.Finalized, RecentIncluded: ps.RecentIncluded} - } - return result - }) + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, s.handler.txTracker.GetAllPeerStats) // start log indexer s.filterMaps.Start() diff --git a/eth/dropper.go b/eth/dropper.go index d3eb718e6a..591e3c4277 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -58,30 +59,22 @@ var ( dropSkipped = metrics.NewRegisteredMeter("eth/dropper/skipped", nil) ) -// PeerInclusionStats holds the per-peer inclusion data needed by the dropper -// to decide which peers to protect. Any stats provider (e.g. txtracker) can -// implement getPeerInclusionStatsFunc by returning this struct per peer ID. -type PeerInclusionStats struct { - Finalized int64 // Cumulative finalized inclusions attributed to this peer - RecentIncluded float64 // EMA of per-block inclusions (0 if not tracked) -} - // Callback type to get per-peer inclusion statistics. -type getPeerInclusionStatsFunc func() map[string]PeerInclusionStats +type getPeerStatsFunc func() map[string]txtracker.PeerStats // protectionCategory defines a peer scoring function and the fraction of peers // to protect per inbound/dialed category. Multiple categories are unioned. type protectionCategory struct { name string - score func(PeerInclusionStats) float64 + score func(txtracker.PeerStats) float64 frac float64 // fraction of max peers to protect (0.0–1.0) } // protectionCategories is the list of protection criteria. Each category // independently selects its top-N peers per pool; the union is protected. var protectionCategories = []protectionCategory{ - {"total-finalized", func(s PeerInclusionStats) float64 { return float64(s.Finalized) }, inclusionProtectionFrac}, - {"recent-included", func(s PeerInclusionStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, + {"total-finalized", func(s txtracker.PeerStats) float64 { return float64(s.Finalized) }, inclusionProtectionFrac}, + {"recent-included", func(s txtracker.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, } // dropper monitors the state of the peer pool and introduces churn by @@ -107,7 +100,7 @@ type dropper struct { maxInboundPeers int // maximum number of inbound peers peersFunc getPeersFunc syncingFunc getSyncingFunc - peerStatsFunc getPeerInclusionStatsFunc // optional: inclusion stats for protection + peerStatsFunc getPeerStatsFunc // optional: inclusion stats for protection // peerDropTimer introduces churn if we are close to limit capacity. // We handle Dialed and Inbound connections separately @@ -139,7 +132,7 @@ func newDropper(maxDialPeers, maxInboundPeers int) *dropper { // Start the dropper. peerStatsFunc is optional (nil disables inclusion // protection). -func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerInclusionStatsFunc) { +func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerStatsFunc) { cm.peersFunc = srv.Peers cm.syncingFunc = syncingFunc cm.peerStatsFunc = peerStatsFunc @@ -234,7 +227,7 @@ func (cm *dropper) protectedPeers(peers []*p2p.Peer) map[*p2p.Peer]bool { // Factored from protectedPeers so tests can exercise the per-pool // selection logic without needing to construct direction-flagged // *p2p.Peer instances (which require unexported p2p types). -func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]PeerInclusionStats) map[*p2p.Peer]bool { +func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]txtracker.PeerStats) map[*p2p.Peer]bool { result := make(map[*p2p.Peer]bool) // protectPool selects the top-frac peers from pool by score and adds them to result. protectPool := func(pool []*p2p.Peer, score func(*p2p.Peer) float64, frac float64) { diff --git a/eth/dropper_test.go b/eth/dropper_test.go index ea764c9ab3..a85eb0011b 100644 --- a/eth/dropper_test.go +++ b/eth/dropper_test.go @@ -20,6 +20,7 @@ import ( "fmt" "testing" + "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -35,7 +36,7 @@ func makePeers(n int) []*p2p.Peer { func TestProtectedPeersNoStats(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} - cm.peerStatsFunc = func() map[string]PeerInclusionStats { return nil } + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return nil } peers := makePeers(10) protected := cm.protectedPeers(peers) @@ -46,8 +47,8 @@ func TestProtectedPeersNoStats(t *testing.T) { func TestProtectedPeersEmptyStats(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} - cm.peerStatsFunc = func() map[string]PeerInclusionStats { - return map[string]PeerInclusionStats{} + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { + return map[string]txtracker.PeerStats{} } peers := makePeers(10) @@ -62,11 +63,11 @@ func TestProtectedPeersTopPeer(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} peers := makePeers(20) - stats := make(map[string]PeerInclusionStats) - stats[peers[0].ID().String()] = PeerInclusionStats{Finalized: 100} - stats[peers[1].ID().String()] = PeerInclusionStats{RecentIncluded: 5.0} + stats := make(map[string]txtracker.PeerStats) + stats[peers[0].ID().String()] = txtracker.PeerStats{Finalized: 100} + stats[peers[1].ID().String()] = txtracker.PeerStats{RecentIncluded: 5.0} - cm.peerStatsFunc = func() map[string]PeerInclusionStats { return stats } + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } protected := cm.protectedPeers(peers) if len(protected) != 2 { @@ -84,11 +85,11 @@ func TestProtectedPeersZeroScore(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} peers := makePeers(10) - stats := make(map[string]PeerInclusionStats) + stats := make(map[string]txtracker.PeerStats) for _, p := range peers { - stats[p.ID().String()] = PeerInclusionStats{} + stats[p.ID().String()] = txtracker.PeerStats{} } - cm.peerStatsFunc = func() map[string]PeerInclusionStats { return stats } + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } protected := cm.protectedPeers(peers) if len(protected) != 0 { @@ -101,10 +102,10 @@ func TestProtectedPeersOverlap(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} peers := makePeers(20) - stats := make(map[string]PeerInclusionStats) - stats[peers[0].ID().String()] = PeerInclusionStats{Finalized: 100, RecentIncluded: 5.0} + stats := make(map[string]txtracker.PeerStats) + stats[peers[0].ID().String()] = txtracker.PeerStats{Finalized: 100, RecentIncluded: 5.0} - cm.peerStatsFunc = func() map[string]PeerInclusionStats { return stats } + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } protected := cm.protectedPeers(peers) if len(protected) != 1 { @@ -137,12 +138,12 @@ func TestProtectedByPoolPerPoolTopN(t *testing.T) { dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) } // Strictly increasing scores: highest wins in each pool. - stats := make(map[string]PeerInclusionStats) + stats := make(map[string]txtracker.PeerStats) for i, p := range inbound { - stats[p.ID().String()] = PeerInclusionStats{Finalized: int64(1 + i)} + stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)} } for i, p := range dialed { - stats[p.ID().String()] = PeerInclusionStats{Finalized: int64(1 + i)} + stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)} } protected := protectedPeersByPool(inbound, dialed, stats) @@ -172,10 +173,10 @@ func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) { // Finalized winners: P2 (tie-broken-ok), P0 // RecentIncluded winners: P2, P1 // Union: {P0, P1, P2}. - stats := make(map[string]PeerInclusionStats) - stats[dialed[0].ID().String()] = PeerInclusionStats{Finalized: 100, RecentIncluded: 0} - stats[dialed[1].ID().String()] = PeerInclusionStats{Finalized: 0, RecentIncluded: 5.0} - stats[dialed[2].ID().String()] = PeerInclusionStats{Finalized: 200, RecentIncluded: 10.0} + stats := make(map[string]txtracker.PeerStats) + stats[dialed[0].ID().String()] = txtracker.PeerStats{Finalized: 100, RecentIncluded: 0} + stats[dialed[1].ID().String()] = txtracker.PeerStats{Finalized: 0, RecentIncluded: 5.0} + stats[dialed[2].ID().String()] = txtracker.PeerStats{Finalized: 200, RecentIncluded: 10.0} protected := protectedPeersByPool(nil, dialed, stats) @@ -202,13 +203,13 @@ func TestProtectedByPoolPerPoolIndependence(t *testing.T) { id := enode.ID{byte(100 + i)} dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) } - stats := make(map[string]PeerInclusionStats) + stats := make(map[string]txtracker.PeerStats) // Every inbound peer outscores every dialed peer. for i, p := range inbound { - stats[p.ID().String()] = PeerInclusionStats{Finalized: int64(1000 + i)} + stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1000 + i)} } for i, p := range dialed { - stats[p.ID().String()] = PeerInclusionStats{Finalized: int64(1 + i)} + stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)} } protected := protectedPeersByPool(inbound, dialed, stats)