eth: rename NotifyRequestLatency to NotifyRequestResult, track success/timeout counts

Replace NotifyRequestLatency(peer, latency) with
NotifyRequestResult(peer, latency, timeout). The new timeout bool
tells peerstats whether the request was answered or timed out.

Per-peer RequestSuccesses and RequestTimeouts counters replace the
single RequestSamples field — any two of the three are derivable, so
we keep the two primary counters and derive the total
(successes + timeouts) where needed (e.g. the MinLatencySamples
guard in the dropper).

The latency EMA continues to use the timeout value for timed-out
requests, penalizing slow peers as before. The success/timeout
counters are exposed as statistics only — no protection category
uses them yet.
This commit is contained in:
Csaba Kiraly 2026-04-16 15:16:29 +02:00
parent 89222edba9
commit b6b6345be9
7 changed files with 139 additions and 91 deletions

View file

@ -82,7 +82,7 @@ var protectionCategories = []protectionCategory{
// whose EMA reaches the timeout also score 0 by this path because // whose EMA reaches the timeout also score 0 by this path because
// the reciprocal of a very large duration is tiny but positive; the // the reciprocal of a very large duration is tiny but positive; the
// per-pool top-N will still push faster peers ahead of them. // per-pool top-N will still push faster peers ahead of them.
if s.RequestSamples < peerstats.MinLatencySamples { if s.RequestSuccesses+s.RequestTimeouts < peerstats.MinLatencySamples {
return 0 return 0
} }
// Freshness gate: a peer that earned a fast EMA but then went // Freshness gate: a peer that earned a fast EMA but then went

View file

@ -243,17 +243,17 @@ func TestProtectedByPoolRequestLatencyBasic(t *testing.T) {
// Three peers have enough samples; the two fastest should win. // Three peers have enough samples; the two fastest should win.
stats[dialed[0].ID().String()] = peerstats.PeerStats{ stats[dialed[0].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 50 * time.Millisecond, RequestLatencyEMA: 50 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
stats[dialed[1].ID().String()] = peerstats.PeerStats{ stats[dialed[1].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 100 * time.Millisecond, RequestLatencyEMA: 100 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
stats[dialed[2].ID().String()] = peerstats.PeerStats{ stats[dialed[2].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 2 * time.Second, RequestLatencyEMA: 2 * time.Second,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
@ -282,12 +282,12 @@ func TestProtectedByPoolRequestLatencyBootstrapGuard(t *testing.T) {
// A lucky-fast peer with only 1 sample — must NOT be protected. // A lucky-fast peer with only 1 sample — must NOT be protected.
stats[dialed[0].ID().String()] = peerstats.PeerStats{ stats[dialed[0].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 1 * time.Millisecond, RequestLatencyEMA: 1 * time.Millisecond,
RequestSamples: 1, RequestSuccesses: 1,
} }
// A warmed-up but slower peer — should be protected on latency. // A warmed-up but slower peer — should be protected on latency.
stats[dialed[1].ID().String()] = peerstats.PeerStats{ stats[dialed[1].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 500 * time.Millisecond, RequestLatencyEMA: 500 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
@ -317,7 +317,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) {
for _, p := range inbound { for _, p := range inbound {
stats[p.ID().String()] = peerstats.PeerStats{ stats[p.ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 50 * time.Millisecond, RequestLatencyEMA: 50 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
} }
@ -326,7 +326,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) {
for _, p := range dialed { for _, p := range dialed {
stats[p.ID().String()] = peerstats.PeerStats{ stats[p.ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 1 * time.Second, RequestLatencyEMA: 1 * time.Second,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
} }
@ -356,14 +356,14 @@ func TestProtectedByPoolRequestLatencyStale(t *testing.T) {
// Fresh, fast peer — should be protected. // Fresh, fast peer — should be protected.
stats[dialed[0].ID().String()] = peerstats.PeerStats{ stats[dialed[0].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 50 * time.Millisecond, RequestLatencyEMA: 50 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now(), LastLatencySample: time.Now(),
} }
// Stale, fast peer — was fast, but hasn't answered in too long. // Stale, fast peer — was fast, but hasn't answered in too long.
// Same EMA and sample count as the fresh peer; only staleness differs. // Same EMA and sample count as the fresh peer; only staleness differs.
stats[dialed[1].ID().String()] = peerstats.PeerStats{ stats[dialed[1].ID().String()] = peerstats.PeerStats{
RequestLatencyEMA: 50 * time.Millisecond, RequestLatencyEMA: 50 * time.Millisecond,
RequestSamples: peerstats.MinLatencySamples, RequestSuccesses: peerstats.MinLatencySamples,
LastLatencySample: time.Now().Add(-2 * peerstats.MaxLatencyStaleness), LastLatencySample: time.Now().Add(-2 * peerstats.MaxLatencyStaleness),
} }

View file

@ -185,7 +185,7 @@ type TxFetcher struct {
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation dropPeer func(string) // Drops a peer in case of announcement violation
onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer
onRequestLatency func(peer string, latency time.Duration) // Optional: notified once per completed/timed-out tx request onRequestResult func(peer string, latency time.Duration, timeout bool) // Optional: notified once per completed/timed-out tx request
step chan struct{} // Notification channel when the fetcher loop iterates step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -196,15 +196,15 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction // NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements. // based on hash announcements.
// Chain can be nil to disable on-chain checks. // Chain can be nil to disable on-chain checks.
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration)) *TxFetcher { func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestLatency, mclock.System{}, time.Now, nil) return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestResult, mclock.System{}, time.Now, nil)
} }
// NewTxFetcherForTests is a testing method to mock out the realtime clock with // NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one. // a simulated version and the internal randomness with a deterministic one.
// Chain can be nil to disable on-chain checks. // Chain can be nil to disable on-chain checks.
func NewTxFetcherForTests( func NewTxFetcherForTests(
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration), chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{ return &TxFetcher{
notify: make(chan *txAnnounce), notify: make(chan *txAnnounce),
@ -227,7 +227,7 @@ func NewTxFetcherForTests(
fetchTxs: fetchTxs, fetchTxs: fetchTxs,
dropPeer: dropPeer, dropPeer: dropPeer,
onAccepted: onAccepted, onAccepted: onAccepted,
onRequestLatency: onRequestLatency, onRequestResult: onRequestResult,
clock: clock, clock: clock,
realTime: realTime, realTime: realTime,
rand: rand, rand: rand,
@ -680,8 +680,8 @@ func (f *TxFetcher) loop() {
// itself, so a peer that times out repeatedly drags its // itself, so a peer that times out repeatedly drags its
// score down without us having to wait for an eventual // score down without us having to wait for an eventual
// (possibly never-arriving) reply. // (possibly never-arriving) reply.
if f.onRequestLatency != nil { if f.onRequestResult != nil {
f.onRequestLatency(peer, txFetchTimeout) f.onRequestResult(peer, txFetchTimeout, true)
} }
} }
} }
@ -781,9 +781,9 @@ func (f *TxFetcher) loop() {
txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds()) txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds())
// Already counted as a timeout sample at the timeout site; // Already counted as a timeout sample at the timeout site;
// don't double-record on eventual delivery. // don't double-record on eventual delivery.
} else if f.onRequestLatency != nil { } else if f.onRequestResult != nil {
// Normal in-time delivery. Record the actual round-trip. // Normal in-time delivery. Record the actual round-trip.
f.onRequestLatency(delivery.origin, time.Duration(f.clock.Now()-req.time)) f.onRequestResult(delivery.origin, time.Duration(f.clock.Now()-req.time), false)
} }
delete(f.requests, delivery.origin) delete(f.requests, delivery.origin)

View file

@ -2287,53 +2287,51 @@ func TestTransactionForgotten(t *testing.T) {
} }
} }
// latencyRecorder is a thread-safe recorder for onRequestLatency callbacks. // resultRecorder is a thread-safe recorder for onRequestResult callbacks.
type latencyRecorder struct { type resultRecorder struct {
mu sync.Mutex mu sync.Mutex
samples []latencySample samples []resultSample
} }
type latencySample struct { type resultSample struct {
peer string peer string
latency time.Duration latency time.Duration
timeout bool
} }
func (r *latencyRecorder) record(peer string, latency time.Duration) { func (r *resultRecorder) record(peer string, latency time.Duration, timeout bool) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
r.samples = append(r.samples, latencySample{peer, latency}) r.samples = append(r.samples, resultSample{peer, latency, timeout})
} }
func (r *latencyRecorder) snapshot() []latencySample { func (r *resultRecorder) snapshot() []resultSample {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
out := make([]latencySample, len(r.samples)) out := make([]resultSample, len(r.samples))
copy(out, r.samples) copy(out, r.samples)
return out return out
} }
// TestTransactionFetcherRequestLatencyOnDelivery asserts that an in-time // TestTransactionFetcherRequestResultOnDelivery asserts that an in-time
// direct delivery of a requested batch fires the onRequestLatency callback // direct delivery fires the onRequestResult callback with timeout=false.
// exactly once with the actual round-trip latency. func TestTransactionFetcherRequestResultOnDelivery(t *testing.T) {
func TestTransactionFetcherRequestLatencyOnDelivery(t *testing.T) { rec := &resultRecorder{}
rec := &latencyRecorder{}
testTransactionFetcherParallel(t, txFetcherTest{ testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher { init: func() *TxFetcher {
f := newTestTxFetcher() f := newTestTxFetcher()
f.onRequestLatency = rec.record f.onRequestResult = rec.record
return f return f
}, },
steps: []interface{}{ steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
// Wait for the announce-arrival timer; request is dispatched at this point.
doWait{time: txArriveTimeout, step: true}, doWait{time: txArriveTimeout, step: true},
// Simulate 200ms round-trip before the response arrives.
doWait{time: 200 * time.Millisecond, step: false}, doWait{time: 200 * time.Millisecond, step: false},
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
doFunc(func() { doFunc(func() {
samples := rec.snapshot() samples := rec.snapshot()
if len(samples) != 1 { if len(samples) != 1 {
t.Fatalf("expected 1 latency sample, got %d (%v)", len(samples), samples) t.Fatalf("expected 1 sample, got %d (%v)", len(samples), samples)
} }
if samples[0].peer != "A" { if samples[0].peer != "A" {
t.Errorf("peer mismatch: got %q, want A", samples[0].peer) t.Errorf("peer mismatch: got %q, want A", samples[0].peer)
@ -2341,28 +2339,28 @@ func TestTransactionFetcherRequestLatencyOnDelivery(t *testing.T) {
if samples[0].latency != 200*time.Millisecond { if samples[0].latency != 200*time.Millisecond {
t.Errorf("latency mismatch: got %v, want 200ms", samples[0].latency) t.Errorf("latency mismatch: got %v, want 200ms", samples[0].latency)
} }
if samples[0].timeout {
t.Error("expected timeout=false for delivery")
}
}), }),
}, },
}) })
} }
// TestTransactionFetcherRequestLatencyOnTimeout asserts that when a request // TestTransactionFetcherRequestResultOnTimeout asserts that a timed-out
// times out (no reply within txFetchTimeout), onRequestLatency fires once // request fires onRequestResult with timeout=true and the timeout value,
// with the timeout value, and a subsequent (late) delivery does not fire // and a subsequent (late) delivery does not fire a duplicate sample.
// a duplicate sample. func TestTransactionFetcherRequestResultOnTimeout(t *testing.T) {
func TestTransactionFetcherRequestLatencyOnTimeout(t *testing.T) { rec := &resultRecorder{}
rec := &latencyRecorder{}
testTransactionFetcherParallel(t, txFetcherTest{ testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher { init: func() *TxFetcher {
f := newTestTxFetcher() f := newTestTxFetcher()
f.onRequestLatency = rec.record f.onRequestResult = rec.record
return f return f
}, },
steps: []interface{}{ steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true}, doWait{time: txArriveTimeout, step: true},
// Push the clock past the request deadline; the timeout handler
// should fire and record a single timeout-valued sample.
doWait{time: txFetchTimeout, step: true}, doWait{time: txFetchTimeout, step: true},
doFunc(func() { doFunc(func() {
samples := rec.snapshot() samples := rec.snapshot()
@ -2375,13 +2373,14 @@ func TestTransactionFetcherRequestLatencyOnTimeout(t *testing.T) {
if samples[0].latency != txFetchTimeout { if samples[0].latency != txFetchTimeout {
t.Errorf("latency mismatch: got %v, want %v", samples[0].latency, txFetchTimeout) t.Errorf("latency mismatch: got %v, want %v", samples[0].latency, txFetchTimeout)
} }
if !samples[0].timeout {
t.Error("expected timeout=true for timed-out request")
}
}), }),
// A late reply from the slow peer must not produce a second sample.
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
doFunc(func() { doFunc(func() {
samples := rec.snapshot() if len(rec.snapshot()) != 1 {
if len(samples) != 1 { t.Fatalf("late delivery double-counted: got %d samples, want 1", len(rec.snapshot()))
t.Fatalf("late delivery double-counted latency: got %d samples, want 1", len(samples))
} }
}), }),
}, },

View file

@ -194,7 +194,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
} }
h.txTracker = txtracker.New() h.txTracker = txtracker.New()
h.peerStats = peerstats.New() h.peerStats = peerstats.New()
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestLatency) h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestResult)
return h, nil return h, nil
} }

View file

@ -26,9 +26,10 @@
// Signal sources: // Signal sources:
// - NotifyBlock(inclusions, finalized) — per-block deltas from txtracker // - NotifyBlock(inclusions, finalized) — per-block deltas from txtracker
// (computed under txtracker's own lock, then passed in after release) // (computed under txtracker's own lock, then passed in after release)
// - NotifyRequestLatency(peer, latency) — per-request samples from the // - NotifyRequestResult(peer, latency, timeout) — per-request outcomes
// fetcher; timeouts are reported with the timeout value so slow peers // from the fetcher; timeouts are reported with the timeout value so
// contribute to the EMA // slow peers contribute to the EMA, and the timeout flag increments
// the per-peer timeout counter
// - NotifyPeerDrop(peer) — called from the handler on disconnect // - NotifyPeerDrop(peer) — called from the handler on disconnect
package peerstats package peerstats
@ -67,8 +68,9 @@ type PeerStats struct {
RecentFinalized float64 // EMA of per-block finalization credits (slow) RecentFinalized float64 // EMA of per-block finalization credits (slow)
RecentIncluded float64 // EMA of per-block inclusions (fast) RecentIncluded float64 // EMA of per-block inclusions (fast)
RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value) RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value)
RequestSamples int64 // Number of latency samples seen (for bootstrap guard) RequestSuccesses int64 // Requests answered before timeout
LastLatencySample time.Time // Wall-clock time of the most recent latency sample (for staleness gate) RequestTimeouts int64 // Requests that timed out
LastLatencySample time.Time // Wall-clock time of the most recent request result (for staleness gate)
} }
// peerStats is the internal mutable state per peer. // peerStats is the internal mutable state per peer.
@ -76,7 +78,8 @@ type peerStats struct {
recentFinalized float64 recentFinalized float64
recentIncluded float64 recentIncluded float64
requestLatencyEMA time.Duration requestLatencyEMA time.Duration
requestSamples int64 requestSuccesses int64
requestTimeouts int64
lastLatencySample time.Time lastLatencySample time.Time
} }
@ -126,11 +129,13 @@ func (s *Stats) NotifyBlock(inclusions, finalized map[string]int) {
} }
} }
// NotifyRequestLatency records a tx-request response latency sample for // NotifyRequestResult records a tx-request outcome for the given peer.
// the given peer. Timeouts should be reported as the timeout value. // latency is the round-trip time (for timeouts, pass the timeout value).
// Creates a peer entry if one doesn't exist (a peer may have latency // timeout indicates whether the request timed out rather than receiving a
// samples before any inclusion signal). // normal delivery. Both cases update the latency EMA; the timeout flag
func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) { // additionally increments the per-peer timeout counter.
// Creates a peer entry if one doesn't exist.
func (s *Stats) NotifyRequestResult(peer string, latency time.Duration, timeout bool) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -139,7 +144,7 @@ func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) {
ps = &peerStats{} ps = &peerStats{}
s.peers[peer] = ps s.peers[peer] = ps
} }
if ps.requestSamples == 0 { if ps.requestSuccesses+ps.requestTimeouts == 0 {
// Bootstrap the EMA with the first sample so it doesn't drift up // Bootstrap the EMA with the first sample so it doesn't drift up
// from zero over many samples before reaching realistic values. // from zero over many samples before reaching realistic values.
ps.requestLatencyEMA = latency ps.requestLatencyEMA = latency
@ -149,7 +154,11 @@ func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) {
float64(latency)*latencyEMAAlpha, float64(latency)*latencyEMAAlpha,
) )
} }
ps.requestSamples++ if timeout {
ps.requestTimeouts++
} else {
ps.requestSuccesses++
}
ps.lastLatencySample = time.Now() ps.lastLatencySample = time.Now()
} }
@ -175,7 +184,8 @@ func (s *Stats) GetAllPeerStats() map[string]PeerStats {
RecentFinalized: ps.recentFinalized, RecentFinalized: ps.recentFinalized,
RecentIncluded: ps.recentIncluded, RecentIncluded: ps.recentIncluded,
RequestLatencyEMA: ps.requestLatencyEMA, RequestLatencyEMA: ps.requestLatencyEMA,
RequestSamples: ps.requestSamples, RequestSuccesses: ps.requestSuccesses,
RequestTimeouts: ps.requestTimeouts,
LastLatencySample: ps.lastLatencySample, LastLatencySample: ps.lastLatencySample,
} }
} }

View file

@ -114,26 +114,29 @@ func TestNotifyBlockInclusionEMAUpdate(t *testing.T) {
} }
} }
// TestNotifyRequestLatencyFirstSampleBootstrap asserts that the first // TestNotifyRequestResultFirstSampleBootstrap asserts that the first
// latency sample seeds the EMA directly. // latency sample seeds the EMA directly.
func TestNotifyRequestLatencyFirstSampleBootstrap(t *testing.T) { func TestNotifyRequestResultFirstSampleBootstrap(t *testing.T) {
s := New() s := New()
s.NotifyRequestLatency("peerA", 200*time.Millisecond) s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
ps := s.GetAllPeerStats()["peerA"] ps := s.GetAllPeerStats()["peerA"]
if ps.RequestLatencyEMA != 200*time.Millisecond { if ps.RequestLatencyEMA != 200*time.Millisecond {
t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA) t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA)
} }
if ps.RequestSamples != 1 { if ps.RequestSuccesses != 1 {
t.Fatalf("expected RequestSamples=1, got %d", ps.RequestSamples) t.Fatalf("expected RequestSuccesses=1, got %d", ps.RequestSuccesses)
}
if ps.RequestTimeouts != 0 {
t.Fatalf("expected RequestTimeouts=0, got %d", ps.RequestTimeouts)
} }
} }
// TestNotifyRequestLatencyEMAUpdate verifies the EMA formula for latency. // TestNotifyRequestResultEMAUpdate verifies the EMA formula for latency.
func TestNotifyRequestLatencyEMAUpdate(t *testing.T) { func TestNotifyRequestResultEMAUpdate(t *testing.T) {
s := New() s := New()
s.NotifyRequestLatency("peerA", 100*time.Millisecond) s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
s.NotifyRequestLatency("peerA", 1000*time.Millisecond) s.NotifyRequestResult("peerA", 1000*time.Millisecond, false)
// Expected: 0.99*100ms + 0.01*1000ms = 109ms // Expected: 0.99*100ms + 0.01*1000ms = 109ms
got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA
@ -145,18 +148,19 @@ func TestNotifyRequestLatencyEMAUpdate(t *testing.T) {
if delta > 1*time.Microsecond { if delta > 1*time.Microsecond {
t.Fatalf("EMA mismatch: got %v, want %v", got, want) t.Fatalf("EMA mismatch: got %v, want %v", got, want)
} }
if samples := s.GetAllPeerStats()["peerA"].RequestSamples; samples != 2 { ps := s.GetAllPeerStats()["peerA"]
t.Fatalf("expected RequestSamples=2, got %d", samples) if ps.RequestSuccesses != 2 {
t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses)
} }
} }
// TestNotifyRequestLatencySlowConvergence verifies the slow alpha // TestNotifyRequestResultSlowConvergence verifies the slow alpha
// damps convergence under sustained timeouts. // damps convergence under sustained timeouts.
func TestNotifyRequestLatencySlowConvergence(t *testing.T) { func TestNotifyRequestResultSlowConvergence(t *testing.T) {
s := New() s := New()
s.NotifyRequestLatency("peerA", 100*time.Millisecond) s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
for i := 0; i < 50; i++ { for i := 0; i < 50; i++ {
s.NotifyRequestLatency("peerA", 5*time.Second) s.NotifyRequestResult("peerA", 5*time.Second, false)
} }
got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA
if got < 1*time.Second { if got < 1*time.Second {
@ -171,7 +175,7 @@ func TestNotifyRequestLatencySlowConvergence(t *testing.T) {
// from GetAllPeerStats. // from GetAllPeerStats.
func TestNotifyPeerDropClearsStats(t *testing.T) { func TestNotifyPeerDropClearsStats(t *testing.T) {
s := New() s := New()
s.NotifyRequestLatency("peerA", 200*time.Millisecond) s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
s.NotifyPeerDrop("peerA") s.NotifyPeerDrop("peerA")
if _, ok := s.GetAllPeerStats()["peerA"]; ok { if _, ok := s.GetAllPeerStats()["peerA"]; ok {
@ -184,14 +188,14 @@ func TestNotifyPeerDropClearsStats(t *testing.T) {
// dropper's MinLatencySamples=100 guard ensures this is harmless. // dropper's MinLatencySamples=100 guard ensures this is harmless.
func TestStaleRequestLatencyAfterDrop(t *testing.T) { func TestStaleRequestLatencyAfterDrop(t *testing.T) {
s := New() s := New()
s.NotifyRequestLatency("peerA", 200*time.Millisecond) s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
s.NotifyPeerDrop("peerA") s.NotifyPeerDrop("peerA")
// Late sample racing with the drop. // Late sample racing with the drop.
s.NotifyRequestLatency("peerA", 50*time.Millisecond) s.NotifyRequestResult("peerA", 50*time.Millisecond, false)
ps := s.GetAllPeerStats()["peerA"] ps := s.GetAllPeerStats()["peerA"]
if ps.RequestSamples != 1 { if ps.RequestSuccesses != 1 {
t.Fatalf("expected fresh RequestSamples=1, got %d", ps.RequestSamples) t.Fatalf("expected fresh RequestSuccesses=1, got %d", ps.RequestSuccesses)
} }
if ps.RequestLatencyEMA != 50*time.Millisecond { if ps.RequestLatencyEMA != 50*time.Millisecond {
t.Fatalf("expected fresh bootstrap at 50ms, got %v", ps.RequestLatencyEMA) t.Fatalf("expected fresh bootstrap at 50ms, got %v", ps.RequestLatencyEMA)
@ -204,8 +208,8 @@ func TestStaleRequestLatencyAfterDrop(t *testing.T) {
func TestMultiplePeersIsolated(t *testing.T) { func TestMultiplePeersIsolated(t *testing.T) {
s := New() s := New()
s.NotifyBlock(map[string]int{"peerA": 5, "peerB": 0}, nil) s.NotifyBlock(map[string]int{"peerA": 5, "peerB": 0}, nil)
s.NotifyRequestLatency("peerA", 100*time.Millisecond) s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
s.NotifyRequestLatency("peerB", 5*time.Second) s.NotifyRequestResult("peerB", 5*time.Second, false)
s.NotifyBlock(nil, map[string]int{"peerA": 2}) s.NotifyBlock(nil, map[string]int{"peerA": 2})
stats := s.GetAllPeerStats() stats := s.GetAllPeerStats()
@ -222,12 +226,12 @@ func TestMultiplePeersIsolated(t *testing.T) {
} }
} }
// TestLatencyTimestampSet verifies that NotifyRequestLatency stamps the // TestLatencyTimestampSet verifies that NotifyRequestResult stamps the
// peer's LastLatencySample with approximately time.Now(). // peer's LastLatencySample with approximately time.Now().
func TestLatencyTimestampSet(t *testing.T) { func TestLatencyTimestampSet(t *testing.T) {
s := New() s := New()
before := time.Now() before := time.Now()
s.NotifyRequestLatency("peerA", 100*time.Millisecond) s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
after := time.Now() after := time.Now()
got := s.GetAllPeerStats()["peerA"].LastLatencySample got := s.GetAllPeerStats()["peerA"].LastLatencySample
@ -237,18 +241,53 @@ func TestLatencyTimestampSet(t *testing.T) {
} }
// TestLatencyTimestampUpdatesOnEachSample verifies that a later // TestLatencyTimestampUpdatesOnEachSample verifies that a later
// NotifyRequestLatency call advances LastLatencySample. // NotifyRequestResult call advances LastLatencySample.
func TestLatencyTimestampUpdatesOnEachSample(t *testing.T) { func TestLatencyTimestampUpdatesOnEachSample(t *testing.T) {
s := New() s := New()
s.NotifyRequestLatency("peerA", 100*time.Millisecond) s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
first := s.GetAllPeerStats()["peerA"].LastLatencySample first := s.GetAllPeerStats()["peerA"].LastLatencySample
// Small sleep so the second timestamp is detectably later. // Small sleep so the second timestamp is detectably later.
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
s.NotifyRequestLatency("peerA", 200*time.Millisecond) s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
second := s.GetAllPeerStats()["peerA"].LastLatencySample second := s.GetAllPeerStats()["peerA"].LastLatencySample
if !second.After(first) { if !second.After(first) {
t.Fatalf("expected second sample timestamp > first, got first=%v second=%v", first, second) t.Fatalf("expected second sample timestamp > first, got first=%v second=%v", first, second)
} }
} }
// TestRequestResultTimeoutCounting verifies that timeout=true increments
// RequestTimeouts (not RequestSuccesses) and still updates the EMA.
func TestRequestResultTimeoutCounting(t *testing.T) {
s := New()
s.NotifyRequestResult("peerA", 5*time.Second, true)
ps := s.GetAllPeerStats()["peerA"]
if ps.RequestTimeouts != 1 {
t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts)
}
if ps.RequestSuccesses != 0 {
t.Fatalf("expected RequestSuccesses=0, got %d", ps.RequestSuccesses)
}
if ps.RequestLatencyEMA != 5*time.Second {
t.Fatalf("EMA should bootstrap to timeout value, got %v", ps.RequestLatencyEMA)
}
}
// TestRequestResultMixedCounting verifies that a mix of successes and
// timeouts increments the correct counters independently.
func TestRequestResultMixedCounting(t *testing.T) {
s := New()
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
s.NotifyRequestResult("peerA", 5*time.Second, true)
ps := s.GetAllPeerStats()["peerA"]
if ps.RequestSuccesses != 2 {
t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses)
}
if ps.RequestTimeouts != 1 {
t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts)
}
}