diff --git a/eth/backend.go b/eth/backend.go index 551506f6f3..1ca0989565 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -459,11 +459,13 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) - // Start the transaction tracker (records tx deliveries, credits peer inclusions). - s.handler.txTracker.Start(s.blockchain) + // Start the transaction tracker; it emits per-block inclusion and + // finalization signals to peerStats, which the dropper queries for + // protection decisions. + s.handler.txTracker.Start(s.blockchain, s.handler.peerStats) // Start the connection manager with inclusion-based peer protection. - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, s.handler.txTracker.GetAllPeerStats) + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, s.handler.peerStats.GetAllPeerStats) // start log indexer s.filterMaps.Start() diff --git a/eth/dropper.go b/eth/dropper.go index 7b04656b09..0af4b97047 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -25,7 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" - "github.com/ethereum/go-ethereum/eth/txtracker" + "github.com/ethereum/go-ethereum/eth/peerstats" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -60,29 +60,29 @@ var ( ) // Callback type to get per-peer inclusion statistics. -type getPeerStatsFunc func() map[string]txtracker.PeerStats +type getPeerStatsFunc func() map[string]peerstats.PeerStats // protectionCategory defines a peer scoring function and the fraction of peers // to protect per inbound/dialed category. Multiple categories are unioned. type protectionCategory struct { name string - score func(txtracker.PeerStats) float64 + score func(peerstats.PeerStats) float64 frac float64 // fraction of max peers to protect (0.0–1.0) } // protectionCategories is the list of protection criteria. Each category // independently selects its top-N peers per pool; the union is protected. var protectionCategories = []protectionCategory{ - {"recent-finalized", func(s txtracker.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac}, - {"recent-included", func(s txtracker.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, - {"request-latency", func(s txtracker.PeerStats) float64 { + {"recent-finalized", func(s peerstats.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac}, + {"recent-included", func(s peerstats.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, + {"request-latency", func(s peerstats.PeerStats) float64 { // Low-latency peers should rank higher. Peers with too few samples // score 0 so the existing `score <= 0` filter excludes them — this // prevents a single lucky-fast reply from winning protection. Peers // whose EMA reaches the timeout also score 0 by this path because // the reciprocal of a very large duration is tiny but positive; the // per-pool top-N will still push faster peers ahead of them. - if s.RequestSamples < txtracker.MinLatencySamples { + if s.RequestSamples < peerstats.MinLatencySamples { return 0 } if s.RequestLatencyEMA <= 0 { @@ -244,7 +244,7 @@ func (cm *dropper) protectedPeers(peers []*p2p.Peer) map[*p2p.Peer]bool { // Factored from protectedPeers so tests can exercise the per-pool // selection logic without needing to construct direction-flagged // *p2p.Peer instances (which require unexported p2p types). -func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]txtracker.PeerStats) map[*p2p.Peer]bool { +func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]peerstats.PeerStats) map[*p2p.Peer]bool { result := make(map[*p2p.Peer]bool) // protectPool selects the top-frac peers from pool by score and adds them to result. protectPool := func(pool []*p2p.Peer, score func(*p2p.Peer) float64, frac float64) { diff --git a/eth/dropper_test.go b/eth/dropper_test.go index cd414925ce..e2ed638322 100644 --- a/eth/dropper_test.go +++ b/eth/dropper_test.go @@ -21,7 +21,7 @@ import ( "testing" "time" - "github.com/ethereum/go-ethereum/eth/txtracker" + "github.com/ethereum/go-ethereum/eth/peerstats" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" ) @@ -37,7 +37,7 @@ func makePeers(n int) []*p2p.Peer { func TestProtectedPeersNoStats(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} - cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return nil } + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return nil } peers := makePeers(10) protected := cm.protectedPeers(peers) @@ -48,8 +48,8 @@ func TestProtectedPeersNoStats(t *testing.T) { func TestProtectedPeersEmptyStats(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} - cm.peerStatsFunc = func() map[string]txtracker.PeerStats { - return map[string]txtracker.PeerStats{} + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { + return map[string]peerstats.PeerStats{} } peers := makePeers(10) @@ -64,11 +64,11 @@ func TestProtectedPeersTopPeer(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} peers := makePeers(20) - stats := make(map[string]txtracker.PeerStats) - stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100} - stats[peers[1].ID().String()] = txtracker.PeerStats{RecentIncluded: 5.0} + stats := make(map[string]peerstats.PeerStats) + stats[peers[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100} + stats[peers[1].ID().String()] = peerstats.PeerStats{RecentIncluded: 5.0} - cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats } protected := cm.protectedPeers(peers) if len(protected) != 2 { @@ -86,11 +86,11 @@ func TestProtectedPeersZeroScore(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} peers := makePeers(10) - stats := make(map[string]txtracker.PeerStats) + stats := make(map[string]peerstats.PeerStats) for _, p := range peers { - stats[p.ID().String()] = txtracker.PeerStats{} + stats[p.ID().String()] = peerstats.PeerStats{} } - cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats } protected := cm.protectedPeers(peers) if len(protected) != 0 { @@ -103,10 +103,10 @@ func TestProtectedPeersOverlap(t *testing.T) { cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} peers := makePeers(20) - stats := make(map[string]txtracker.PeerStats) - stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0} + stats := make(map[string]peerstats.PeerStats) + stats[peers[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0} - cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats } protected := cm.protectedPeers(peers) if len(protected) != 1 { @@ -139,12 +139,12 @@ func TestProtectedByPoolPerPoolTopN(t *testing.T) { dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) } // Strictly increasing scores: highest wins in each pool. - stats := make(map[string]txtracker.PeerStats) + stats := make(map[string]peerstats.PeerStats) for i, p := range inbound { - stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)} } for i, p := range dialed { - stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)} } protected := protectedPeersByPool(inbound, dialed, stats) @@ -174,10 +174,10 @@ func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) { // RecentFinalized winners: P2 (tie-broken-ok), P0 // RecentIncluded winners: P2, P1 // Union: {P0, P1, P2}. - stats := make(map[string]txtracker.PeerStats) - stats[dialed[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 0} - stats[dialed[1].ID().String()] = txtracker.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0} - stats[dialed[2].ID().String()] = txtracker.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0} + stats := make(map[string]peerstats.PeerStats) + stats[dialed[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100, RecentIncluded: 0} + stats[dialed[1].ID().String()] = peerstats.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0} + stats[dialed[2].ID().String()] = peerstats.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0} protected := protectedPeersByPool(nil, dialed, stats) @@ -204,13 +204,13 @@ func TestProtectedByPoolPerPoolIndependence(t *testing.T) { id := enode.ID{byte(100 + i)} dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) } - stats := make(map[string]txtracker.PeerStats) + stats := make(map[string]peerstats.PeerStats) // Every inbound peer outscores every dialed peer. for i, p := range inbound { - stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1000 + i)} + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1000 + i)} } for i, p := range dialed { - stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)} } protected := protectedPeersByPool(inbound, dialed, stats) @@ -239,17 +239,17 @@ func TestProtectedByPoolPerPoolIndependence(t *testing.T) { // (among those with enough samples) win top-N protection. func TestProtectedByPoolRequestLatencyBasic(t *testing.T) { dialed := makePeers(20) // frac=0.1 → n=2 per category - stats := make(map[string]txtracker.PeerStats) + stats := make(map[string]peerstats.PeerStats) // Three peers have enough samples; the two fastest should win. - stats[dialed[0].ID().String()] = txtracker.PeerStats{ + stats[dialed[0].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 50 * time.Millisecond, RequestSamples: 50, } - stats[dialed[1].ID().String()] = txtracker.PeerStats{ + stats[dialed[1].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 100 * time.Millisecond, RequestSamples: 50, } - stats[dialed[2].ID().String()] = txtracker.PeerStats{ + stats[dialed[2].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 2 * time.Second, RequestSamples: 50, } @@ -275,16 +275,16 @@ func TestProtectedByPoolRequestLatencyBasic(t *testing.T) { // if their few samples indicate very low latency. func TestProtectedByPoolRequestLatencyBootstrapGuard(t *testing.T) { dialed := makePeers(20) - stats := make(map[string]txtracker.PeerStats) + stats := make(map[string]peerstats.PeerStats) // A lucky-fast peer with only 1 sample — must NOT be protected. - stats[dialed[0].ID().String()] = txtracker.PeerStats{ + stats[dialed[0].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 1 * time.Millisecond, RequestSamples: 1, } // A warmed-up but slower peer — should be protected on latency. - stats[dialed[1].ID().String()] = txtracker.PeerStats{ + stats[dialed[1].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 500 * time.Millisecond, - RequestSamples: txtracker.MinLatencySamples, + RequestSamples: peerstats.MinLatencySamples, } protected := protectedPeersByPool(nil, dialed, stats) @@ -308,10 +308,10 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) { id := enode.ID{byte(100 + i)} dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) } - stats := make(map[string]txtracker.PeerStats) + stats := make(map[string]peerstats.PeerStats) // All inbound peers are very fast (50ms). for _, p := range inbound { - stats[p.ID().String()] = txtracker.PeerStats{ + stats[p.ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 50 * time.Millisecond, RequestSamples: 50, } @@ -319,7 +319,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) { // Dialed peers are slower (1s) — globally they would all lose, but // per-pool top-N should still protect two of them. for _, p := range dialed { - stats[p.ID().String()] = txtracker.PeerStats{ + stats[p.ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 1 * time.Second, RequestSamples: 50, } diff --git a/eth/handler.go b/eth/handler.go index d7d02295f4..8916dad662 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/peerstats" "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -125,6 +126,7 @@ type handler struct { downloader *downloader.Downloader txFetcher *fetcher.TxFetcher txTracker *txtracker.Tracker + peerStats *peerstats.Stats peers *peerSet txBroadcastKey [16]byte @@ -191,7 +193,8 @@ func newHandler(config *handlerConfig) (*handler, error) { return nil } h.txTracker = txtracker.New() - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.txTracker.NotifyRequestLatency) + h.peerStats = peerstats.New() + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestLatency) return h, nil } @@ -406,7 +409,7 @@ func (h *handler) unregisterPeer(id string) { } h.downloader.UnregisterPeer(id) h.txFetcher.Drop(id) - h.txTracker.NotifyPeerDrop(id) + h.peerStats.NotifyPeerDrop(id) if err := h.peers.unregisterPeer(id); err != nil { logger.Error("Ethereum peer removal failed", "err", err) diff --git a/eth/peerstats/peerstats.go b/eth/peerstats/peerstats.go new file mode 100644 index 0000000000..567a27309a --- /dev/null +++ b/eth/peerstats/peerstats.go @@ -0,0 +1,172 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package peerstats maintains per-peer quality metrics used by the peer +// dropper to protect high-value peers from random disconnection. +// +// The package is a passive accumulator: it exposes entry points for its +// signal producers (txtracker for inclusion/finalization, the tx fetcher +// for latency, the handler for peer-drop cleanup) and a read-only +// snapshot for its consumer (the dropper). It has no goroutine of its +// own — all mutation is serialized by a single mutex. +// +// Signal sources: +// - NotifyBlock(inclusions, finalized) — per-block deltas from txtracker +// (computed under txtracker's own lock, then passed in after release) +// - NotifyRequestLatency(peer, latency) — per-request samples from the +// fetcher; timeouts are reported with the timeout value so slow peers +// contribute to the EMA +// - NotifyPeerDrop(peer) — called from the handler on disconnect +package peerstats + +import ( + "sync" + "time" +) + +const ( + // EMA smoothing factor for per-block inclusion rate. + emaAlpha = 0.05 + // EMA smoothing factor for per-block finalization rate. Very slow on + // purpose: finalization is permanent, and the score should reflect + // sustained contribution over long windows, not recent bursts. + // Half-life ≈ 6930 chain heads (~23 hours on 12s blocks). + finalizedEMAAlpha = 0.0001 + // EMA smoothing factor for per-request latency average. Slow on purpose: + // short bursts shouldn't shift the score, sustained behavior should. + // Half-life ≈ ln(0.5)/ln(0.99) ≈ 69 samples. + latencyEMAAlpha = 0.01 + // MinLatencySamples is the number of latency samples a peer must accumulate + // before its RequestLatencyEMA is considered meaningful for protection. + // Prevents a single lucky-fast reply from displacing established peers. + MinLatencySamples = 10 +) + +// PeerStats is the exported per-peer snapshot returned by GetAllPeerStats. +type PeerStats struct { + RecentFinalized float64 // EMA of per-block finalization credits (slow) + RecentIncluded float64 // EMA of per-block inclusions (fast) + RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value) + RequestSamples int64 // Number of latency samples seen (for bootstrap guard) +} + +// peerStats is the internal mutable state per peer. +type peerStats struct { + recentFinalized float64 + recentIncluded float64 + requestLatencyEMA time.Duration + requestSamples int64 +} + +// Stats is the per-peer quality aggregator. +type Stats struct { + mu sync.Mutex + peers map[string]*peerStats +} + +// New creates an empty Stats. +func New() *Stats { + return &Stats{peers: make(map[string]*peerStats)} +} + +// NotifyBlock ingests a per-block update. `inclusions` is the count of the head +// block's transactions attributed to each peer; peers with a positive +// count get a stats entry created if one doesn't exist (this is how +// peerstats learns about newly-active peers). Peers not in the map but +// already tracked have their EMA decay with a zero sample. +// +// `finalized` is per-peer credits accumulated since the last NotifyBlock; +// credits are only applied to peers already tracked — we don't resurrect +// dropped peers from historical finalization data. +// +// NotifyBlock must NOT be called while the caller holds any other lock that +// could be acquired by peerstats callers in reverse order. Current callers +// (txtracker.handleChainHead) release their lock before invoking NotifyBlock. +func (s *Stats) NotifyBlock(inclusions, finalized map[string]int) { + s.mu.Lock() + defer s.mu.Unlock() + + // Ensure a stats entry exists for any peer that just had an inclusion. + // This is the primary path by which peerstats learns about a peer's + // inclusion activity. + for peer, count := range inclusions { + if count > 0 && s.peers[peer] == nil { + s.peers[peer] = &peerStats{} + } + } + // Update inclusion and finalization EMAs for every tracked peer. A + // peer not present in the respective delta map gets a 0 contribution + // — pure decay. Finalization credits for peers no longer tracked are + // ignored (don't resurrect dropped peers from historical data). + for peer, ps := range s.peers { + ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(inclusions[peer]) + ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(finalized[peer]) + } +} + +// NotifyRequestLatency records a tx-request response latency sample for +// the given peer. Timeouts should be reported as the timeout value. +// Creates a peer entry if one doesn't exist (a peer may have latency +// samples before any inclusion signal). +func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + + ps := s.peers[peer] + if ps == nil { + ps = &peerStats{} + s.peers[peer] = ps + } + if ps.requestSamples == 0 { + // Bootstrap the EMA with the first sample so it doesn't drift up + // from zero over many samples before reaching realistic values. + ps.requestLatencyEMA = latency + } else { + ps.requestLatencyEMA = time.Duration( + float64(ps.requestLatencyEMA)*(1-latencyEMAAlpha) + + float64(latency)*latencyEMAAlpha, + ) + } + ps.requestSamples++ +} + +// NotifyPeerDrop removes a peer's stats on disconnect. A rare stale +// latency sample racing with the drop may recreate the peer entry with +// one sample; that entry can never earn protection (MinLatencySamples +// guard) and is harmless. +func (s *Stats) NotifyPeerDrop(peer string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.peers, peer) +} + +// GetAllPeerStats returns a snapshot of per-peer stats. Called by the +// dropper every few minutes; allocation cost is negligible at that rate. +func (s *Stats) GetAllPeerStats() map[string]PeerStats { + s.mu.Lock() + defer s.mu.Unlock() + + result := make(map[string]PeerStats, len(s.peers)) + for id, ps := range s.peers { + result[id] = PeerStats{ + RecentFinalized: ps.recentFinalized, + RecentIncluded: ps.recentIncluded, + RequestLatencyEMA: ps.requestLatencyEMA, + RequestSamples: ps.requestSamples, + } + } + return result +} diff --git a/eth/peerstats/peerstats_test.go b/eth/peerstats/peerstats_test.go new file mode 100644 index 0000000000..42d3bfa385 --- /dev/null +++ b/eth/peerstats/peerstats_test.go @@ -0,0 +1,223 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package peerstats + +import ( + "testing" + "time" +) + +// TestNotifyBlockBootstrapsFromInclusions verifies that a peer with a positive +// inclusion count in the first NotifyBlock gets a stats entry created. +func TestNotifyBlockBootstrapsFromInclusions(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 3}, nil) + + stats := s.GetAllPeerStats() + if len(stats) != 1 { + t.Fatalf("expected 1 peer entry, got %d", len(stats)) + } + ps, ok := stats["peerA"] + if !ok { + t.Fatal("expected peerA entry") + } + // EMA after first block: (1-0.05)*0 + 0.05*3 = 0.15 + if ps.RecentIncluded <= 0 { + t.Fatalf("expected RecentIncluded > 0 after inclusion, got %f", ps.RecentIncluded) + } +} + +// TestNotifyBlockDecaysKnownPeers verifies that peers already tracked get their +// RecentIncluded EMA decayed when they have no inclusions in a block. +func TestNotifyBlockDecaysKnownPeers(t *testing.T) { + s := New() + // Seed peerA with an inclusion. + s.NotifyBlock(map[string]int{"peerA": 3}, nil) + initial := s.GetAllPeerStats()["peerA"].RecentIncluded + + // Empty block — peerA should decay. + s.NotifyBlock(nil, nil) + after := s.GetAllPeerStats()["peerA"].RecentIncluded + + if after >= initial { + t.Fatalf("expected decay, got %f >= %f", after, initial) + } +} + +// TestNotifyBlockDoesNotResurrectDroppedPeers verifies that finalization +// credits to a peer with no entry don't create one. +func TestNotifyBlockDoesNotResurrectFromFinalization(t *testing.T) { + s := New() + s.NotifyBlock(nil, map[string]int{"peerA": 5}) + + if stats := s.GetAllPeerStats(); len(stats) != 0 { + t.Fatalf("finalization credits must not create entries, got %d peers", len(stats)) + } +} + +// TestNotifyBlockDropThenFinalizeNoResurrect verifies the full drop→finalize +// sequence: a dropped peer doesn't come back via finalization credits. +func TestNotifyBlockDropThenFinalizeNoResurrect(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 1}, nil) + s.NotifyPeerDrop("peerA") + s.NotifyBlock(nil, map[string]int{"peerA": 10}) + + if stats := s.GetAllPeerStats(); len(stats) != 0 { + t.Fatalf("dropped peer must not be resurrected, got %d peers", len(stats)) + } +} + +// TestNotifyBlockFinalizationCredits an existing peer. +func TestNotifyBlockFinalizationCredits(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 1}, nil) + s.NotifyBlock(nil, map[string]int{"peerA": 3}) + + // RecentFinalized is a slow EMA, not a cumulative count: assert it + // moved in the positive direction, not the exact value. + if got := s.GetAllPeerStats()["peerA"].RecentFinalized; got <= 0 { + t.Fatalf("expected RecentFinalized>0 after credits, got %f", got) + } +} + +// TestNotifyBlockInclusionEMAUpdate verifies the EMA formula (1-α)·old + α·count. +func TestNotifyBlockInclusionEMAUpdate(t *testing.T) { + s := New() + // Three inclusions: EMA = 0.05 * 3 = 0.15 + s.NotifyBlock(map[string]int{"peerA": 3}, nil) + got := s.GetAllPeerStats()["peerA"].RecentIncluded + want := 0.15 + if diff := got - want; diff < -1e-9 || diff > 1e-9 { + t.Fatalf("EMA after one sample: got %f, want %f", got, want) + } + // Next block with 10 inclusions: EMA = 0.95*0.15 + 0.05*10 = 0.6425 + s.NotifyBlock(map[string]int{"peerA": 10}, nil) + got = s.GetAllPeerStats()["peerA"].RecentIncluded + want = 0.6425 + if diff := got - want; diff < -1e-9 || diff > 1e-9 { + t.Fatalf("EMA after two samples: got %f, want %f", got, want) + } +} + +// TestNotifyRequestLatencyFirstSampleBootstrap asserts that the first +// latency sample seeds the EMA directly. +func TestNotifyRequestLatencyFirstSampleBootstrap(t *testing.T) { + s := New() + s.NotifyRequestLatency("peerA", 200*time.Millisecond) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestLatencyEMA != 200*time.Millisecond { + t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA) + } + if ps.RequestSamples != 1 { + t.Fatalf("expected RequestSamples=1, got %d", ps.RequestSamples) + } +} + +// TestNotifyRequestLatencyEMAUpdate verifies the EMA formula for latency. +func TestNotifyRequestLatencyEMAUpdate(t *testing.T) { + s := New() + s.NotifyRequestLatency("peerA", 100*time.Millisecond) + s.NotifyRequestLatency("peerA", 1000*time.Millisecond) + + // Expected: 0.99*100ms + 0.01*1000ms = 109ms + got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA + want := 109 * time.Millisecond + delta := got - want + if delta < 0 { + delta = -delta + } + if delta > 1*time.Microsecond { + t.Fatalf("EMA mismatch: got %v, want %v", got, want) + } + if samples := s.GetAllPeerStats()["peerA"].RequestSamples; samples != 2 { + t.Fatalf("expected RequestSamples=2, got %d", samples) + } +} + +// TestNotifyRequestLatencySlowConvergence verifies the slow alpha +// damps convergence under sustained timeouts. +func TestNotifyRequestLatencySlowConvergence(t *testing.T) { + s := New() + s.NotifyRequestLatency("peerA", 100*time.Millisecond) + for i := 0; i < 50; i++ { + s.NotifyRequestLatency("peerA", 5*time.Second) + } + got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA + if got < 1*time.Second { + t.Fatalf("EMA did not move enough under sustained timeouts, got %v", got) + } + if got > 3*time.Second { + t.Fatalf("EMA converged too fast for slow alpha=0.01, got %v", got) + } +} + +// TestNotifyPeerDropClearsStats verifies that a dropped peer disappears +// from GetAllPeerStats. +func TestNotifyPeerDropClearsStats(t *testing.T) { + s := New() + s.NotifyRequestLatency("peerA", 200*time.Millisecond) + s.NotifyPeerDrop("peerA") + + if _, ok := s.GetAllPeerStats()["peerA"]; ok { + t.Fatal("peerA stats should be removed after NotifyPeerDrop") + } +} + +// TestStaleRequestLatencyAfterDrop documents the accepted behavior: a +// late sample after NotifyPeerDrop recreates a 1-sample entry. The +// dropper's MinLatencySamples=10 guard ensures this is harmless. +func TestStaleRequestLatencyAfterDrop(t *testing.T) { + s := New() + s.NotifyRequestLatency("peerA", 200*time.Millisecond) + s.NotifyPeerDrop("peerA") + // Late sample racing with the drop. + s.NotifyRequestLatency("peerA", 50*time.Millisecond) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestSamples != 1 { + t.Fatalf("expected fresh RequestSamples=1, got %d", ps.RequestSamples) + } + if ps.RequestLatencyEMA != 50*time.Millisecond { + t.Fatalf("expected fresh bootstrap at 50ms, got %v", ps.RequestLatencyEMA) + } + // The dropper's MinLatencySamples guard (in eth/dropper.go) prevents + // this 1-sample entry from earning latency-based protection. +} + +// TestMultiplePeersIsolated verifies per-peer isolation across signal types. +func TestMultiplePeersIsolated(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 5, "peerB": 0}, nil) + s.NotifyRequestLatency("peerA", 100*time.Millisecond) + s.NotifyRequestLatency("peerB", 5*time.Second) + s.NotifyBlock(nil, map[string]int{"peerA": 2}) + + stats := s.GetAllPeerStats() + // Only peerA receives finalization credits; peerB's EMA stays at zero + // (no credits, pure decay from zero). + if stats["peerA"].RecentFinalized <= 0 || stats["peerB"].RecentFinalized != 0 { + t.Errorf("finalization leaked: A=%f B=%f", stats["peerA"].RecentFinalized, stats["peerB"].RecentFinalized) + } + if stats["peerA"].RequestLatencyEMA != 100*time.Millisecond { + t.Errorf("peerA latency: got %v, want 100ms", stats["peerA"].RequestLatencyEMA) + } + if stats["peerB"].RequestLatencyEMA != 5*time.Second { + t.Errorf("peerB latency: got %v, want 5s", stats["peerB"].RequestLatencyEMA) + } +} diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go index 8c678c6561..661dda32f6 100644 --- a/eth/txtracker/tracker.go +++ b/eth/txtracker/tracker.go @@ -14,21 +14,18 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package txtracker provides minimal per-peer transaction inclusion tracking. +// Package txtracker maps accepted transactions to their delivering peer +// and observes chain-head and finalization events to emit per-block +// per-peer signals to a StatsConsumer (typically eth/peerstats). // -// It records which peer delivered each accepted transaction (via NotifyAccepted) -// and monitors the chain for inclusion and finalization events. When a -// delivered transaction is finalized on chain, the delivering peer is -// credited. A per-block exponential moving average (EMA) of inclusions -// tracks recent peer productivity. -// -// The primary consumer is the peer dropper (eth/dropper.go), which uses -// these stats to protect high-value peers from random disconnection. +// The tracker owns the tx-hash → deliverer-peer map with FIFO eviction, +// a chain-head subscription goroutine, and the computation of per-block +// inclusion counts and finalization credits. It does NOT maintain +// per-peer aggregates — that is peerstats' job. package txtracker import ( "sync" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -40,31 +37,8 @@ import ( const ( // Maximum number of tx→deliverer mappings to retain. maxTracked = 262144 - // EMA smoothing factor for per-block inclusion rate. - emaAlpha = 0.05 - // EMA smoothing factor for per-block finalization rate. Very slow on - // purpose: finalization is permanent, and the score should reflect - // sustained contribution over long windows, not recent bursts. - // Half-life ≈ 6930 chain heads (~23 hours on 12s blocks). - finalizedEMAAlpha = 0.0001 - // EMA smoothing factor for per-request latency average. Slow on purpose: - // short bursts shouldn't shift the score, sustained behavior should. - // Half-life ≈ ln(0.5)/ln(0.99) ≈ 69 samples. - latencyEMAAlpha = 0.01 - // MinLatencySamples is the number of latency samples a peer must accumulate - // before its RequestLatencyEMA is considered meaningful for protection. - // Prevents a single lucky-fast reply from displacing established peers. - MinLatencySamples = 10 ) -// PeerStats holds the per-peer inclusion and responsiveness data. -type PeerStats struct { - RecentFinalized float64 // EMA of per-block finalization credits (slow) - RecentIncluded float64 // EMA of per-block inclusions (fast) - RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value) - RequestSamples int64 // Number of latency samples seen for this peer -} - // Chain is the blockchain interface needed by the tracker. type Chain interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription @@ -73,22 +47,29 @@ type Chain interface { CurrentFinalBlock() *types.Header } -type peerStats struct { - recentFinalized float64 - recentIncluded float64 - requestLatencyEMA time.Duration - requestSamples int64 +// StatsConsumer receives per-block signals about peer inclusion and +// finalization. The tracker invokes NotifyBlock exactly once per handled chain +// head, AFTER releasing its own lock, with: +// +// - inclusions: per-peer count of transactions in the head block +// - finalized: per-peer count of transactions in blocks that became +// finalized since the previous call (possibly zero-range) +// +// Either map may be empty but the slice/map itself is never nil when +// called. NotifyBlock must not call back into the tracker. +type StatsConsumer interface { + NotifyBlock(inclusions, finalized map[string]int) } -// Tracker records which peer delivered each transaction and credits peers -// when their transactions appear on chain. +// Tracker records which peer delivered each transaction and emits +// per-block inclusion and finalization signals to a StatsConsumer. type Tracker struct { mu sync.Mutex txs map[common.Hash]string // hash → deliverer peer ID - peers map[string]*peerStats - order []common.Hash // insertion order for LRU eviction + order []common.Hash // insertion order for FIFO eviction chain Chain + consumer StatsConsumer lastFinalNum uint64 // last finalized block number processed headCh chan core.ChainHeadEvent sub event.Subscription @@ -101,16 +82,18 @@ type Tracker struct { // New creates a new tracker. func New() *Tracker { return &Tracker{ - txs: make(map[common.Hash]string), - peers: make(map[string]*peerStats), - quit: make(chan struct{}), - step: make(chan struct{}, 1), + txs: make(map[common.Hash]string), + quit: make(chan struct{}), + step: make(chan struct{}, 1), } } -// Start begins listening for chain head events. -func (t *Tracker) Start(chain Chain) { +// Start begins listening for chain head events. `consumer` receives +// per-block signals; if nil, signals are computed but discarded +// (useful in tests that exercise only the tx-lifecycle surface). +func (t *Tracker) Start(chain Chain, consumer StatsConsumer) { t.chain = chain + t.consumer = consumer // Seed lastFinalNum so checkFinalization doesn't backfill from genesis. if fh := chain.CurrentFinalBlock(); fh != nil { t.lastFinalNum = fh.Number.Uint64() @@ -121,14 +104,6 @@ func (t *Tracker) Start(chain Chain) { go t.loop() } -// NotifyPeerDrop removes a disconnected peer's stats to prevent unbounded -// growth. Safe to call from any goroutine. -func (t *Tracker) NotifyPeerDrop(peer string) { - t.mu.Lock() - defer t.mu.Unlock() - delete(t.peers, peer) -} - // Stop shuts down the tracker. func (t *Tracker) Stop() { t.sub.Unsubscribe() @@ -151,10 +126,6 @@ func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) { t.txs[hash] = peer t.order = append(t.order, hash) } - // Ensure the delivering peer has a stats entry. - if len(hashes) > 0 && t.peers[peer] == nil { - t.peers[peer] = &peerStats{} - } // Evict oldest entries if over capacity. for len(t.txs) > maxTracked { oldest := t.order[0] @@ -168,51 +139,6 @@ func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) { } } -// NotifyRequestLatency records a tx-request response latency sample for the -// given peer. Timeouts should be reported as the timeout value (so they count -// against the EMA rather than being silently omitted). The EMA uses a slow -// alpha so isolated bursts don't shift the score appreciably. -// Safe to call from any goroutine. -func (t *Tracker) NotifyRequestLatency(peer string, latency time.Duration) { - t.mu.Lock() - defer t.mu.Unlock() - - ps := t.peers[peer] - if ps == nil { - ps = &peerStats{} - t.peers[peer] = ps - } - if ps.requestSamples == 0 { - // Bootstrap the EMA with the first sample so it doesn't drift up - // from zero over many samples before reaching realistic values. - ps.requestLatencyEMA = latency - } else { - ps.requestLatencyEMA = time.Duration( - float64(ps.requestLatencyEMA)*(1-latencyEMAAlpha) + - float64(latency)*latencyEMAAlpha, - ) - } - ps.requestSamples++ -} - -// GetAllPeerStats returns a snapshot of per-peer inclusion statistics. -// Safe to call from any goroutine. -func (t *Tracker) GetAllPeerStats() map[string]PeerStats { - t.mu.Lock() - defer t.mu.Unlock() - - result := make(map[string]PeerStats, len(t.peers)) - for id, ps := range t.peers { - result[id] = PeerStats{ - RecentFinalized: ps.recentFinalized, - RecentIncluded: ps.recentIncluded, - RequestLatencyEMA: ps.requestLatencyEMA, - RequestSamples: ps.requestSamples, - } - } - return result -} - func (t *Tracker) loop() { defer t.wg.Done() @@ -232,6 +158,10 @@ func (t *Tracker) loop() { } } +// handleChainHead computes per-peer deltas for the new head block and any +// newly-finalized blocks, then hands them to the StatsConsumer AFTER +// releasing t.mu. The lock-release-before-consumer pattern avoids any +// cross-package lock ordering. func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { // Fetch the head block by hash (not just number) to avoid using a // reorged block if the tracker goroutine lags behind the chain. @@ -239,35 +169,28 @@ func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { if block == nil { return } - t.mu.Lock() - defer t.mu.Unlock() - // Count per-peer inclusions in this block for the inclusion EMA. - blockIncl := make(map[string]int) + t.mu.Lock() + // Count per-peer inclusions in the head block. + inclusions := make(map[string]int) for _, tx := range block.Transactions() { if peer := t.txs[tx.Hash()]; peer != "" { - blockIncl[peer]++ + inclusions[peer]++ } } - // Accumulate per-peer finalization credits over the newly-finalized - // range (possibly zero blocks). Only counts peers still tracked. - blockFinal := t.collectFinalizationCredits() + // Compute per-peer finalization credits since the last call. + finalized := t.collectFinalization() + t.mu.Unlock() - // Update both EMAs for all tracked peers (decays inactive ones). - // Don't create entries for unknown peers — they may have been - // removed by NotifyPeerDrop and should not be resurrected. - for peer, ps := range t.peers { - ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer]) - ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(blockFinal[peer]) + if t.consumer != nil { + t.consumer.NotifyBlock(inclusions, finalized) } } -// collectFinalizationCredits accumulates per-peer finalization credits for -// blocks newly finalized since lastFinalNum. Returns a (possibly empty) map -// keyed by peer ID; advances lastFinalNum. Must be called with t.mu held. -// Peers that have already been removed by NotifyPeerDrop are skipped so -// dropped peers are not resurrected by old on-chain data. -func (t *Tracker) collectFinalizationCredits() map[string]int { +// collectFinalization accumulates per-peer finalization credits for +// blocks newly finalized since lastFinalNum. Returns a (possibly empty) +// map; advances lastFinalNum. Must be called with t.mu held. +func (t *Tracker) collectFinalization() map[string]int { credits := make(map[string]int) finalHeader := t.chain.CurrentFinalBlock() if finalHeader == nil { @@ -283,14 +206,9 @@ func (t *Tracker) collectFinalizationCredits() map[string]int { continue } for _, tx := range block.Transactions() { - peer := t.txs[tx.Hash()] - if peer == "" { - continue + if peer := t.txs[tx.Hash()]; peer != "" { + credits[peer]++ } - if _, ok := t.peers[peer]; !ok { - continue // peer disconnected, skip credit - } - credits[peer]++ } } if total := sumCounts(credits); total > 0 { diff --git a/eth/txtracker/tracker_test.go b/eth/txtracker/tracker_test.go index 280694b273..ad0637eb0a 100644 --- a/eth/txtracker/tracker_test.go +++ b/eth/txtracker/tracker_test.go @@ -79,20 +79,17 @@ func (c *mockChain) CurrentFinalBlock() *types.Header { return &types.Header{Number: new(big.Int).SetUint64(c.finalNum)} } -// addBlock adds a canonical block at the given height. Overwrites any -// prior canonical block at that height. +// addBlock adds a canonical block at the given height. func (c *mockChain) addBlock(num uint64, txs []*types.Transaction) *types.Block { return c.addBlockAtHeight(num, num, txs, true) } // addBlockAtHeight adds a block at the given height. The salt parameter -// ensures distinct block hashes for two blocks at the same height (used -// for reorg tests). If canonical is true, the block becomes the canonical -// block for that height (looked up by GetBlockByNumber). +// ensures distinct block hashes for two blocks at the same height. If +// canonical is true, the block becomes the canonical block for that height. func (c *mockChain) addBlockAtHeight(num, salt uint64, txs []*types.Transaction, canonical bool) *types.Block { c.mu.Lock() defer c.mu.Unlock() - // Mix salt into Extra so siblings at the same height get distinct hashes. header := &types.Header{ Number: new(big.Int).SetUint64(num), Extra: big.NewInt(int64(salt)).Bytes(), @@ -111,9 +108,7 @@ func (c *mockChain) setFinalBlock(num uint64) { c.finalNum = num } -// sendHead emits a chain head event for the canonical block at the given -// height. The emitted header carries the real block's hash so the -// tracker's GetBlock(hash, number) lookup resolves correctly. +// sendHead emits a chain head event for the canonical block at the given height. func (c *mockChain) sendHead(num uint64) { c.mu.Lock() hash := c.canonicalByNum[num] @@ -143,6 +138,49 @@ func makeTx(nonce uint64) *types.Transaction { return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000}) } +// mockConsumer captures NotifyBlock invocations so tests can assert on the +// signals the tracker emits. +type mockConsumer struct { + mu sync.Mutex + signals []signal +} + +type signal struct { + inclusions, finalized map[string]int +} + +func (c *mockConsumer) NotifyBlock(inclusions, finalized map[string]int) { + c.mu.Lock() + defer c.mu.Unlock() + // Deep-copy so tests inspecting older signals aren't tripped up by + // later iterations mutating the same map (they don't today, but + // this keeps the assertion model simple). + in := make(map[string]int, len(inclusions)) + for k, v := range inclusions { + in[k] = v + } + fn := make(map[string]int, len(finalized)) + for k, v := range finalized { + fn[k] = v + } + c.signals = append(c.signals, signal{in, fn}) +} + +func (c *mockConsumer) last() signal { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.signals) == 0 { + return signal{} + } + return c.signals[len(c.signals)-1] +} + +func (c *mockConsumer) count() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.signals) +} + // waitStep blocks until the tracker has processed one event. func waitStep(t *testing.T, tr *Tracker) { t.Helper() @@ -153,33 +191,16 @@ func waitStep(t *testing.T, tr *Tracker) { } } -func TestNotifyReceived(t *testing.T) { +// TestNotifyAcceptedRecordsMapping verifies the tx-lifecycle surface: +// NotifyAccepted records tx→peer mappings in insertion order, with +// first-deliverer-wins semantics on duplicates. +func TestNotifyAcceptedRecordsMapping(t *testing.T) { tr := New() - chain := newMockChain() - tr.Start(chain) - defer tr.Stop() txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)} hashes := hashTxs(txs) tr.NotifyAccepted("peerA", hashes) - // Public surface: peer entry was created with zero stats before any - // chain events. Map lookups would return a zero value for a missing - // key, so assert presence explicitly. - stats := tr.GetAllPeerStats() - if len(stats) != 1 { - t.Fatalf("expected 1 peer entry, got %d", len(stats)) - } - ps, ok := stats["peerA"] - if !ok { - t.Fatal("expected peerA entry, not found") - } - if ps.RecentFinalized != 0 || ps.RecentIncluded != 0 { - t.Fatalf("expected zero stats before chain events, got %+v", ps) - } - - // Internal state: all tx→deliverer mappings recorded, insertion order - // preserved in the FIFO slice. tr.mu.Lock() defer tr.mu.Unlock() if len(tr.txs) != 3 { @@ -198,191 +219,114 @@ func TestNotifyReceived(t *testing.T) { } } -func TestInclusionEMA(t *testing.T) { +// TestNotifyAcceptedFirstDelivererWins verifies duplicate accepts +// preserve the original deliverer. +func TestNotifyAcceptedFirstDelivererWins(t *testing.T) { tr := New() - chain := newMockChain() - tr.Start(chain) - defer tr.Stop() - tx := makeTx(1) tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()}) - // Block 1 includes peerA's tx. - chain.addBlock(1, []*types.Transaction{tx}) - chain.sendHead(1) - waitStep(t, tr) - - stats := tr.GetAllPeerStats() - if stats["peerA"].RecentIncluded <= 0 { - t.Fatalf("expected RecentIncluded > 0 after inclusion, got %f", stats["peerA"].RecentIncluded) + tr.mu.Lock() + defer tr.mu.Unlock() + if got := tr.txs[tx.Hash()]; got != "peerA" { + t.Fatalf("expected first deliverer peerA to win, got %q", got) } - ema1 := stats["peerA"].RecentIncluded - - // Block 2 has no txs from peerA — EMA should decay. - chain.addBlock(2, nil) - chain.sendHead(2) - waitStep(t, tr) - - stats = tr.GetAllPeerStats() - if stats["peerA"].RecentIncluded >= ema1 { - t.Fatalf("expected EMA to decay, got %f >= %f", stats["peerA"].RecentIncluded, ema1) + if len(tr.order) != 1 { + t.Fatalf("expected single order entry, got %d", len(tr.order)) } } -func TestFinalization(t *testing.T) { +// TestHandleChainHeadEmitsInclusions verifies the tracker emits a +// correct per-peer inclusion map to its consumer when a head block +// contains tracked transactions. +func TestHandleChainHeadEmitsInclusions(t *testing.T) { tr := New() chain := newMockChain() - tr.Start(chain) + consumer := &mockConsumer{} + tr.Start(chain, consumer) defer tr.Stop() - tx := makeTx(1) - tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) - - // Include in block 1. - chain.addBlock(1, []*types.Transaction{tx}) - chain.sendHead(1) - waitStep(t, tr) - - // Not finalized yet. - stats := tr.GetAllPeerStats() - if stats["peerA"].RecentFinalized != 0 { - t.Fatalf("expected RecentFinalized=0 before finalization, got %f", stats["peerA"].RecentFinalized) - } - - // Finalize block 1, then send head 2 to trigger the finalization EMA update. - chain.setFinalBlock(1) - chain.addBlock(2, nil) - chain.sendHead(2) - waitStep(t, tr) - - stats = tr.GetAllPeerStats() - if stats["peerA"].RecentFinalized <= 0 { - t.Fatalf("expected RecentFinalized>0 after finalization, got %f", stats["peerA"].RecentFinalized) - } -} - -func TestMultiplePeers(t *testing.T) { - tr := New() - chain := newMockChain() - tr.Start(chain) - defer tr.Stop() - - tx1 := makeTx(1) - tx2 := makeTx(2) + tx1, tx2 := makeTx(1), makeTx(2) tr.NotifyAccepted("peerA", []common.Hash{tx1.Hash()}) tr.NotifyAccepted("peerB", []common.Hash{tx2.Hash()}) - // Both included in block 1. chain.addBlock(1, []*types.Transaction{tx1, tx2}) chain.sendHead(1) waitStep(t, tr) - // Finalize. + sig := consumer.last() + if sig.inclusions["peerA"] != 1 { + t.Errorf("peerA inclusions: got %d, want 1", sig.inclusions["peerA"]) + } + if sig.inclusions["peerB"] != 1 { + t.Errorf("peerB inclusions: got %d, want 1", sig.inclusions["peerB"]) + } + if len(sig.finalized) != 0 { + t.Errorf("expected empty finalized map, got %v", sig.finalized) + } +} + +// TestHandleChainHeadEmptyBlock verifies an empty head block emits an +// empty inclusion map (so peerstats can decay all known peers). +func TestHandleChainHeadEmptyBlock(t *testing.T) { + tr := New() + chain := newMockChain() + consumer := &mockConsumer{} + tr.Start(chain, consumer) + defer tr.Stop() + + chain.addBlock(1, nil) + chain.sendHead(1) + waitStep(t, tr) + + sig := consumer.last() + if len(sig.inclusions) != 0 { + t.Errorf("expected empty inclusions, got %v", sig.inclusions) + } +} + +// TestHandleChainHeadEmitsFinalization verifies that when finalization +// advances, the consumer receives per-peer finalization credits +// accumulated over the newly-finalized range. +func TestHandleChainHeadEmitsFinalization(t *testing.T) { + tr := New() + chain := newMockChain() + consumer := &mockConsumer{} + tr.Start(chain, consumer) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include in block 1, not yet finalized. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + if credits := consumer.last().finalized["peerA"]; credits != 0 { + t.Fatalf("expected no finalization credits before finalization, got %d", credits) + } + + // Finalize block 1; next head triggers the finalization scan. chain.setFinalBlock(1) chain.addBlock(2, nil) chain.sendHead(2) waitStep(t, tr) - stats := tr.GetAllPeerStats() - if stats["peerA"].RecentFinalized <= 0 { - t.Fatalf("peerA: expected RecentFinalized>0, got %f", stats["peerA"].RecentFinalized) - } - if stats["peerB"].RecentFinalized <= 0 { - t.Fatalf("peerB: expected RecentFinalized>0, got %f", stats["peerB"].RecentFinalized) + if credits := consumer.last().finalized["peerA"]; credits != 1 { + t.Fatalf("expected 1 finalization credit, got %d", credits) } } -func TestFirstDelivererWins(t *testing.T) { - tr := New() - chain := newMockChain() - tr.Start(chain) - defer tr.Stop() - - tx := makeTx(1) - tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) - tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()}) // duplicate, should be ignored - - chain.addBlock(1, []*types.Transaction{tx}) - chain.sendHead(1) - waitStep(t, tr) - - chain.setFinalBlock(1) - chain.addBlock(2, nil) - chain.sendHead(2) - waitStep(t, tr) - - stats := tr.GetAllPeerStats() - if stats["peerA"].RecentFinalized <= 0 { - t.Fatalf("peerA should be credited, got RecentFinalized=%f", stats["peerA"].RecentFinalized) - } - if stats["peerB"].RecentFinalized != 0 { - t.Fatalf("peerB should NOT be credited, got RecentFinalized=%f", stats["peerB"].RecentFinalized) - } -} - -func TestNoFinalizationCredit(t *testing.T) { - tr := New() - chain := newMockChain() - tr.Start(chain) - defer tr.Stop() - - tx := makeTx(1) - tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) - - // Include but don't finalize. - chain.addBlock(1, []*types.Transaction{tx}) - chain.sendHead(1) - waitStep(t, tr) - - // Send more heads without finalization. - chain.addBlock(2, nil) - chain.sendHead(2) - waitStep(t, tr) - - stats := tr.GetAllPeerStats() - if stats["peerA"].RecentFinalized != 0 { - t.Fatalf("expected RecentFinalized=0 without finalization, got %f", stats["peerA"].RecentFinalized) - } -} - -func TestEMADecay(t *testing.T) { - tr := New() - chain := newMockChain() - tr.Start(chain) - defer tr.Stop() - - tx := makeTx(1) - tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) - - // Include in block 1. - chain.addBlock(1, []*types.Transaction{tx}) - chain.sendHead(1) - waitStep(t, tr) - - // Send 30 empty blocks — EMA should decay close to zero. - for i := uint64(2); i <= 31; i++ { - chain.addBlock(i, nil) - chain.sendHead(i) - waitStep(t, tr) - } - - stats := tr.GetAllPeerStats() - if stats["peerA"].RecentIncluded > 0.02 { - t.Fatalf("expected RecentIncluded near zero after 30 empty blocks, got %f", stats["peerA"].RecentIncluded) - } -} - -// TestReorgSafety verifies that handleChainHead resolves the head block by -// HASH (not just by number), so a head event announcing a sibling block at -// the same height does not credit transactions from the canonical block. -// -// Regression check: if the tracker were changed to use GetBlockByNumber, -// it would always fetch the canonical block A and credit peerA even when -// the head points to sibling B. +// TestReorgSafety verifies the tracker resolves the head block by HASH +// so a head event pointing at a sibling block does not emit inclusions +// from the canonical block at the same height. func TestReorgSafety(t *testing.T) { tr := New() chain := newMockChain() - tr.Start(chain) + consumer := &mockConsumer{} + tr.Start(chain, consumer) defer tr.Stop() tx := makeTx(1) @@ -395,164 +339,30 @@ func TestReorgSafety(t *testing.T) { t.Fatal("sibling blocks ended up with the same hash") } - // Head announces sibling B. A hash-aware tracker fetches B, sees no - // peerA txs, and leaves the EMA at zero. A number-only tracker would - // instead fetch A and credit peerA. + // Head announces sibling B — emit must contain no peerA inclusions. chain.sendHeadBlock(blockB) waitStep(t, tr) - - if got := tr.GetAllPeerStats()["peerA"].RecentIncluded; got != 0 { - t.Fatalf("expected RecentIncluded=0 after sibling-B head event, got %f (tracker followed the wrong block)", got) + if incl := consumer.last().inclusions["peerA"]; incl != 0 { + t.Fatalf("sibling-B head should emit 0 peerA inclusions, got %d", incl) } - // Now announce canonical A; peerA should be credited. + // Head announces canonical A — emit must contain 1 peerA inclusion. chain.sendHeadBlock(blockA) waitStep(t, tr) - - if got := tr.GetAllPeerStats()["peerA"].RecentIncluded; got <= 0 { - t.Fatalf("expected RecentIncluded>0 after canonical-A head event, got %f", got) + if incl := consumer.last().inclusions["peerA"]; incl != 1 { + t.Fatalf("canonical-A head should emit 1 peerA inclusion, got %d", incl) } } -// TestRecentFinalizedDecays verifies that the finalization EMA decays -// for a peer that earned credits in the past but has no new -// finalization activity. The decay is slow (α=0.0001), so we -// just assert monotonic decrease, not convergence to zero. -func TestRecentFinalizedDecays(t *testing.T) { +// TestHandleChainHeadNilConsumer verifies the tracker tolerates a nil +// consumer (useful for tests that only exercise tx-lifecycle behavior). +func TestHandleChainHeadNilConsumer(t *testing.T) { tr := New() chain := newMockChain() - tr.Start(chain) + tr.Start(chain, nil) defer tr.Stop() - tx := makeTx(1) - tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) - - // Include and finalize in block 1. - chain.addBlock(1, []*types.Transaction{tx}) + chain.addBlock(1, nil) chain.sendHead(1) - waitStep(t, tr) - chain.setFinalBlock(1) - chain.addBlock(2, nil) - chain.sendHead(2) - waitStep(t, tr) - - peak := tr.GetAllPeerStats()["peerA"].RecentFinalized - if peak <= 0 { - t.Fatalf("expected RecentFinalized>0 after finalization, got %f", peak) - } - - // Send many empty heads — peer contributes zero each block, - // EMA should decay monotonically. - for i := uint64(3); i <= 50; i++ { - chain.addBlock(i, nil) - chain.sendHead(i) - waitStep(t, tr) - } - - after := tr.GetAllPeerStats()["peerA"].RecentFinalized - if after >= peak { - t.Fatalf("expected RecentFinalized to decay, got %f >= peak %f", after, peak) - } -} - -// TestRequestLatencyFirstSampleBootstrap asserts that the first latency -// sample seeds the EMA directly (no slow ramp-up from zero), and that the -// sample counter starts at 1. -func TestRequestLatencyFirstSampleBootstrap(t *testing.T) { - tr := New() - tr.NotifyRequestLatency("peerA", 200*time.Millisecond) - - stats := tr.GetAllPeerStats() - ps := stats["peerA"] - if ps.RequestLatencyEMA != 200*time.Millisecond { - t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA) - } - if ps.RequestSamples != 1 { - t.Fatalf("expected RequestSamples=1, got %d", ps.RequestSamples) - } -} - -// TestRequestLatencyEMAUpdate verifies the EMA formula (1-α)·old + α·new. -func TestRequestLatencyEMAUpdate(t *testing.T) { - tr := New() - tr.NotifyRequestLatency("peerA", 100*time.Millisecond) - tr.NotifyRequestLatency("peerA", 1000*time.Millisecond) - - // Expected: 0.99*100ms + 0.01*1000ms = 109ms - got := tr.GetAllPeerStats()["peerA"].RequestLatencyEMA - want := 109 * time.Millisecond - delta := got - want - if delta < 0 { - delta = -delta - } - if delta > 1*time.Microsecond { - t.Fatalf("EMA mismatch: got %v, want %v (delta %v)", got, want, delta) - } - if samples := tr.GetAllPeerStats()["peerA"].RequestSamples; samples != 2 { - t.Fatalf("expected RequestSamples=2, got %d", samples) - } -} - -// TestRequestLatencySlowEMAConvergence verifies that the slow alpha -// requires many samples to noticeably shift the EMA. Starting at 100ms -// and feeding 5s (timeout) samples, the EMA should still be well below -// 1s after 50 samples. -func TestRequestLatencySlowEMAConvergence(t *testing.T) { - tr := New() - tr.NotifyRequestLatency("peerA", 100*time.Millisecond) - for i := 0; i < 50; i++ { - tr.NotifyRequestLatency("peerA", 5*time.Second) - } - got := tr.GetAllPeerStats()["peerA"].RequestLatencyEMA - if got < 1*time.Second { - // Expected ≈ (0.99)^50 * 100ms + (1-(0.99)^50) * 5s ≈ 1.99s - // The lower bound proves a meaningful shift; the upper bound (below) - // proves the slow alpha damped the convergence. - t.Fatalf("EMA did not move enough under sustained timeouts, got %v", got) - } - if got > 3*time.Second { - t.Fatalf("EMA converged too fast for slow alpha=0.01, got %v", got) - } -} - -// TestRequestLatencyMultiplePeersIsolated verifies per-peer isolation: a -// sample for peerA does not affect peerB's stats. -func TestRequestLatencyMultiplePeersIsolated(t *testing.T) { - tr := New() - tr.NotifyRequestLatency("peerA", 100*time.Millisecond) - tr.NotifyRequestLatency("peerB", 5*time.Second) - - stats := tr.GetAllPeerStats() - if stats["peerA"].RequestLatencyEMA != 100*time.Millisecond { - t.Errorf("peerA EMA: got %v, want 100ms", stats["peerA"].RequestLatencyEMA) - } - if stats["peerB"].RequestLatencyEMA != 5*time.Second { - t.Errorf("peerB EMA: got %v, want 5s", stats["peerB"].RequestLatencyEMA) - } - if stats["peerA"].RequestSamples != 1 || stats["peerB"].RequestSamples != 1 { - t.Errorf("expected RequestSamples=1 for each peer, got A=%d B=%d", - stats["peerA"].RequestSamples, stats["peerB"].RequestSamples) - } -} - -// TestRequestLatencyPeerDropResetsStats verifies that NotifyPeerDrop -// removes the peer's latency history along with its other stats. -func TestRequestLatencyPeerDropResetsStats(t *testing.T) { - tr := New() - tr.NotifyRequestLatency("peerA", 200*time.Millisecond) - tr.NotifyPeerDrop("peerA") - - if _, ok := tr.GetAllPeerStats()["peerA"]; ok { - t.Fatal("peerA stats should be removed after NotifyPeerDrop") - } - - // A subsequent latency sample re-creates the entry as a fresh peer. - tr.NotifyRequestLatency("peerA", 50*time.Millisecond) - ps := tr.GetAllPeerStats()["peerA"] - if ps.RequestSamples != 1 { - t.Fatalf("expected RequestSamples=1 after re-add, got %d", ps.RequestSamples) - } - if ps.RequestLatencyEMA != 50*time.Millisecond { - t.Fatalf("expected fresh EMA bootstrap, got %v", ps.RequestLatencyEMA) - } + waitStep(t, tr) // should not panic }