From 111d90aef84b99b67fc9a815c1f4bf0b63f07657 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 13 Apr 2026 20:25:53 +0200 Subject: [PATCH] eth/txtracker: track per-peer tx-request response latency Adds NotifyRequestLatency(peer, latency) and a slow per-peer EMA (alpha=0.01, ~70-sample half-life) that the dropper will use as a new protection signal. The first sample seeds the EMA directly so fresh peers don't ramp up from zero. RequestSamples is exposed alongside the EMA so consumers can apply a minimum-samples bootstrap guard before trusting the value. Includes design notes for the broader peerdrop-latency feature. --- eth/txtracker/tracker.go | 56 ++++++++++++++++--- eth/txtracker/tracker_test.go | 102 ++++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 7 deletions(-) diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go index 93797cad9d..8c678c6561 100644 --- a/eth/txtracker/tracker.go +++ b/eth/txtracker/tracker.go @@ -28,6 +28,7 @@ package txtracker import ( "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -46,12 +47,22 @@ const ( // sustained contribution over long windows, not recent bursts. // Half-life ≈ 6930 chain heads (~23 hours on 12s blocks). finalizedEMAAlpha = 0.0001 + // EMA smoothing factor for per-request latency average. Slow on purpose: + // short bursts shouldn't shift the score, sustained behavior should. + // Half-life ≈ ln(0.5)/ln(0.99) ≈ 69 samples. + latencyEMAAlpha = 0.01 + // MinLatencySamples is the number of latency samples a peer must accumulate + // before its RequestLatencyEMA is considered meaningful for protection. + // Prevents a single lucky-fast reply from displacing established peers. + MinLatencySamples = 10 ) -// PeerStats holds the per-peer inclusion data. +// PeerStats holds the per-peer inclusion and responsiveness data. 
type PeerStats struct { - RecentFinalized float64 // EMA of per-block finalization credits (slow) - RecentIncluded float64 // EMA of per-block inclusions (fast) + RecentFinalized float64 // EMA of per-block finalization credits (slow) + RecentIncluded float64 // EMA of per-block inclusions (fast) + RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value) + RequestSamples int64 // Number of latency samples seen for this peer } // Chain is the blockchain interface needed by the tracker. @@ -63,8 +74,10 @@ type Chain interface { } type peerStats struct { - recentFinalized float64 - recentIncluded float64 + recentFinalized float64 + recentIncluded float64 + requestLatencyEMA time.Duration + requestSamples int64 } // Tracker records which peer delivered each transaction and credits peers @@ -155,6 +168,33 @@ func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) { } } +// NotifyRequestLatency records a tx-request response latency sample for the +// given peer. Timeouts should be reported as the timeout value (so they count +// against the EMA rather than being silently omitted). The EMA uses a slow +// alpha so isolated bursts don't shift the score appreciably. +// Safe to call from any goroutine. +func (t *Tracker) NotifyRequestLatency(peer string, latency time.Duration) { + t.mu.Lock() + defer t.mu.Unlock() + + ps := t.peers[peer] + if ps == nil { + ps = &peerStats{} + t.peers[peer] = ps + } + if ps.requestSamples == 0 { + // Bootstrap the EMA with the first sample so it doesn't drift up + // from zero over many samples before reaching realistic values. + ps.requestLatencyEMA = latency + } else { + ps.requestLatencyEMA = time.Duration( + float64(ps.requestLatencyEMA)*(1-latencyEMAAlpha) + + float64(latency)*latencyEMAAlpha, + ) + } + ps.requestSamples++ +} + // GetAllPeerStats returns a snapshot of per-peer inclusion statistics. // Safe to call from any goroutine. 
func (t *Tracker) GetAllPeerStats() map[string]PeerStats { @@ -164,8 +204,10 @@ func (t *Tracker) GetAllPeerStats() map[string]PeerStats { result := make(map[string]PeerStats, len(t.peers)) for id, ps := range t.peers { result[id] = PeerStats{ - RecentFinalized: ps.recentFinalized, - RecentIncluded: ps.recentIncluded, + RecentFinalized: ps.recentFinalized, + RecentIncluded: ps.recentIncluded, + RequestLatencyEMA: ps.requestLatencyEMA, + RequestSamples: ps.requestSamples, } } return result diff --git a/eth/txtracker/tracker_test.go b/eth/txtracker/tracker_test.go index b572597e6e..280694b273 100644 --- a/eth/txtracker/tracker_test.go +++ b/eth/txtracker/tracker_test.go @@ -454,3 +454,105 @@ func TestRecentFinalizedDecays(t *testing.T) { t.Fatalf("expected RecentFinalized to decay, got %f >= peak %f", after, peak) } } + +// TestRequestLatencyFirstSampleBootstrap asserts that the first latency +// sample seeds the EMA directly (no slow ramp-up from zero), and that the +// sample counter starts at 1. +func TestRequestLatencyFirstSampleBootstrap(t *testing.T) { + tr := New() + tr.NotifyRequestLatency("peerA", 200*time.Millisecond) + + stats := tr.GetAllPeerStats() + ps := stats["peerA"] + if ps.RequestLatencyEMA != 200*time.Millisecond { + t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA) + } + if ps.RequestSamples != 1 { + t.Fatalf("expected RequestSamples=1, got %d", ps.RequestSamples) + } +} + +// TestRequestLatencyEMAUpdate verifies the EMA formula (1-α)·old + α·new. 
+func TestRequestLatencyEMAUpdate(t *testing.T) { + tr := New() + tr.NotifyRequestLatency("peerA", 100*time.Millisecond) + tr.NotifyRequestLatency("peerA", 1000*time.Millisecond) + + // Expected: 0.99*100ms + 0.01*1000ms = 109ms + got := tr.GetAllPeerStats()["peerA"].RequestLatencyEMA + want := 109 * time.Millisecond + delta := got - want + if delta < 0 { + delta = -delta + } + if delta > 1*time.Microsecond { + t.Fatalf("EMA mismatch: got %v, want %v (delta %v)", got, want, delta) + } + if samples := tr.GetAllPeerStats()["peerA"].RequestSamples; samples != 2 { + t.Fatalf("expected RequestSamples=2, got %d", samples) + } +} + +// TestRequestLatencySlowEMAConvergence verifies that the slow alpha +// requires many samples to noticeably shift the EMA. Starting at 100ms +// and feeding 5s (timeout) samples, the EMA should have risen above 1s +// but still be well below the 5s input (under 3s) after 50 samples. +func TestRequestLatencySlowEMAConvergence(t *testing.T) { + tr := New() + tr.NotifyRequestLatency("peerA", 100*time.Millisecond) + for i := 0; i < 50; i++ { + tr.NotifyRequestLatency("peerA", 5*time.Second) + } + got := tr.GetAllPeerStats()["peerA"].RequestLatencyEMA + if got < 1*time.Second { + // Expected ≈ (0.99)^50 * 100ms + (1-(0.99)^50) * 5s ≈ 2.04s + // The lower bound proves a meaningful shift; the upper bound (below) + // proves the slow alpha damped the convergence. + t.Fatalf("EMA did not move enough under sustained timeouts, got %v", got) + } + if got > 3*time.Second { + t.Fatalf("EMA converged too fast for slow alpha=0.01, got %v", got) + } +} + +// TestRequestLatencyMultiplePeersIsolated verifies per-peer isolation: a +// sample for peerA does not affect peerB's stats. 
+func TestRequestLatencyMultiplePeersIsolated(t *testing.T) { + tr := New() + tr.NotifyRequestLatency("peerA", 100*time.Millisecond) + tr.NotifyRequestLatency("peerB", 5*time.Second) + + stats := tr.GetAllPeerStats() + if stats["peerA"].RequestLatencyEMA != 100*time.Millisecond { + t.Errorf("peerA EMA: got %v, want 100ms", stats["peerA"].RequestLatencyEMA) + } + if stats["peerB"].RequestLatencyEMA != 5*time.Second { + t.Errorf("peerB EMA: got %v, want 5s", stats["peerB"].RequestLatencyEMA) + } + if stats["peerA"].RequestSamples != 1 || stats["peerB"].RequestSamples != 1 { + t.Errorf("expected RequestSamples=1 for each peer, got A=%d B=%d", + stats["peerA"].RequestSamples, stats["peerB"].RequestSamples) + } +} + +// TestRequestLatencyPeerDropResetsStats verifies that NotifyPeerDrop +// removes the peer's latency history along with its other stats. +func TestRequestLatencyPeerDropResetsStats(t *testing.T) { + tr := New() + tr.NotifyRequestLatency("peerA", 200*time.Millisecond) + tr.NotifyPeerDrop("peerA") + + if _, ok := tr.GetAllPeerStats()["peerA"]; ok { + t.Fatal("peerA stats should be removed after NotifyPeerDrop") + } + + // A subsequent latency sample re-creates the entry as a fresh peer. + tr.NotifyRequestLatency("peerA", 50*time.Millisecond) + ps := tr.GetAllPeerStats()["peerA"] + if ps.RequestSamples != 1 { + t.Fatalf("expected RequestSamples=1 after re-add, got %d", ps.RequestSamples) + } + if ps.RequestLatencyEMA != 50*time.Millisecond { + t.Fatalf("expected fresh EMA bootstrap, got %v", ps.RequestLatencyEMA) + } +}