diff --git a/eth/dropper.go b/eth/dropper.go index 3d370c0733..0982dba91d 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -82,7 +82,7 @@ var protectionCategories = []protectionCategory{ // whose EMA reaches the timeout also score 0 by this path because // the reciprocal of a very large duration is tiny but positive; the // per-pool top-N will still push faster peers ahead of them. - if s.RequestSamples < peerstats.MinLatencySamples { + if s.RequestSuccesses+s.RequestTimeouts < peerstats.MinLatencySamples { return 0 } // Freshness gate: a peer that earned a fast EMA but then went diff --git a/eth/dropper_test.go b/eth/dropper_test.go index 63b431de22..c56f293166 100644 --- a/eth/dropper_test.go +++ b/eth/dropper_test.go @@ -243,17 +243,17 @@ func TestProtectedByPoolRequestLatencyBasic(t *testing.T) { // Three peers have enough samples; the two fastest should win. stats[dialed[0].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 50 * time.Millisecond, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } stats[dialed[1].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 100 * time.Millisecond, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } stats[dialed[2].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 2 * time.Second, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } @@ -282,12 +282,12 @@ func TestProtectedByPoolRequestLatencyBootstrapGuard(t *testing.T) { // A lucky-fast peer with only 1 sample — must NOT be protected. stats[dialed[0].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 1 * time.Millisecond, - RequestSamples: 1, + RequestSuccesses: 1, } // A warmed-up but slower peer — should be protected on latency. stats[dialed[1].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 500 * time.Millisecond, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } @@ -317,7 +317,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) { for _, p := range inbound { stats[p.ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 50 * time.Millisecond, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } } @@ -326,7 +326,7 @@ func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) { for _, p := range dialed { stats[p.ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 1 * time.Second, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } } @@ -356,14 +356,14 @@ func TestProtectedByPoolRequestLatencyStale(t *testing.T) { // Fresh, fast peer — should be protected. stats[dialed[0].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 50 * time.Millisecond, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now(), } // Stale, fast peer — was fast, but hasn't answered in too long. // Same EMA and sample count as the fresh peer; only staleness differs. stats[dialed[1].ID().String()] = peerstats.PeerStats{ RequestLatencyEMA: 50 * time.Millisecond, - RequestSamples: peerstats.MinLatencySamples, + RequestSuccesses: peerstats.MinLatencySamples, LastLatencySample: time.Now().Add(-2 * peerstats.MaxLatencyStaleness), } diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index ffed7d5acf..6aba3b1f23 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -185,7 +185,7 @@ type TxFetcher struct { fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer dropPeer func(string) // Drops a peer in case of announcement violation onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer - onRequestLatency func(peer string, latency time.Duration) // Optional: notified once per completed/timed-out tx request + onRequestResult func(peer string, latency time.Duration, timeout bool) // Optional: notified once per completed/timed-out tx request step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -196,15 +196,15 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration)) *TxFetcher { - return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestLatency, mclock.System{}, time.Now, nil) +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool)) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestResult, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), @@ -227,7 +227,7 @@ func NewTxFetcherForTests( fetchTxs: fetchTxs, dropPeer: dropPeer, onAccepted: onAccepted, - onRequestLatency: onRequestLatency, + onRequestResult: onRequestResult, clock: clock, realTime: realTime, rand: rand, @@ -680,8 +680,8 @@ func (f *TxFetcher) loop() { // itself, so a peer that times out repeatedly drags its // score down without us having to wait for an eventual // (possibly never-arriving) reply. - if f.onRequestLatency != nil { - f.onRequestLatency(peer, txFetchTimeout) + if f.onRequestResult != nil { + f.onRequestResult(peer, txFetchTimeout, true) } } } @@ -781,9 +781,9 @@ func (f *TxFetcher) loop() { txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds()) // Already counted as a timeout sample at the timeout site; // don't double-record on eventual delivery. - } else if f.onRequestLatency != nil { + } else if f.onRequestResult != nil { // Normal in-time delivery. Record the actual round-trip. - f.onRequestLatency(delivery.origin, time.Duration(f.clock.Now()-req.time)) + f.onRequestResult(delivery.origin, time.Duration(f.clock.Now()-req.time), false) } delete(f.requests, delivery.origin) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 92f3979acb..2b4f156420 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -2287,53 +2287,51 @@ func TestTransactionForgotten(t *testing.T) { } } -// latencyRecorder is a thread-safe recorder for onRequestLatency callbacks. -type latencyRecorder struct { +// resultRecorder is a thread-safe recorder for onRequestResult callbacks. +type resultRecorder struct { mu sync.Mutex - samples []latencySample + samples []resultSample } -type latencySample struct { +type resultSample struct { peer string latency time.Duration + timeout bool } -func (r *latencyRecorder) record(peer string, latency time.Duration) { +func (r *resultRecorder) record(peer string, latency time.Duration, timeout bool) { r.mu.Lock() defer r.mu.Unlock() - r.samples = append(r.samples, latencySample{peer, latency}) + r.samples = append(r.samples, resultSample{peer, latency, timeout}) } -func (r *latencyRecorder) snapshot() []latencySample { +func (r *resultRecorder) snapshot() []resultSample { r.mu.Lock() defer r.mu.Unlock() - out := make([]latencySample, len(r.samples)) + out := make([]resultSample, len(r.samples)) copy(out, r.samples) return out } -// TestTransactionFetcherRequestLatencyOnDelivery asserts that an in-time -// direct delivery of a requested batch fires the onRequestLatency callback -// exactly once with the actual round-trip latency. -func TestTransactionFetcherRequestLatencyOnDelivery(t *testing.T) { - rec := &latencyRecorder{} +// TestTransactionFetcherRequestResultOnDelivery asserts that an in-time +// direct delivery fires the onRequestResult callback with timeout=false. +func TestTransactionFetcherRequestResultOnDelivery(t *testing.T) { + rec := &resultRecorder{} testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.onRequestLatency = rec.record + f.onRequestResult = rec.record return f }, steps: []interface{}{ doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, - // Wait for the announce-arrival timer; request is dispatched at this point. doWait{time: txArriveTimeout, step: true}, - // Simulate 200ms round-trip before the response arrives. doWait{time: 200 * time.Millisecond, step: false}, doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, doFunc(func() { samples := rec.snapshot() if len(samples) != 1 { - t.Fatalf("expected 1 latency sample, got %d (%v)", len(samples), samples) + t.Fatalf("expected 1 sample, got %d (%v)", len(samples), samples) } if samples[0].peer != "A" { t.Errorf("peer mismatch: got %q, want A", samples[0].peer) @@ -2341,28 +2339,28 @@ func TestTransactionFetcherRequestLatencyOnDelivery(t *testing.T) { if samples[0].latency != 200*time.Millisecond { t.Errorf("latency mismatch: got %v, want 200ms", samples[0].latency) } + if samples[0].timeout { + t.Error("expected timeout=false for delivery") + } }), }, }) } -// TestTransactionFetcherRequestLatencyOnTimeout asserts that when a request -// times out (no reply within txFetchTimeout), onRequestLatency fires once -// with the timeout value, and a subsequent (late) delivery does not fire -// a duplicate sample. -func TestTransactionFetcherRequestLatencyOnTimeout(t *testing.T) { - rec := &latencyRecorder{} +// TestTransactionFetcherRequestResultOnTimeout asserts that a timed-out +// request fires onRequestResult with timeout=true and the timeout value, +// and a subsequent (late) delivery does not fire a duplicate sample. +func TestTransactionFetcherRequestResultOnTimeout(t *testing.T) { + rec := &resultRecorder{} testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.onRequestLatency = rec.record + f.onRequestResult = rec.record return f }, steps: []interface{}{ doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, doWait{time: txArriveTimeout, step: true}, - // Push the clock past the request deadline; the timeout handler - // should fire and record a single timeout-valued sample. doWait{time: txFetchTimeout, step: true}, doFunc(func() { samples := rec.snapshot() @@ -2375,13 +2373,14 @@ func TestTransactionFetcherRequestLatencyOnTimeout(t *testing.T) { if samples[0].latency != txFetchTimeout { t.Errorf("latency mismatch: got %v, want %v", samples[0].latency, txFetchTimeout) } + if !samples[0].timeout { + t.Error("expected timeout=true for timed-out request") + } }), - // A late reply from the slow peer must not produce a second sample. doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, doFunc(func() { - samples := rec.snapshot() - if len(samples) != 1 { - t.Fatalf("late delivery double-counted latency: got %d samples, want 1", len(samples)) + if len(rec.snapshot()) != 1 { + t.Fatalf("late delivery double-counted: got %d samples, want 1", len(rec.snapshot())) } }), }, diff --git a/eth/handler.go b/eth/handler.go index 8916dad662..448cfe0a2a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -194,7 +194,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } h.txTracker = txtracker.New() h.peerStats = peerstats.New() - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestLatency) + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestResult) return h, nil } diff --git a/eth/peerstats/peerstats.go b/eth/peerstats/peerstats.go index 1d0d3d36bc..146226a187 100644 --- a/eth/peerstats/peerstats.go +++ b/eth/peerstats/peerstats.go @@ -26,9 +26,10 @@ // Signal sources: // - NotifyBlock(inclusions, finalized) — per-block deltas from txtracker // (computed under txtracker's own lock, then passed in after release) -// - NotifyRequestLatency(peer, latency) — per-request samples from the -// fetcher; timeouts are reported with the timeout value so slow peers -// contribute to the EMA +// - NotifyRequestResult(peer, latency, timeout) — per-request outcomes +// from the fetcher; timeouts are reported with the timeout value so +// slow peers contribute to the EMA, and the timeout flag increments +// the per-peer timeout counter // - NotifyPeerDrop(peer) — called from the handler on disconnect package peerstats @@ -67,8 +68,9 @@ type PeerStats struct { RecentFinalized float64 // EMA of per-block finalization credits (slow) RecentIncluded float64 // EMA of per-block inclusions (fast) RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value) - RequestSamples int64 // Number of latency samples seen (for bootstrap guard) - LastLatencySample time.Time // Wall-clock time of the most recent latency sample (for staleness gate) + RequestSuccesses int64 // Requests answered before timeout + RequestTimeouts int64 // Requests that timed out + LastLatencySample time.Time // Wall-clock time of the most recent request result (for staleness gate) } // peerStats is the internal mutable state per peer. @@ -76,7 +78,8 @@ type peerStats struct { recentFinalized float64 recentIncluded float64 requestLatencyEMA time.Duration - requestSamples int64 + requestSuccesses int64 + requestTimeouts int64 lastLatencySample time.Time } @@ -126,11 +129,13 @@ func (s *Stats) NotifyBlock(inclusions, finalized map[string]int) { } } -// NotifyRequestLatency records a tx-request response latency sample for -// the given peer. Timeouts should be reported as the timeout value. -// Creates a peer entry if one doesn't exist (a peer may have latency -// samples before any inclusion signal). -func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) { +// NotifyRequestResult records a tx-request outcome for the given peer. +// latency is the round-trip time (for timeouts, pass the timeout value). +// timeout indicates whether the request timed out rather than receiving a +// normal delivery. Both cases update the latency EMA; the timeout flag +// additionally increments the per-peer timeout counter. +// Creates a peer entry if one doesn't exist. +func (s *Stats) NotifyRequestResult(peer string, latency time.Duration, timeout bool) { s.mu.Lock() defer s.mu.Unlock() @@ -139,7 +144,7 @@ func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) { ps = &peerStats{} s.peers[peer] = ps } - if ps.requestSamples == 0 { + if ps.requestSuccesses+ps.requestTimeouts == 0 { // Bootstrap the EMA with the first sample so it doesn't drift up // from zero over many samples before reaching realistic values. ps.requestLatencyEMA = latency @@ -149,7 +154,11 @@ func (s *Stats) NotifyRequestLatency(peer string, latency time.Duration) { float64(latency)*latencyEMAAlpha, ) } - ps.requestSamples++ + if timeout { + ps.requestTimeouts++ + } else { + ps.requestSuccesses++ + } ps.lastLatencySample = time.Now() } @@ -175,7 +184,8 @@ func (s *Stats) GetAllPeerStats() map[string]PeerStats { RecentFinalized: ps.recentFinalized, RecentIncluded: ps.recentIncluded, RequestLatencyEMA: ps.requestLatencyEMA, - RequestSamples: ps.requestSamples, + RequestSuccesses: ps.requestSuccesses, + RequestTimeouts: ps.requestTimeouts, LastLatencySample: ps.lastLatencySample, } } diff --git a/eth/peerstats/peerstats_test.go b/eth/peerstats/peerstats_test.go index cfa894512f..3b242eac15 100644 --- a/eth/peerstats/peerstats_test.go +++ b/eth/peerstats/peerstats_test.go @@ -114,26 +114,29 @@ func TestNotifyBlockInclusionEMAUpdate(t *testing.T) { } } -// TestNotifyRequestLatencyFirstSampleBootstrap asserts that the first +// TestNotifyRequestResultFirstSampleBootstrap asserts that the first // latency sample seeds the EMA directly. -func TestNotifyRequestLatencyFirstSampleBootstrap(t *testing.T) { +func TestNotifyRequestResultFirstSampleBootstrap(t *testing.T) { s := New() - s.NotifyRequestLatency("peerA", 200*time.Millisecond) + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) ps := s.GetAllPeerStats()["peerA"] if ps.RequestLatencyEMA != 200*time.Millisecond { t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA) } - if ps.RequestSamples != 1 { - t.Fatalf("expected RequestSamples=1, got %d", ps.RequestSamples) + if ps.RequestSuccesses != 1 { + t.Fatalf("expected RequestSuccesses=1, got %d", ps.RequestSuccesses) + } + if ps.RequestTimeouts != 0 { + t.Fatalf("expected RequestTimeouts=0, got %d", ps.RequestTimeouts) } } -// TestNotifyRequestLatencyEMAUpdate verifies the EMA formula for latency. -func TestNotifyRequestLatencyEMAUpdate(t *testing.T) { +// TestNotifyRequestResultEMAUpdate verifies the EMA formula for latency. +func TestNotifyRequestResultEMAUpdate(t *testing.T) { s := New() - s.NotifyRequestLatency("peerA", 100*time.Millisecond) - s.NotifyRequestLatency("peerA", 1000*time.Millisecond) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerA", 1000*time.Millisecond, false) // Expected: 0.99*100ms + 0.01*1000ms = 109ms got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA @@ -145,18 +148,19 @@ func TestNotifyRequestLatencyEMAUpdate(t *testing.T) { if delta > 1*time.Microsecond { t.Fatalf("EMA mismatch: got %v, want %v", got, want) } - if samples := s.GetAllPeerStats()["peerA"].RequestSamples; samples != 2 { - t.Fatalf("expected RequestSamples=2, got %d", samples) + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestSuccesses != 2 { + t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses) } } -// TestNotifyRequestLatencySlowConvergence verifies the slow alpha +// TestNotifyRequestResultSlowConvergence verifies the slow alpha // damps convergence under sustained timeouts. -func TestNotifyRequestLatencySlowConvergence(t *testing.T) { +func TestNotifyRequestResultSlowConvergence(t *testing.T) { s := New() - s.NotifyRequestLatency("peerA", 100*time.Millisecond) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) for i := 0; i < 50; i++ { - s.NotifyRequestLatency("peerA", 5*time.Second) + s.NotifyRequestResult("peerA", 5*time.Second, false) } got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA if got < 1*time.Second { @@ -171,7 +175,7 @@ func TestNotifyRequestLatencySlowConvergence(t *testing.T) { // from GetAllPeerStats. func TestNotifyPeerDropClearsStats(t *testing.T) { s := New() - s.NotifyRequestLatency("peerA", 200*time.Millisecond) + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) s.NotifyPeerDrop("peerA") if _, ok := s.GetAllPeerStats()["peerA"]; ok { @@ -184,14 +188,14 @@ func TestNotifyPeerDropClearsStats(t *testing.T) { // dropper's MinLatencySamples=100 guard ensures this is harmless. func TestStaleRequestLatencyAfterDrop(t *testing.T) { s := New() - s.NotifyRequestLatency("peerA", 200*time.Millisecond) + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) s.NotifyPeerDrop("peerA") // Late sample racing with the drop. - s.NotifyRequestLatency("peerA", 50*time.Millisecond) + s.NotifyRequestResult("peerA", 50*time.Millisecond, false) ps := s.GetAllPeerStats()["peerA"] - if ps.RequestSamples != 1 { - t.Fatalf("expected fresh RequestSamples=1, got %d", ps.RequestSamples) + if ps.RequestSuccesses != 1 { + t.Fatalf("expected fresh RequestSuccesses=1, got %d", ps.RequestSuccesses) } if ps.RequestLatencyEMA != 50*time.Millisecond { t.Fatalf("expected fresh bootstrap at 50ms, got %v", ps.RequestLatencyEMA) @@ -204,8 +208,8 @@ func TestStaleRequestLatencyAfterDrop(t *testing.T) { func TestMultiplePeersIsolated(t *testing.T) { s := New() s.NotifyBlock(map[string]int{"peerA": 5, "peerB": 0}, nil) - s.NotifyRequestLatency("peerA", 100*time.Millisecond) - s.NotifyRequestLatency("peerB", 5*time.Second) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerB", 5*time.Second, false) s.NotifyBlock(nil, map[string]int{"peerA": 2}) stats := s.GetAllPeerStats() @@ -222,12 +226,12 @@ func TestMultiplePeersIsolated(t *testing.T) { } } -// TestLatencyTimestampSet verifies that NotifyRequestLatency stamps the +// TestLatencyTimestampSet verifies that NotifyRequestResult stamps the // peer's LastLatencySample with approximately time.Now(). func TestLatencyTimestampSet(t *testing.T) { s := New() before := time.Now() - s.NotifyRequestLatency("peerA", 100*time.Millisecond) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) after := time.Now() got := s.GetAllPeerStats()["peerA"].LastLatencySample @@ -237,18 +241,53 @@ func TestLatencyTimestampSet(t *testing.T) { } // TestLatencyTimestampUpdatesOnEachSample verifies that a later -// NotifyRequestLatency call advances LastLatencySample. +// NotifyRequestResult call advances LastLatencySample. func TestLatencyTimestampUpdatesOnEachSample(t *testing.T) { s := New() - s.NotifyRequestLatency("peerA", 100*time.Millisecond) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) first := s.GetAllPeerStats()["peerA"].LastLatencySample // Small sleep so the second timestamp is detectably later. time.Sleep(2 * time.Millisecond) - s.NotifyRequestLatency("peerA", 200*time.Millisecond) + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) second := s.GetAllPeerStats()["peerA"].LastLatencySample if !second.After(first) { t.Fatalf("expected second sample timestamp > first, got first=%v second=%v", first, second) } } + +// TestRequestResultTimeoutCounting verifies that timeout=true increments +// RequestTimeouts (not RequestSuccesses) and still updates the EMA. +func TestRequestResultTimeoutCounting(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 5*time.Second, true) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestTimeouts != 1 { + t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts) + } + if ps.RequestSuccesses != 0 { + t.Fatalf("expected RequestSuccesses=0, got %d", ps.RequestSuccesses) + } + if ps.RequestLatencyEMA != 5*time.Second { + t.Fatalf("EMA should bootstrap to timeout value, got %v", ps.RequestLatencyEMA) + } +} + +// TestRequestResultMixedCounting verifies that a mix of successes and +// timeouts increments the correct counters independently. +func TestRequestResultMixedCounting(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerA", 5*time.Second, true) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestSuccesses != 2 { + t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses) + } + if ps.RequestTimeouts != 1 { + t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts) + } +}