eth/peerstats: gate latency protection on sample freshness

The request-latency category scores peers by the reciprocal of their
RequestLatencyEMA, but that EMA is only updated by NotifyRequestLatency
— which only fires when the tx fetcher sends a request to the peer.
A peer can serve a burst of fast replies to build a strong EMA, stop
announcing transactions so we never request from them again, and
retain latency protection indefinitely with a frozen score.

Record LastLatencySample (wall-clock time) per peer alongside the EMA
update. In the dropper's scoring function, return 0 when the last
sample is older than MaxLatencyStaleness (10 minutes). Fresh samples
reset the clock, so peers that resume activity become eligible again.

Timestamps rather than block counts: elapsed real time is what we
actually care about (10 minutes idle), not a block count that varies
with chain pace. The EMA itself is a time.Duration, so measuring
staleness in the same domain keeps the two consistent.

Tests cover the timestamp update on NotifyRequestLatency, the timestamp
advancing on successive samples, and the dropper rejecting a stale
peer whose EMA and sample count would otherwise qualify.
This commit is contained in:
Csaba Kiraly 2026-04-16 01:07:58 +02:00
parent b178ec9a4a
commit 89222edba9
4 changed files with 86 additions and 0 deletions

View file

@ -85,6 +85,12 @@ var protectionCategories = []protectionCategory{
if s.RequestSamples < peerstats.MinLatencySamples {
return 0
}
// Freshness gate: a peer that earned a fast EMA but then went
// silent on announcements (no requests → no fresh samples) must
// not keep that score indefinitely. Ignore stale data.
if time.Since(s.LastLatencySample) > peerstats.MaxLatencyStaleness {
return 0
}
if s.RequestLatencyEMA <= 0 {
return 0
}

View file

@ -244,14 +244,17 @@ func TestProtectedByPoolRequestLatencyBasic(t *testing.T) {
stats[dialed[0].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 50 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples,
LastLatencySample: time.Now(),
}
stats[dialed[1].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 100 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples,
LastLatencySample: time.Now(),
}
stats[dialed[2].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 2 * time.Second,
RequestSamples: peerstats.MinLatencySamples,
LastLatencySample: time.Now(),
}
protected := protectedPeersByPool(nil, dialed, stats)
@ -285,6 +288,7 @@ func TestProtectedByPoolRequestLatencyBootstrapGuard(t *testing.T) {
stats[dialed[1].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 500 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples,
LastLatencySample: time.Now(),
}
protected := protectedPeersByPool(nil, dialed, stats)
@ -314,6 +318,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) {
stats[p.ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 50 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples,
LastLatencySample: time.Now(),
}
}
// Dialed peers are slower (1s) — globally they would all lose, but
@ -322,6 +327,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) {
stats[p.ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 1 * time.Second,
RequestSamples: peerstats.MinLatencySamples,
LastLatencySample: time.Now(),
}
}
@ -338,3 +344,35 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) {
t.Fatalf("expected 2 dialed peers protected by per-pool top-N, got %d", dialedProtected)
}
}
// TestProtectedByPoolRequestLatencyStale exercises the freshness gate of the
// request-latency protection category. Two dialed peers are given the same
// fast EMA and the minimum required sample count; the only difference is the
// age of their last latency sample. The peer sampled "now" must end up
// protected, while the peer whose last sample is twice MaxLatencyStaleness
// old must not, i.e. a fast-but-frozen EMA does not keep earning protection.
func TestProtectedByPoolRequestLatencyStale(t *testing.T) {
	var (
		peers     = makePeers(20)
		freshPeer = peers[0]
		stalePeer = peers[1]
		now       = time.Now()
	)

	stats := map[string]peerstats.PeerStats{
		// Recently sampled and fast: expected to be protected.
		freshPeer.ID().String(): {
			RequestLatencyEMA: 50 * time.Millisecond,
			RequestSamples:    peerstats.MinLatencySamples,
			LastLatencySample: now,
		},
		// Identical EMA and sample count, but the last sample lies well
		// beyond the staleness limit: expected to be rejected.
		stalePeer.ID().String(): {
			RequestLatencyEMA: 50 * time.Millisecond,
			RequestSamples:    peerstats.MinLatencySamples,
			LastLatencySample: now.Add(-2 * peerstats.MaxLatencyStaleness),
		},
	}

	protected := protectedPeersByPool(nil, peers, stats)

	// Both checks use t.Error so that a failure of the first does not hide
	// the outcome of the second.
	if isProtected := protected[freshPeer]; !isProtected {
		t.Error("fresh fast peer must be protected")
	}
	if isProtected := protected[stalePeer]; isProtected {
		t.Error("stale peer must NOT keep latency protection despite fast EMA")
	}
}

View file

@ -53,6 +53,13 @@ const (
// before its RequestLatencyEMA is considered meaningful for protection.
// Prevents a single lucky-fast reply from displacing established peers.
MinLatencySamples = 100
// MaxLatencyStaleness is the oldest allowed age of a peer's last
// latency sample before their RequestLatencyEMA is disregarded for
// protection. Prevents a peer from earning a fast score during a
// burst of activity and then holding protection indefinitely by
// going silent on tx announcements (no further requests → no fresh
// samples → EMA frozen at its last value).
MaxLatencyStaleness = 10 * time.Minute
)
// PeerStats is the exported per-peer snapshot returned by GetAllPeerStats.
@ -61,6 +68,7 @@ type PeerStats struct {
RecentIncluded float64 // EMA of per-block inclusions (fast)
RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value)
RequestSamples int64 // Number of latency samples seen (for bootstrap guard)
LastLatencySample time.Time // Wall-clock time of the most recent latency sample (for staleness gate)
}
// peerStats is the internal mutable state per peer.
@ -69,6 +77,7 @@ type peerStats struct {
recentIncluded float64
requestLatencyEMA time.Duration
requestSamples int64
lastLatencySample time.Time
}
// Stats is the per-peer quality aggregator.
@ -141,6 +150,7 @@ func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) {
)
}
ps.requestSamples++
ps.lastLatencySample = time.Now()
}
// NotifyPeerDrop removes a peer's stats on disconnect. A rare stale
@ -166,6 +176,7 @@ func (s *Stats) GetAllPeerStats() map[string]PeerStats {
RecentIncluded: ps.recentIncluded,
RequestLatencyEMA: ps.requestLatencyEMA,
RequestSamples: ps.requestSamples,
LastLatencySample: ps.lastLatencySample,
}
}
return result

View file

@ -221,3 +221,34 @@ func TestMultiplePeersIsolated(t *testing.T) {
t.Errorf("peerB latency: got %v, want 5s", stats["peerB"].RequestLatencyEMA)
}
}
// TestLatencyTimestampSet checks that a single NotifyRequestLatency call
// stamps the peer's LastLatencySample with a time that lies between two
// time.Now() readings taken immediately before and after the call.
func TestLatencyTimestampSet(t *testing.T) {
	tracker := New()

	// Bracket the notification with two clock readings.
	lower := time.Now()
	tracker.NotifyRequestLatency("peerA", 100*time.Millisecond)
	upper := time.Now()

	stamp := tracker.GetAllPeerStats()["peerA"].LastLatencySample
	if inWindow := !stamp.Before(lower) && !stamp.After(upper); !inWindow {
		t.Fatalf("LastLatencySample = %v not in [%v, %v]", stamp, lower, upper)
	}
}
// TestLatencyTimestampUpdatesOnEachSample checks that LastLatencySample moves
// forward when a second latency sample is reported for the same peer.
func TestLatencyTimestampUpdatesOnEachSample(t *testing.T) {
	tracker := New()

	// record reports one latency sample for peerA and returns the
	// LastLatencySample exposed by the snapshot taken right afterwards.
	record := func(latency time.Duration) time.Time {
		tracker.NotifyRequestLatency("peerA", latency)
		return tracker.GetAllPeerStats()["peerA"].LastLatencySample
	}

	first := record(100 * time.Millisecond)
	// Brief pause so the two stamps cannot coincide.
	time.Sleep(2 * time.Millisecond)
	second := record(200 * time.Millisecond)

	if second.After(first) {
		return
	}
	t.Fatalf("expected second sample timestamp > first, got first=%v second=%v", first, second)
}