diff --git a/eth/backend.go b/eth/backend.go index af8b04bda6..05614b87d0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -456,8 +456,13 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) - // Start the connection manager - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) + // Start the transaction tracker; it emits per-block inclusion and + // finalization signals to peerStats, which the dropper queries for + // protection decisions. + s.handler.txTracker.Start(s.blockchain, s.handler.peerStats) + + // Start the connection manager with inclusion-based peer protection. + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, s.handler.peerStats.GetAllPeerStats) // start log indexer s.filterMaps.Start() @@ -581,6 +586,7 @@ func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.discmix.Close() s.dropper.Stop() + s.handler.txTracker.Stop() s.handler.Stop() // Then stop everything else. diff --git a/eth/dropper.go b/eth/dropper.go index dada5d07c0..0982dba91d 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -17,6 +17,7 @@ package eth import ( + "cmp" mrand "math/rand" "slices" "sync" @@ -24,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/eth/peerstats" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -40,6 +42,10 @@ const ( // dropping when no more peers can be added. Larger numbers result in more // aggressive drop behavior. peerDropThreshold = 0 + // Fraction of inbound/dialed peers to protect based on inclusion stats. + // The top inclusionProtectionFrac of each category (by score) are + // shielded from random dropping. 0.1 = top 10%. 
+ inclusionProtectionFrac = 0.1 ) var ( @@ -47,18 +53,77 @@ var ( droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil) // droppedOutbound is the number of outbound peers dropped droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil) + // dropSkipped counts times a drop was attempted but no peer was dropped, + // for any reason (pool has headroom, all candidates trusted/static/young, + // or protected by inclusion stats). + dropSkipped = metrics.NewRegisteredMeter("eth/dropper/skipped", nil) ) -// dropper monitors the state of the peer pool and makes changes as follows: -// - during sync the Downloader handles peer connections, so dropper is disabled -// - if not syncing and the peer count is close to the limit, it drops peers -// randomly every peerDropInterval to make space for new peers -// - peers are dropped separately from the inboud pool and from the dialed pool +// getPeerStatsFunc returns per-peer inclusion statistics keyed by peer ID string. +type getPeerStatsFunc func() map[string]peerstats.PeerStats + +// protectionCategory defines a peer scoring function and the fraction of peers +// to protect per inbound/dialed category. Multiple categories are unioned. +type protectionCategory struct { + name string + score func(peerstats.PeerStats) float64 + frac float64 // fraction of each pool's current peers to protect (0.0–1.0) +} + +// protectionCategories is the list of protection criteria. Each category +// independently selects its top-N peers per pool; the union is protected. +var protectionCategories = []protectionCategory{ + {"recent-finalized", func(s peerstats.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac}, + {"recent-included", func(s peerstats.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, + {"request-latency", func(s peerstats.PeerStats) float64 { + // Low-latency peers should rank higher. 
Peers with too few samples + // score 0 so the existing `score <= 0` filter excludes them — this + // prevents a single lucky-fast reply from winning protection. Peers + // whose EMA reaches the timeout still score slightly above 0, because + // the reciprocal of a very large duration is tiny but positive; the + // per-pool top-N will still push faster peers ahead of them. + if s.RequestSuccesses+s.RequestTimeouts < peerstats.MinLatencySamples { + return 0 + } + // Freshness gate: a peer that earned a fast EMA but then went + // silent on announcements (no requests → no fresh samples) must + // not keep that score indefinitely. Ignore stale data. + if time.Since(s.LastLatencySample) > peerstats.MaxLatencyStaleness { + return 0 + } + if s.RequestLatencyEMA <= 0 { + return 0 + } + return 1.0 / float64(s.RequestLatencyEMA) + }, inclusionProtectionFrac}, +} + +// dropper monitors the state of the peer pool and introduces churn by +// periodically disconnecting a random peer to make room for new connections. +// The main goal is to allow new peers to join the network and to facilitate +// continuous topology adaptation. +// +// Behavior: +// - During sync the Downloader handles peer connections, so dropper is disabled. +// - When not syncing and a peer category (inbound or dialed) is close to its +// limit, a random peer from that category is disconnected every 3–7 minutes. +// - Trusted and static peers are never dropped. +// - Recently connected peers are also protected from dropping to give them time +// to prove their value before being at risk of disconnection. +// - Some peers are protected from dropping based on their contribution +// to the tx pool. Each pool (inbound/dialed) independently selects its +// top fraction of peers by a per-peer EMA score — a slow EMA of +// finalized inclusions (~1-day half-life, rewards sustained long-term +// contribution), a fast EMA of recent block inclusions (rewards +// current activity), and the inverse of the request-latency EMA (rewards fast responders). 
The union of all protected sets is shielded from +// random dropping, and the drop target is chosen randomly from the +// remainder. type dropper struct { maxDialPeers int // maximum number of dialed peers maxInboundPeers int // maximum number of inbound peers peersFunc getPeersFunc syncingFunc getSyncingFunc + peerStatsFunc getPeerStatsFunc // optional: inclusion stats for protection // peerDropTimer introduces churn if we are close to limit capacity. // We handle Dialed and Inbound connections separately @@ -88,10 +153,12 @@ func newDropper(maxDialPeers, maxInboundPeers int) *dropper { return cm } -// Start the dropper. -func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { +// Start the dropper. peerStatsFunc is optional (nil disables inclusion +// protection). +func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerStatsFunc) { cm.peersFunc = srv.Peers cm.syncingFunc = syncingFunc + cm.peerStatsFunc = peerStatsFunc cm.wg.Add(1) go cm.loop() } @@ -114,30 +181,101 @@ func (cm *dropper) dropRandomPeer() bool { } numDialed := len(peers) - numInbound + // Fast path: if neither pool is near capacity, every non-trusted/non-static + // peer is already do-not-drop by pool-threshold rules. No point computing + // inclusion protection. + if cm.maxDialPeers-numDialed > peerDropThreshold && + cm.maxInboundPeers-numInbound > peerDropThreshold { + dropSkipped.Mark(1) + return false + } + + // Compute the set of inclusion-protected peers before filtering. + protected := cm.protectedPeers(peers) + selectDoNotDrop := func(p *p2p.Peer) bool { - // Avoid dropping trusted and static peers, or recent peers. - // Only drop peers if their respective category (dialed/inbound) - // is close to limit capacity. 
return p.Trusted() || p.StaticDialed() || p.Lifetime() < mclock.AbsTime(doNotDropBefore) || (p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) || - (p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) + (p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) || + protected[p] } droppable := slices.DeleteFunc(peers, selectDoNotDrop) - if len(droppable) > 0 { - p := droppable[mrand.Intn(len(droppable))] - log.Debug("Dropping random peer", "inbound", p.Inbound(), - "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) - p.Disconnect(p2p.DiscUselessPeer) - if p.Inbound() { - droppedInbound.Mark(1) - } else { - droppedOutbound.Mark(1) - } - return true + if len(droppable) == 0 { + dropSkipped.Mark(1) + return false } - return false + p := droppable[mrand.Intn(len(droppable))] + log.Debug("Dropping random peer", "inbound", p.Inbound(), + "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + p.Disconnect(p2p.DiscUselessPeer) + if p.Inbound() { + droppedInbound.Mark(1) + } else { + droppedOutbound.Mark(1) + } + return true +} + +// protectedPeers computes the set of peers that should not be dropped based +// on inclusion stats. Each protection category independently selects its +// top-N peers per inbound/dialed pool; the union is returned. +func (cm *dropper) protectedPeers(peers []*p2p.Peer) map[*p2p.Peer]bool { + if cm.peerStatsFunc == nil { + return nil + } + stats := cm.peerStatsFunc() + if len(stats) == 0 { + return nil + } + // Split peers by direction. 
+ var inbound, dialed []*p2p.Peer + for _, p := range peers { + if p.Inbound() { + inbound = append(inbound, p) + } else { + dialed = append(dialed, p) + } + } + result := protectedPeersByPool(inbound, dialed, stats) + if len(result) > 0 { + log.Debug("Protecting high-value peers from drop", "protected", len(result)) + } + return result +} + +// protectedPeersByPool selects the union of top-N peers per protection +// category across the given already-split inbound and dialed pools. +// Factored from protectedPeers so tests can exercise the per-pool +// selection logic without needing to construct direction-flagged +// *p2p.Peer instances (which require unexported p2p types). +func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]peerstats.PeerStats) map[*p2p.Peer]bool { + result := make(map[*p2p.Peer]bool) + // protectPool selects the top-frac peers from pool by score and adds them to result. + protectPool := func(pool []*p2p.Peer, score func(*p2p.Peer) float64, frac float64) { + n := int(float64(len(pool)) * frac) + if n == 0 { + return + } + sorted := slices.SortedFunc(slices.Values(pool), func(a, b *p2p.Peer) int { + return cmp.Compare(score(b), score(a)) // descending + }) + top := slices.DeleteFunc(sorted[:min(n, len(sorted))], func(p *p2p.Peer) bool { + return score(p) <= 0 + }) + for _, p := range top { + result[p] = true + } + } + for _, cat := range protectionCategories { + score := func(p *p2p.Peer) float64 { + return cat.score(stats[p.ID().String()]) + } + protectPool(inbound, score, cat.frac) + protectPool(dialed, score, cat.frac) + } + return result } // randomDuration generates a random duration between min and max. diff --git a/eth/dropper_test.go b/eth/dropper_test.go new file mode 100644 index 0000000000..67f24a064d --- /dev/null +++ b/eth/dropper_test.go @@ -0,0 +1,378 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package eth + +import ( + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/eth/peerstats" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +func makePeers(n int) []*p2p.Peer { + peers := make([]*p2p.Peer, n) + for i := range peers { + id := enode.ID{byte(i)} + peers[i] = p2p.NewPeer(id, fmt.Sprintf("peer%d", i), nil) + } + return peers +} + +func TestProtectedPeersNoStats(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return nil } + + peers := makePeers(10) + protected := cm.protectedPeers(peers) + if len(protected) != 0 { + t.Fatalf("expected no protected peers with nil stats, got %d", len(protected)) + } +} + +func TestProtectedPeersEmptyStats(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { + return map[string]peerstats.PeerStats{} + } + + peers := makePeers(10) + protected := cm.protectedPeers(peers) + if len(protected) != 0 { + t.Fatalf("expected no protected peers with empty stats, got %d", len(protected)) + } +} + +func TestProtectedPeersTopPeer(t *testing.T) { + // 20 peers, 10% of 20 = 2 protected per category. 
+ cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + + peers := makePeers(20) + stats := make(map[string]peerstats.PeerStats) + stats[peers[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100} + stats[peers[1].ID().String()] = peerstats.PeerStats{RecentIncluded: 5.0} + + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats } + + protected := cm.protectedPeers(peers) + if len(protected) != 2 { + t.Fatalf("expected 2 protected peers, got %d", len(protected)) + } + if !protected[peers[0]] { + t.Fatal("peer 0 should be protected (top RecentFinalized)") + } + if !protected[peers[1]] { + t.Fatal("peer 1 should be protected (top RecentIncluded)") + } +} + +func TestProtectedPeersZeroScore(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + + peers := makePeers(10) + stats := make(map[string]peerstats.PeerStats) + for _, p := range peers { + stats[p.ID().String()] = peerstats.PeerStats{} + } + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats } + + protected := cm.protectedPeers(peers) + if len(protected) != 0 { + t.Fatalf("expected no protection with zero scores, got %d", len(protected)) + } +} + +func TestProtectedPeersOverlap(t *testing.T) { + // One peer is top in both categories — counted once. + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + + peers := makePeers(20) + stats := make(map[string]peerstats.PeerStats) + stats[peers[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0} + + cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats } + + protected := cm.protectedPeers(peers) + if len(protected) != 1 { + t.Fatalf("expected 1 protected peer (overlap), got %d", len(protected)) + } +} + +func TestProtectedPeersNilFunc(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + // peerStatsFunc is nil (default). 
+ + peers := makePeers(10) + protected := cm.protectedPeers(peers) + if protected != nil { + t.Fatalf("expected nil with nil stats func, got %v", protected) + } +} + +// TestProtectedByPoolPerPoolTopN verifies that the top-N selection runs +// independently in each of the inbound and dialed pools, not globally. +// With 10 peers per pool and inclusionProtectionFrac=0.1, exactly 1 peer +// is protected per pool per category — so 2 total (one per pool), both +// for the RecentFinalized category since we don't set RecentIncluded. +func TestProtectedByPoolPerPoolTopN(t *testing.T) { + inbound := makePeers(10) + dialed := makePeers(10) + // Distinguish dialed peer IDs from inbound so stats maps don't collide. + for i := range dialed { + id := enode.ID{byte(100 + i)} + dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) + } + // Strictly increasing scores: highest wins in each pool. + stats := make(map[string]peerstats.PeerStats) + for i, p := range inbound { + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)} + } + for i, p := range dialed { + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)} + } + + protected := protectedPeersByPool(inbound, dialed, stats) + + // Expect top 1 of inbound (inbound[9]) and top 1 of dialed (dialed[9]). + if len(protected) != 2 { + t.Fatalf("expected 2 protected peers (1 per pool), got %d", len(protected)) + } + if !protected[inbound[9]] { + t.Error("expected top inbound peer to be protected") + } + if !protected[dialed[9]] { + t.Error("expected top dialed peer to be protected") + } +} + +// TestProtectedByPoolCrossCategoryOverlap verifies that the union across +// protection categories is correctly deduplicated: a peer that wins in +// multiple categories appears once, and category winners are all +// protected. Uses a pool large enough that frac*len yields n=2 per +// category, so cross-category overlap is observable. 
+func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) { + // 20 dialed peers so 0.1 * 20 = 2 protected per category. + dialed := makePeers(20) + // P0: high RecentFinalized only. P1: high RecentIncluded only. P2: high both. + // With n=2 per category: + // RecentFinalized winners: P2 (tie-broken-ok), P0 + // RecentIncluded winners: P2, P1 + // Union: {P0, P1, P2}. + stats := make(map[string]peerstats.PeerStats) + stats[dialed[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100, RecentIncluded: 0} + stats[dialed[1].ID().String()] = peerstats.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0} + stats[dialed[2].ID().String()] = peerstats.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0} + + protected := protectedPeersByPool(nil, dialed, stats) + + if len(protected) != 3 { + t.Fatalf("expected 3 protected peers (union of category winners), got %d", len(protected)) + } + for _, idx := range []int{0, 1, 2} { + if !protected[dialed[idx]] { + t.Errorf("peer %d should be protected", idx) + } + } +} + +// TestProtectedByPoolPerPoolIndependence locks in that selection runs +// per-pool, not globally. Every inbound peer scores higher than every +// dialed peer, so a global top-N would pick only inbound peers. Per-pool +// top-N must still protect the top dialed peers. +func TestProtectedByPoolPerPoolIndependence(t *testing.T) { + // 20 inbound, 20 dialed — frac=0.1 → 2 protected per pool per category. + // Global top-4 of RecentFinalized would be inbound[16..19] — zero dialed. + inbound := makePeers(20) + dialed := make([]*p2p.Peer, 20) + for i := range dialed { + id := enode.ID{byte(100 + i)} + dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) + } + stats := make(map[string]peerstats.PeerStats) + // Every inbound peer outscores every dialed peer. 
+ for i, p := range inbound { + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1000 + i)} + } + for i, p := range dialed { + stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)} + } + + protected := protectedPeersByPool(inbound, dialed, stats) + + // Per-pool top-2 of RecentFinalized: + // inbound: inbound[18], inbound[19] + // dialed: dialed[18], dialed[19] + // Global top-N would contain zero dialed peers, so asserting the top + // dialed peers are protected enforces per-pool independence. + if !protected[dialed[19]] { + t.Fatal("top dialed peer must be protected regardless of globally-higher inbound peers") + } + if !protected[dialed[18]] { + t.Fatal("second-top dialed peer must be protected regardless of globally-higher inbound peers") + } + if !protected[inbound[19]] || !protected[inbound[18]] { + t.Fatal("top inbound peers must also be protected") + } + if len(protected) != 4 { + t.Fatalf("expected 4 protected peers (top-2 of each pool), got %d", len(protected)) + } +} + +// TestProtectedByPoolRequestLatencyBasic verifies the latency protection +// category: with no competing inclusion stats, the lowest-latency peers +// (among those with enough samples) win top-N protection. +func TestProtectedByPoolRequestLatencyBasic(t *testing.T) { + dialed := makePeers(20) // frac=0.1 → n=2 per category + stats := make(map[string]peerstats.PeerStats) + // Three peers have enough samples; the two fastest should win. 
+ stats[dialed[0].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 50 * time.Millisecond, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + stats[dialed[1].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 100 * time.Millisecond, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + stats[dialed[2].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 2 * time.Second, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + + protected := protectedPeersByPool(nil, dialed, stats) + + if !protected[dialed[0]] { + t.Error("fastest peer should be protected") + } + if !protected[dialed[1]] { + t.Error("second-fastest peer should be protected") + } + if protected[dialed[2]] { + t.Error("slowest peer should not be in top-2") + } + if len(protected) != 2 { + t.Fatalf("expected top-2 latency protection, got %d", len(protected)) + } +} + +// TestProtectedByPoolRequestLatencyBootstrapGuard verifies that peers with +// fewer than MinLatencySamples do not earn latency-based protection, even +// if their few samples indicate very low latency. +func TestProtectedByPoolRequestLatencyBootstrapGuard(t *testing.T) { + dialed := makePeers(20) + stats := make(map[string]peerstats.PeerStats) + // A lucky-fast peer with only 1 sample — must NOT be protected. + stats[dialed[0].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 1 * time.Millisecond, + RequestSuccesses: 1, + } + // A warmed-up but slower peer — should be protected on latency. 
+ stats[dialed[1].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 500 * time.Millisecond, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + + protected := protectedPeersByPool(nil, dialed, stats) + + if protected[dialed[0]] { + t.Error("under-sampled peer should not be protected (bootstrap guard)") + } + if !protected[dialed[1]] { + t.Error("warmed-up peer should be protected") + } +} + +// TestProtectedByPoolRequestLatencyPerPool verifies that the latency +// category selects top-N per pool independently, consistent with the +// other categories. An inbound peer with lower latency does not prevent +// a dialed peer from being protected as top of the dialed pool. +func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) { + inbound := makePeers(20) + dialed := make([]*p2p.Peer, 20) + for i := range dialed { + id := enode.ID{byte(100 + i)} + dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) + } + stats := make(map[string]peerstats.PeerStats) + // All inbound peers are very fast (50ms). + for _, p := range inbound { + stats[p.ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 50 * time.Millisecond, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + } + // Dialed peers are slower (1s) — globally they would all lose, but + // per-pool top-N should still protect two of them. + for _, p := range dialed { + stats[p.ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 1 * time.Second, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + } + + protected := protectedPeersByPool(inbound, dialed, stats) + + // 2 from inbound + 2 from dialed = 4. 
+ var dialedProtected int + for _, p := range dialed { + if protected[p] { + dialedProtected++ + } + } + if dialedProtected != 2 { + t.Fatalf("expected 2 dialed peers protected by per-pool top-N, got %d", dialedProtected) + } +} + +// TestProtectedByPoolRequestLatencyStale verifies that the freshness gate +// excludes peers whose latency EMA is valid (meeting the sample count and +// fast value) but whose last sample is older than MaxLatencyStaleness. +// A peer cannot serve a burst of fast replies, go silent on announcements, +// and keep latency-based protection indefinitely. +func TestProtectedByPoolRequestLatencyStale(t *testing.T) { + dialed := makePeers(20) + stats := make(map[string]peerstats.PeerStats) + // Fresh, fast peer — should be protected. + stats[dialed[0].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 50 * time.Millisecond, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now(), + } + // Stale, fast peer — was fast, but hasn't answered in too long. + // Same EMA and sample count as the fresh peer; only staleness differs. 
+ stats[dialed[1].ID().String()] = peerstats.PeerStats{ + RequestLatencyEMA: 50 * time.Millisecond, + RequestSuccesses: peerstats.MinLatencySamples, + LastLatencySample: time.Now().Add(-2 * peerstats.MaxLatencyStaleness), + } + + protected := protectedPeersByPool(nil, dialed, stats) + + if !protected[dialed[0]] { + t.Error("fresh fast peer must be protected") + } + if protected[dialed[1]] { + t.Error("stale peer must NOT keep latency protection despite fast EMA") + } +} diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 20621c531d..b7060ed53b 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -180,10 +180,12 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool - addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation + validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation + onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer + onRequestResult func(peer string, latency time.Duration, timeout bool) // Optional: notified once per completed/timed-out tx request step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -194,39 +196,41 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher 
to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { - return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool)) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestResult, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. 
func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ - notify: make(chan *txAnnounce), - cleanup: make(chan *txDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), - announces: make(map[string]map[common.Hash]*txMetadataWithSeq), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), - underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), - txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit), - chain: chain, - validateMeta: validateMeta, - addTxs: addTxs, - fetchTxs: fetchTxs, - dropPeer: dropPeer, - clock: clock, - realTime: realTime, - rand: rand, + notify: make(chan *txAnnounce), + cleanup: make(chan *txDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), + announces: make(map[string]map[common.Hash]*txMetadataWithSeq), + announced: make(map[common.Hash]map[string]struct{}), + fetching: make(map[common.Hash]string), + requests: make(map[string]*txRequest), + alternates: 
make(map[common.Hash]map[string]struct{}), + underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), + txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit), + chain: chain, + validateMeta: validateMeta, + addTxs: addTxs, + fetchTxs: fetchTxs, + dropPeer: dropPeer, + onAccepted: onAccepted, + onRequestResult: onRequestResult, + clock: clock, + realTime: realTime, + rand: rand, } } @@ -344,6 +348,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) ) batch := txs[i:end] + var accepted []common.Hash + for j, err := range f.addTxs(batch) { // Track the transaction hash if the price is too low for us. // Avoid re-request this transaction when we receive another @@ -353,7 +359,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } // Track a few interesting failure types switch { - case err == nil: // Noop, but need to handle to not count these + case err == nil: + accepted = append(accepted, batch[j].Hash()) case errors.Is(err, txpool.ErrAlreadyKnown): duplicate++ @@ -385,6 +392,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) underpricedMeter.Mark(underpriced) otherRejectMeter.Mark(otherreject) + // Notify the tracker which txs from this peer were accepted. + if f.onAccepted != nil && len(accepted) > 0 { + f.onAccepted(peer, accepted) + } // If 'other reject' is >25% of the deliveries in any batch, sleep a bit. if otherreject > int64((len(batch)+3)/4) { log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject) @@ -664,6 +675,14 @@ func (f *TxFetcher) loop() { // Keep track of the request as dangling, but never expire f.requests[peer].hashes = nil txFetcherSlowPeers.Inc(1) + // Record the request as a timeout-latency sample. 
The slow + // EMA in the consumer counts timeouts as the timeout value + // itself, so a peer that times out repeatedly drags its + // score down without us having to wait for an eventual + // (possibly never-arriving) reply. + if f.onRequestResult != nil { + f.onRequestResult(peer, txFetchTimeout, true) + } } } // Schedule a new transaction retrieval @@ -760,6 +779,11 @@ func (f *TxFetcher) loop() { if req.hashes == nil { txFetcherSlowPeers.Dec(1) txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds()) + // Already counted as a timeout sample at the timeout site; + // don't double-record on eventual delivery. + } else if f.onRequestResult != nil { + // Normal in-time delivery. Record the actual round-trip. + f.onRequestResult(delivery.origin, time.Duration(f.clock.Now()-req.time), false) } delete(f.requests, delivery.origin) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 6c2719631e..2b4f156420 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -22,6 +22,7 @@ import ( "math/big" "math/rand" "slices" + "sync" "testing" "time" @@ -97,7 +98,7 @@ func newTestTxFetcher() *TxFetcher { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, + nil, nil, nil, ) } @@ -2203,6 +2204,8 @@ func TestTransactionForgotten(t *testing.T) { }, func(string, []common.Hash) error { return nil }, func(string) {}, + nil, + nil, mockClock, mockTime, rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior @@ -2283,3 +2286,103 @@ func TestTransactionForgotten(t *testing.T) { t.Errorf("wrong final underpriced cache size: got %d, want 1", size) } } + +// resultRecorder is a thread-safe recorder for onRequestResult callbacks. 
+type resultRecorder struct { + mu sync.Mutex + samples []resultSample +} + +type resultSample struct { + peer string + latency time.Duration + timeout bool +} + +func (r *resultRecorder) record(peer string, latency time.Duration, timeout bool) { + r.mu.Lock() + defer r.mu.Unlock() + r.samples = append(r.samples, resultSample{peer, latency, timeout}) +} + +func (r *resultRecorder) snapshot() []resultSample { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]resultSample, len(r.samples)) + copy(out, r.samples) + return out +} + +// TestTransactionFetcherRequestResultOnDelivery asserts that an in-time +// direct delivery fires the onRequestResult callback with timeout=false. +func TestTransactionFetcherRequestResultOnDelivery(t *testing.T) { + rec := &resultRecorder{} + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + f := newTestTxFetcher() + f.onRequestResult = rec.record + return f + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + doWait{time: txArriveTimeout, step: true}, + doWait{time: 200 * time.Millisecond, step: false}, + doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, + doFunc(func() { + samples := rec.snapshot() + if len(samples) != 1 { + t.Fatalf("expected 1 sample, got %d (%v)", len(samples), samples) + } + if samples[0].peer != "A" { + t.Errorf("peer mismatch: got %q, want A", samples[0].peer) + } + if samples[0].latency != 200*time.Millisecond { + t.Errorf("latency mismatch: got %v, want 200ms", samples[0].latency) + } + if samples[0].timeout { + t.Error("expected timeout=false for delivery") + } + }), + }, + }) +} + +// TestTransactionFetcherRequestResultOnTimeout asserts that a timed-out +// request fires onRequestResult with timeout=true and the timeout value, +// and a subsequent (late) delivery does not fire a duplicate sample. 
+func TestTransactionFetcherRequestResultOnTimeout(t *testing.T) { + rec := &resultRecorder{} + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + f := newTestTxFetcher() + f.onRequestResult = rec.record + return f + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + doWait{time: txArriveTimeout, step: true}, + doWait{time: txFetchTimeout, step: true}, + doFunc(func() { + samples := rec.snapshot() + if len(samples) != 1 { + t.Fatalf("expected 1 timeout sample, got %d (%v)", len(samples), samples) + } + if samples[0].peer != "A" { + t.Errorf("peer mismatch: got %q, want A", samples[0].peer) + } + if samples[0].latency != txFetchTimeout { + t.Errorf("latency mismatch: got %v, want %v", samples[0].latency, txFetchTimeout) + } + if !samples[0].timeout { + t.Error("expected timeout=true for timed-out request") + } + }), + doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, + doFunc(func() { + if len(rec.snapshot()) != 1 { + t.Fatalf("late delivery double-counted: got %d samples, want 1", len(rec.snapshot())) + } + }), + }, + }) +} diff --git a/eth/handler.go b/eth/handler.go index 76df635fb0..2a0e92c819 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -36,8 +36,10 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/eth/peerstats" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -122,6 +124,8 @@ type handler struct { downloader *downloader.Downloader txFetcher *fetcher.TxFetcher + txTracker *txtracker.Tracker + peerStats *peerstats.Stats 
peers *peerSet txBroadcastKey [16]byte @@ -181,7 +185,9 @@ func newHandler(config *handlerConfig) (*handler, error) { } return nil } - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) + h.txTracker = txtracker.New() + h.peerStats = peerstats.New() + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestResult) return h, nil } @@ -396,6 +402,7 @@ func (h *handler) unregisterPeer(id string) { } h.downloader.UnregisterPeer(id) h.txFetcher.Drop(id) + h.peerStats.NotifyPeerDrop(id) if err := h.peers.unregisterPeer(id); err != nil { logger.Error("Ethereum peer removal failed", "err", err) diff --git a/eth/peerstats/peerstats.go b/eth/peerstats/peerstats.go new file mode 100644 index 0000000000..146226a187 --- /dev/null +++ b/eth/peerstats/peerstats.go @@ -0,0 +1,193 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package peerstats maintains per-peer quality metrics used by the peer +// dropper to protect high-value peers from random disconnection. 
+// +// The package is a passive accumulator: it exposes entry points for its +// signal producers (txtracker for inclusion/finalization, the tx fetcher +// for latency, the handler for peer-drop cleanup) and a read-only +// snapshot for its consumer (the dropper). It has no goroutine of its +// own — all mutation is serialized by a single mutex. +// +// Signal sources: +// - NotifyBlock(inclusions, finalized) — per-block deltas from txtracker +// (computed under txtracker's own lock, then passed in after release) +// - NotifyRequestResult(peer, latency, timeout) — per-request outcomes +// from the fetcher; timeouts are reported with the timeout value so +// slow peers contribute to the EMA, and the timeout flag increments +// the per-peer timeout counter +// - NotifyPeerDrop(peer) — called from the handler on disconnect +package peerstats + +import ( + "sync" + "time" +) + +const ( + // EMA smoothing factor for per-block inclusion rate. + emaAlpha = 0.05 + // EMA smoothing factor for per-block finalization rate. Very slow on + // purpose: finalization is permanent, and the score should reflect + // sustained contribution over long windows, not recent bursts. + // Half-life ≈ 6930 chain heads (~23 hours on 12s blocks). + finalizedEMAAlpha = 0.0001 + // EMA smoothing factor for per-request latency average. Slow on purpose: + // short bursts shouldn't shift the score, sustained behavior should. + // Half-life ≈ ln(0.5)/ln(0.99) ≈ 69 samples. + latencyEMAAlpha = 0.01 + // MinLatencySamples is the number of latency samples a peer must accumulate + // before its RequestLatencyEMA is considered meaningful for protection. + // Prevents a single lucky-fast reply from displacing established peers. + MinLatencySamples = 100 + // MaxLatencyStaleness is the oldest allowed age of a peer's last + // latency sample before their RequestLatencyEMA is disregarded for + // protection. 
Prevents a peer from earning a fast score during a + // burst of activity and then holding protection indefinitely by + // going silent on tx announcements (no further requests → no fresh + // samples → EMA frozen at its last value). + MaxLatencyStaleness = 10 * time.Minute +) + +// PeerStats is the exported per-peer snapshot returned by GetAllPeerStats. +type PeerStats struct { + RecentFinalized float64 // EMA of per-block finalization credits (slow) + RecentIncluded float64 // EMA of per-block inclusions (fast) + RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value) + RequestSuccesses int64 // Requests answered before timeout + RequestTimeouts int64 // Requests that timed out + LastLatencySample time.Time // Wall-clock time of the most recent request result (for staleness gate) +} + +// peerStats is the internal mutable state per peer. +type peerStats struct { + recentFinalized float64 + recentIncluded float64 + requestLatencyEMA time.Duration + requestSuccesses int64 + requestTimeouts int64 + lastLatencySample time.Time +} + +// Stats is the per-peer quality aggregator. +type Stats struct { + mu sync.Mutex + peers map[string]*peerStats +} + +// New creates an empty Stats. +func New() *Stats { + return &Stats{peers: make(map[string]*peerStats)} +} + +// NotifyBlock ingests a per-block update. `inclusions` is the count of the head +// block's transactions attributed to each peer; peers with a positive +// count get a stats entry created if one doesn't exist (this is how +// peerstats learns about newly-active peers). Peers not in the map but +// already tracked have their EMA decay with a zero sample. +// +// `finalized` is per-peer credits accumulated since the last NotifyBlock; +// credits are only applied to peers already tracked — we don't resurrect +// dropped peers from historical finalization data. 
+// +// NotifyBlock must NOT be called while the caller holds any other lock that +// could be acquired by peerstats callers in reverse order. Current callers +// (txtracker.handleChainHead) release their lock before invoking NotifyBlock. +func (s *Stats) NotifyBlock(inclusions, finalized map[string]int) { + s.mu.Lock() + defer s.mu.Unlock() + + // Ensure a stats entry exists for any peer that just had an inclusion. + // This is the primary path by which peerstats learns about a peer's + // inclusion activity. + for peer, count := range inclusions { + if count > 0 && s.peers[peer] == nil { + s.peers[peer] = &peerStats{} + } + } + // Update inclusion and finalization EMAs for every tracked peer. A + // peer not present in the respective delta map gets a 0 contribution + // — pure decay. Finalization credits for peers no longer tracked are + // ignored (don't resurrect dropped peers from historical data). + for peer, ps := range s.peers { + ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(inclusions[peer]) + ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(finalized[peer]) + } +} + +// NotifyRequestResult records a tx-request outcome for the given peer. +// latency is the round-trip time (for timeouts, pass the timeout value). +// timeout indicates whether the request timed out rather than receiving a +// normal delivery. Both cases update the latency EMA; the timeout flag +// additionally increments the per-peer timeout counter. +// Creates a peer entry if one doesn't exist. +func (s *Stats) NotifyRequestResult(peer string, latency time.Duration, timeout bool) { + s.mu.Lock() + defer s.mu.Unlock() + + ps := s.peers[peer] + if ps == nil { + ps = &peerStats{} + s.peers[peer] = ps + } + if ps.requestSuccesses+ps.requestTimeouts == 0 { + // Bootstrap the EMA with the first sample so it doesn't drift up + // from zero over many samples before reaching realistic values. 
+ ps.requestLatencyEMA = latency + } else { + ps.requestLatencyEMA = time.Duration( + float64(ps.requestLatencyEMA)*(1-latencyEMAAlpha) + + float64(latency)*latencyEMAAlpha, + ) + } + if timeout { + ps.requestTimeouts++ + } else { + ps.requestSuccesses++ + } + ps.lastLatencySample = time.Now() +} + +// NotifyPeerDrop removes a peer's stats on disconnect. A rare stale +// latency sample racing with the drop may recreate the peer entry with +// one sample; that entry can never earn protection (MinLatencySamples +// guard) and is harmless. +func (s *Stats) NotifyPeerDrop(peer string) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.peers, peer) +} + +// GetAllPeerStats returns a snapshot of per-peer stats. Called by the +// dropper every few minutes; allocation cost is negligible at that rate. +func (s *Stats) GetAllPeerStats() map[string]PeerStats { + s.mu.Lock() + defer s.mu.Unlock() + + result := make(map[string]PeerStats, len(s.peers)) + for id, ps := range s.peers { + result[id] = PeerStats{ + RecentFinalized: ps.recentFinalized, + RecentIncluded: ps.recentIncluded, + RequestLatencyEMA: ps.requestLatencyEMA, + RequestSuccesses: ps.requestSuccesses, + RequestTimeouts: ps.requestTimeouts, + LastLatencySample: ps.lastLatencySample, + } + } + return result +} diff --git a/eth/peerstats/peerstats_test.go b/eth/peerstats/peerstats_test.go new file mode 100644 index 0000000000..3b242eac15 --- /dev/null +++ b/eth/peerstats/peerstats_test.go @@ -0,0 +1,293 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package peerstats + +import ( + "testing" + "time" +) + +// TestNotifyBlockBootstrapsFromInclusions verifies that a peer with a positive +// inclusion count in the first NotifyBlock gets a stats entry created. +func TestNotifyBlockBootstrapsFromInclusions(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 3}, nil) + + stats := s.GetAllPeerStats() + if len(stats) != 1 { + t.Fatalf("expected 1 peer entry, got %d", len(stats)) + } + ps, ok := stats["peerA"] + if !ok { + t.Fatal("expected peerA entry") + } + // EMA after first block: (1-0.05)*0 + 0.05*3 = 0.15 + if ps.RecentIncluded <= 0 { + t.Fatalf("expected RecentIncluded > 0 after inclusion, got %f", ps.RecentIncluded) + } +} + +// TestNotifyBlockDecaysKnownPeers verifies that peers already tracked get their +// RecentIncluded EMA decayed when they have no inclusions in a block. +func TestNotifyBlockDecaysKnownPeers(t *testing.T) { + s := New() + // Seed peerA with an inclusion. + s.NotifyBlock(map[string]int{"peerA": 3}, nil) + initial := s.GetAllPeerStats()["peerA"].RecentIncluded + + // Empty block — peerA should decay. + s.NotifyBlock(nil, nil) + after := s.GetAllPeerStats()["peerA"].RecentIncluded + + if after >= initial { + t.Fatalf("expected decay, got %f >= %f", after, initial) + } +} + +// TestNotifyBlockDoesNotResurrectFromFinalization verifies that finalization +// credits to a peer with no entry don't create one.
+func TestNotifyBlockDoesNotResurrectFromFinalization(t *testing.T) { + s := New() + s.NotifyBlock(nil, map[string]int{"peerA": 5}) + + if stats := s.GetAllPeerStats(); len(stats) != 0 { + t.Fatalf("finalization credits must not create entries, got %d peers", len(stats)) + } +} + +// TestNotifyBlockDropThenFinalizeNoResurrect verifies the full drop→finalize +// sequence: a dropped peer doesn't come back via finalization credits. +func TestNotifyBlockDropThenFinalizeNoResurrect(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 1}, nil) + s.NotifyPeerDrop("peerA") + s.NotifyBlock(nil, map[string]int{"peerA": 10}) + + if stats := s.GetAllPeerStats(); len(stats) != 0 { + t.Fatalf("dropped peer must not be resurrected, got %d peers", len(stats)) + } +} + +// TestNotifyBlockFinalizationCredits verifies that finalization credits reach an existing peer. +func TestNotifyBlockFinalizationCredits(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 1}, nil) + s.NotifyBlock(nil, map[string]int{"peerA": 3}) + + // RecentFinalized is a slow EMA, not a cumulative count: assert it + // moved in the positive direction, not the exact value. + if got := s.GetAllPeerStats()["peerA"].RecentFinalized; got <= 0 { + t.Fatalf("expected RecentFinalized>0 after credits, got %f", got) + } +} + +// TestNotifyBlockInclusionEMAUpdate verifies the EMA formula (1-α)·old + α·count.
+func TestNotifyBlockInclusionEMAUpdate(t *testing.T) { + s := New() + // Three inclusions: EMA = 0.05 * 3 = 0.15 + s.NotifyBlock(map[string]int{"peerA": 3}, nil) + got := s.GetAllPeerStats()["peerA"].RecentIncluded + want := 0.15 + if diff := got - want; diff < -1e-9 || diff > 1e-9 { + t.Fatalf("EMA after one sample: got %f, want %f", got, want) + } + // Next block with 10 inclusions: EMA = 0.95*0.15 + 0.05*10 = 0.6425 + s.NotifyBlock(map[string]int{"peerA": 10}, nil) + got = s.GetAllPeerStats()["peerA"].RecentIncluded + want = 0.6425 + if diff := got - want; diff < -1e-9 || diff > 1e-9 { + t.Fatalf("EMA after two samples: got %f, want %f", got, want) + } +} + +// TestNotifyRequestResultFirstSampleBootstrap asserts that the first +// latency sample seeds the EMA directly. +func TestNotifyRequestResultFirstSampleBootstrap(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestLatencyEMA != 200*time.Millisecond { + t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA) + } + if ps.RequestSuccesses != 1 { + t.Fatalf("expected RequestSuccesses=1, got %d", ps.RequestSuccesses) + } + if ps.RequestTimeouts != 0 { + t.Fatalf("expected RequestTimeouts=0, got %d", ps.RequestTimeouts) + } +} + +// TestNotifyRequestResultEMAUpdate verifies the EMA formula for latency. 
+func TestNotifyRequestResultEMAUpdate(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerA", 1000*time.Millisecond, false) + + // Expected: 0.99*100ms + 0.01*1000ms = 109ms + got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA + want := 109 * time.Millisecond + delta := got - want + if delta < 0 { + delta = -delta + } + if delta > 1*time.Microsecond { + t.Fatalf("EMA mismatch: got %v, want %v", got, want) + } + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestSuccesses != 2 { + t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses) + } +} + +// TestNotifyRequestResultSlowConvergence verifies the slow alpha +// damps convergence under sustained slow (timeout-sized) samples. +func TestNotifyRequestResultSlowConvergence(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + for i := 0; i < 50; i++ { + s.NotifyRequestResult("peerA", 5*time.Second, false) + } + got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA + if got < 1*time.Second { + t.Fatalf("EMA did not move enough under sustained timeouts, got %v", got) + } + if got > 3*time.Second { + t.Fatalf("EMA converged too fast for slow alpha=0.01, got %v", got) + } +} + +// TestNotifyPeerDropClearsStats verifies that a dropped peer disappears +// from GetAllPeerStats. +func TestNotifyPeerDropClearsStats(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) + s.NotifyPeerDrop("peerA") + + if _, ok := s.GetAllPeerStats()["peerA"]; ok { + t.Fatal("peerA stats should be removed after NotifyPeerDrop") + } +} + +// TestStaleRequestLatencyAfterDrop documents the accepted behavior: a +// late sample after NotifyPeerDrop recreates a 1-sample entry. The +// dropper's MinLatencySamples=100 guard ensures this is harmless.
+func TestStaleRequestLatencyAfterDrop(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) + s.NotifyPeerDrop("peerA") + // Late sample racing with the drop. + s.NotifyRequestResult("peerA", 50*time.Millisecond, false) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestSuccesses != 1 { + t.Fatalf("expected fresh RequestSuccesses=1, got %d", ps.RequestSuccesses) + } + if ps.RequestLatencyEMA != 50*time.Millisecond { + t.Fatalf("expected fresh bootstrap at 50ms, got %v", ps.RequestLatencyEMA) + } + // The dropper's MinLatencySamples guard (in eth/dropper.go) prevents + // this 1-sample entry from earning latency-based protection. +} + +// TestMultiplePeersIsolated verifies per-peer isolation across signal types. +func TestMultiplePeersIsolated(t *testing.T) { + s := New() + s.NotifyBlock(map[string]int{"peerA": 5, "peerB": 0}, nil) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerB", 5*time.Second, false) + s.NotifyBlock(nil, map[string]int{"peerA": 2}) + + stats := s.GetAllPeerStats() + // Only peerA receives finalization credits; peerB's EMA stays at zero + // (no credits, pure decay from zero). + if stats["peerA"].RecentFinalized <= 0 || stats["peerB"].RecentFinalized != 0 { + t.Errorf("finalization leaked: A=%f B=%f", stats["peerA"].RecentFinalized, stats["peerB"].RecentFinalized) + } + if stats["peerA"].RequestLatencyEMA != 100*time.Millisecond { + t.Errorf("peerA latency: got %v, want 100ms", stats["peerA"].RequestLatencyEMA) + } + if stats["peerB"].RequestLatencyEMA != 5*time.Second { + t.Errorf("peerB latency: got %v, want 5s", stats["peerB"].RequestLatencyEMA) + } +} + +// TestLatencyTimestampSet verifies that NotifyRequestResult stamps the +// peer's LastLatencySample with approximately time.Now(). 
+func TestLatencyTimestampSet(t *testing.T) { + s := New() + before := time.Now() + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + after := time.Now() + + got := s.GetAllPeerStats()["peerA"].LastLatencySample + if got.Before(before) || got.After(after) { + t.Fatalf("LastLatencySample = %v not in [%v, %v]", got, before, after) + } +} + +// TestLatencyTimestampUpdatesOnEachSample verifies that a later +// NotifyRequestResult call advances LastLatencySample. +func TestLatencyTimestampUpdatesOnEachSample(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + first := s.GetAllPeerStats()["peerA"].LastLatencySample + + // Small sleep so the second timestamp is detectably later. + time.Sleep(2 * time.Millisecond) + s.NotifyRequestResult("peerA", 200*time.Millisecond, false) + second := s.GetAllPeerStats()["peerA"].LastLatencySample + + if !second.After(first) { + t.Fatalf("expected second sample timestamp > first, got first=%v second=%v", first, second) + } +} + +// TestRequestResultTimeoutCounting verifies that timeout=true increments +// RequestTimeouts (not RequestSuccesses) and still updates the EMA. +func TestRequestResultTimeoutCounting(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 5*time.Second, true) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestTimeouts != 1 { + t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts) + } + if ps.RequestSuccesses != 0 { + t.Fatalf("expected RequestSuccesses=0, got %d", ps.RequestSuccesses) + } + if ps.RequestLatencyEMA != 5*time.Second { + t.Fatalf("EMA should bootstrap to timeout value, got %v", ps.RequestLatencyEMA) + } +} + +// TestRequestResultMixedCounting verifies that a mix of successes and +// timeouts increments the correct counters independently. 
+func TestRequestResultMixedCounting(t *testing.T) { + s := New() + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerA", 100*time.Millisecond, false) + s.NotifyRequestResult("peerA", 5*time.Second, true) + + ps := s.GetAllPeerStats()["peerA"] + if ps.RequestSuccesses != 2 { + t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses) + } + if ps.RequestTimeouts != 1 { + t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts) + } +} diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go new file mode 100644 index 0000000000..661dda32f6 --- /dev/null +++ b/eth/txtracker/tracker.go @@ -0,0 +1,228 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package txtracker maps accepted transactions to their delivering peer +// and observes chain-head and finalization events to emit per-block +// per-peer signals to a StatsConsumer (typically eth/peerstats). +// +// The tracker owns the tx-hash → deliverer-peer map with FIFO eviction, +// a chain-head subscription goroutine, and the computation of per-block +// inclusion counts and finalization credits. It does NOT maintain +// per-peer aggregates — that is peerstats' job. 
+package txtracker + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +const ( + // Maximum number of tx→deliverer mappings to retain. + maxTracked = 262144 +) + +// Chain is the blockchain interface needed by the tracker. +type Chain interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + GetBlockByNumber(number uint64) *types.Block + GetBlock(hash common.Hash, number uint64) *types.Block + CurrentFinalBlock() *types.Header +} + +// StatsConsumer receives per-block signals about peer inclusion and +// finalization. The tracker invokes NotifyBlock exactly once per handled chain +// head, AFTER releasing its own lock, with: +// +// - inclusions: per-peer count of transactions in the head block +// - finalized: per-peer count of transactions in blocks that became +// finalized since the previous call (possibly zero-range) +// +// Either map may be empty, but neither map is ever nil when +// called. NotifyBlock must not call back into the tracker. +type StatsConsumer interface { + NotifyBlock(inclusions, finalized map[string]int) +} + +// Tracker records which peer delivered each transaction and emits +// per-block inclusion and finalization signals to a StatsConsumer. +type Tracker struct { + mu sync.Mutex + txs map[common.Hash]string // hash → deliverer peer ID + order []common.Hash // insertion order for FIFO eviction + + chain Chain + consumer StatsConsumer + lastFinalNum uint64 // last finalized block number processed + headCh chan core.ChainHeadEvent + sub event.Subscription + + quit chan struct{} + step chan struct{} // test sync: sent after each event is processed + wg sync.WaitGroup +} + +// New creates a new tracker.
+func New() *Tracker { + return &Tracker{ + txs: make(map[common.Hash]string), + quit: make(chan struct{}), + step: make(chan struct{}, 1), + } +} + +// Start begins listening for chain head events. `consumer` receives +// per-block signals; if nil, signals are computed but discarded +// (useful in tests that exercise only the tx-lifecycle surface). +func (t *Tracker) Start(chain Chain, consumer StatsConsumer) { + t.chain = chain + t.consumer = consumer + // Seed lastFinalNum so checkFinalization doesn't backfill from genesis. + if fh := chain.CurrentFinalBlock(); fh != nil { + t.lastFinalNum = fh.Number.Uint64() + } + t.headCh = make(chan core.ChainHeadEvent, 128) + t.sub = chain.SubscribeChainHeadEvent(t.headCh) + t.wg.Add(1) + go t.loop() +} + +// Stop shuts down the tracker. +func (t *Tracker) Stop() { + t.sub.Unsubscribe() + close(t.quit) + t.wg.Wait() +} + +// NotifyAccepted records that a peer delivered transactions that were accepted +// by the pool. Only accepted (not rejected/duplicate) txs should be recorded +// to prevent attribution poisoning from replayed or invalid txs. +// Safe to call from any goroutine. +func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) { + t.mu.Lock() + defer t.mu.Unlock() + + for _, hash := range hashes { + if _, ok := t.txs[hash]; ok { + continue // already tracked, keep first deliverer + } + t.txs[hash] = peer + t.order = append(t.order, hash) + } + // Evict oldest entries if over capacity. + for len(t.txs) > maxTracked { + oldest := t.order[0] + t.order = t.order[1:] + delete(t.txs, oldest) + } + // Compact the backing array when it grows too large. Reslicing + // with order[1:] doesn't free earlier slots in the array. + if cap(t.order) > 2*maxTracked { + t.order = append([]common.Hash(nil), t.order...) 
+ } +} + +func (t *Tracker) loop() { + defer t.wg.Done() + + for { + select { + case ev := <-t.headCh: + t.handleChainHead(ev) + select { + case t.step <- struct{}{}: + default: + } + case <-t.sub.Err(): + return + case <-t.quit: + return + } + } +} + +// handleChainHead computes per-peer deltas for the new head block and any +// newly-finalized blocks, then hands them to the StatsConsumer AFTER +// releasing t.mu. The lock-release-before-consumer pattern avoids any +// cross-package lock ordering. +func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { + // Fetch the head block by hash (not just number) to avoid using a + // reorged block if the tracker goroutine lags behind the chain. + block := t.chain.GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64()) + if block == nil { + return + } + + t.mu.Lock() + // Count per-peer inclusions in the head block. + inclusions := make(map[string]int) + for _, tx := range block.Transactions() { + if peer := t.txs[tx.Hash()]; peer != "" { + inclusions[peer]++ + } + } + // Compute per-peer finalization credits since the last call. + finalized := t.collectFinalization() + t.mu.Unlock() + + if t.consumer != nil { + t.consumer.NotifyBlock(inclusions, finalized) + } +} + +// collectFinalization accumulates per-peer finalization credits for +// blocks newly finalized since lastFinalNum. Returns a (possibly empty) +// map; advances lastFinalNum. Must be called with t.mu held. 
+func (t *Tracker) collectFinalization() map[string]int { + credits := make(map[string]int) + finalHeader := t.chain.CurrentFinalBlock() + if finalHeader == nil { + return credits + } + finalNum := finalHeader.Number.Uint64() + if finalNum <= t.lastFinalNum { + return credits + } + for num := t.lastFinalNum + 1; num <= finalNum; num++ { + block := t.chain.GetBlockByNumber(num) + if block == nil { + continue + } + for _, tx := range block.Transactions() { + if peer := t.txs[tx.Hash()]; peer != "" { + credits[peer]++ + } + } + } + if total := sumCounts(credits); total > 0 { + log.Trace("Accumulated finalization credits", + "from", t.lastFinalNum+1, "to", finalNum, "txs", total) + } + t.lastFinalNum = finalNum + return credits +} + +func sumCounts(m map[string]int) int { + var sum int + for _, v := range m { + sum += v + } + return sum +} diff --git a/eth/txtracker/tracker_test.go b/eth/txtracker/tracker_test.go new file mode 100644 index 0000000000..16adb5b92c --- /dev/null +++ b/eth/txtracker/tracker_test.go @@ -0,0 +1,362 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . 
+ +package txtracker + +import ( + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/trie" +) + +// mockChain implements the Chain interface for testing. +// +// Blocks are stored by hash to exercise the reorg-safe lookup path in +// tracker.handleChainHead (which calls GetBlock(hash, number)). A separate +// canonicalByNum index maps each height to its canonical block hash, used +// by GetBlockByNumber (the finalization path). +type mockChain struct { + mu sync.Mutex + headFeed event.Feed + blocksByHash map[common.Hash]*types.Block + canonicalByNum map[uint64]common.Hash + finalNum uint64 +} + +func newMockChain() *mockChain { + return &mockChain{ + blocksByHash: make(map[common.Hash]*types.Block), + canonicalByNum: make(map[uint64]common.Hash), + } +} + +func (c *mockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return c.headFeed.Subscribe(ch) +} + +func (c *mockChain) GetBlockByNumber(number uint64) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + hash, ok := c.canonicalByNum[number] + if !ok { + return nil + } + return c.blocksByHash[hash] +} + +func (c *mockChain) GetBlock(hash common.Hash, number uint64) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + return c.blocksByHash[hash] +} + +func (c *mockChain) CurrentFinalBlock() *types.Header { + c.mu.Lock() + defer c.mu.Unlock() + if c.finalNum == 0 { + return nil + } + return &types.Header{Number: new(big.Int).SetUint64(c.finalNum)} +} + +// addBlock adds a canonical block at the given height. +func (c *mockChain) addBlock(num uint64, txs []*types.Transaction) *types.Block { + return c.addBlockAtHeight(num, num, txs, true) +} + +// addBlockAtHeight adds a block at the given height. The salt parameter +// ensures distinct block hashes for two blocks at the same height. 
If +// canonical is true, the block becomes the canonical block for that height. +func (c *mockChain) addBlockAtHeight(num, salt uint64, txs []*types.Transaction, canonical bool) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + header := &types.Header{ + Number: new(big.Int).SetUint64(num), + Extra: big.NewInt(int64(salt)).Bytes(), + } + block := types.NewBlock(header, &types.Body{Transactions: txs}, nil, trie.NewListHasher()) + c.blocksByHash[block.Hash()] = block + if canonical { + c.canonicalByNum[num] = block.Hash() + } + return block +} + +func (c *mockChain) setFinalBlock(num uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.finalNum = num +} + +// sendHead emits a chain head event for the canonical block at the given height. +func (c *mockChain) sendHead(num uint64) { + c.mu.Lock() + hash := c.canonicalByNum[num] + block := c.blocksByHash[hash] + c.mu.Unlock() + if block == nil { + panic("sendHead: no canonical block at height") + } + c.headFeed.Send(core.ChainHeadEvent{Header: block.Header()}) +} + +// sendHeadBlock emits a chain head event for the given block (may be +// non-canonical). Used for reorg tests. +func (c *mockChain) sendHeadBlock(block *types.Block) { + c.headFeed.Send(core.ChainHeadEvent{Header: block.Header()}) +} + +func hashTxs(txs []*types.Transaction) []common.Hash { + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + return hashes +} + +func makeTx(nonce uint64) *types.Transaction { + return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000}) +} + +// mockConsumer captures NotifyBlock invocations so tests can assert on the +// signals the tracker emits. 
+type mockConsumer struct { + mu sync.Mutex + signals []signal +} + +type signal struct { + inclusions, finalized map[string]int +} + +func (c *mockConsumer) NotifyBlock(inclusions, finalized map[string]int) { + c.mu.Lock() + defer c.mu.Unlock() + // Deep-copy so tests inspecting older signals aren't tripped up by + // later iterations mutating the same map (they don't today, but + // this keeps the assertion model simple). + in := make(map[string]int, len(inclusions)) + for k, v := range inclusions { + in[k] = v + } + fn := make(map[string]int, len(finalized)) + for k, v := range finalized { + fn[k] = v + } + c.signals = append(c.signals, signal{in, fn}) +} + +func (c *mockConsumer) last() signal { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.signals) == 0 { + return signal{} + } + return c.signals[len(c.signals)-1] +} + +// waitStep blocks until the tracker has processed one event. +func waitStep(t *testing.T, tr *Tracker) { + t.Helper() + select { + case <-tr.step: + case <-time.After(time.Second): + t.Fatal("timeout waiting for tracker step") + } +} + +// TestNotifyAcceptedRecordsMapping verifies the tx-lifecycle surface: +// NotifyAccepted records tx→peer mappings in insertion order, with +// first-deliverer-wins semantics on duplicates. +func TestNotifyAcceptedRecordsMapping(t *testing.T) { + tr := New() + + txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)} + hashes := hashTxs(txs) + tr.NotifyAccepted("peerA", hashes) + + tr.mu.Lock() + defer tr.mu.Unlock() + if len(tr.txs) != 3 { + t.Fatalf("expected 3 tracked txs, got %d", len(tr.txs)) + } + if len(tr.order) != 3 { + t.Fatalf("expected order length 3, got %d", len(tr.order)) + } + for i, h := range hashes { + if got := tr.txs[h]; got != "peerA" { + t.Fatalf("tx %d: expected deliverer=peerA, got %q", i, got) + } + if tr.order[i] != h { + t.Fatalf("order[%d] mismatch", i) + } + } +} + +// TestNotifyAcceptedFirstDelivererWins verifies duplicate accepts +// preserve the original deliverer. 
+func TestNotifyAcceptedFirstDelivererWins(t *testing.T) { + tr := New() + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()}) + + tr.mu.Lock() + defer tr.mu.Unlock() + if got := tr.txs[tx.Hash()]; got != "peerA" { + t.Fatalf("expected first deliverer peerA to win, got %q", got) + } + if len(tr.order) != 1 { + t.Fatalf("expected single order entry, got %d", len(tr.order)) + } +} + +// TestHandleChainHeadEmitsInclusions verifies the tracker emits a +// correct per-peer inclusion map to its consumer when a head block +// contains tracked transactions. +func TestHandleChainHeadEmitsInclusions(t *testing.T) { + tr := New() + chain := newMockChain() + consumer := &mockConsumer{} + tr.Start(chain, consumer) + defer tr.Stop() + + tx1, tx2 := makeTx(1), makeTx(2) + tr.NotifyAccepted("peerA", []common.Hash{tx1.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx2.Hash()}) + + chain.addBlock(1, []*types.Transaction{tx1, tx2}) + chain.sendHead(1) + waitStep(t, tr) + + sig := consumer.last() + if sig.inclusions["peerA"] != 1 { + t.Errorf("peerA inclusions: got %d, want 1", sig.inclusions["peerA"]) + } + if sig.inclusions["peerB"] != 1 { + t.Errorf("peerB inclusions: got %d, want 1", sig.inclusions["peerB"]) + } + if len(sig.finalized) != 0 { + t.Errorf("expected empty finalized map, got %v", sig.finalized) + } +} + +// TestHandleChainHeadEmptyBlock verifies an empty head block emits an +// empty inclusion map (so peerstats can decay all known peers). 
+func TestHandleChainHeadEmptyBlock(t *testing.T) { + tr := New() + chain := newMockChain() + consumer := &mockConsumer{} + tr.Start(chain, consumer) + defer tr.Stop() + + chain.addBlock(1, nil) + chain.sendHead(1) + waitStep(t, tr) + + sig := consumer.last() + if len(sig.inclusions) != 0 { + t.Errorf("expected empty inclusions, got %v", sig.inclusions) + } +} + +// TestHandleChainHeadEmitsFinalization verifies that when finalization +// advances, the consumer receives per-peer finalization credits +// accumulated over the newly-finalized range. +func TestHandleChainHeadEmitsFinalization(t *testing.T) { + tr := New() + chain := newMockChain() + consumer := &mockConsumer{} + tr.Start(chain, consumer) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include in block 1, not yet finalized. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + if credits := consumer.last().finalized["peerA"]; credits != 0 { + t.Fatalf("expected no finalization credits before finalization, got %d", credits) + } + + // Finalize block 1; next head triggers the finalization scan. + chain.setFinalBlock(1) + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + if credits := consumer.last().finalized["peerA"]; credits != 1 { + t.Fatalf("expected 1 finalization credit, got %d", credits) + } +} + +// TestReorgSafety verifies the tracker resolves the head block by HASH +// so a head event pointing at a sibling block does not emit inclusions +// from the canonical block at the same height. +func TestReorgSafety(t *testing.T) { + tr := New() + chain := newMockChain() + consumer := &mockConsumer{} + tr.Start(chain, consumer) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Two blocks at height 1: canonical A contains tx; sibling B does not. 
+ blockA := chain.addBlockAtHeight(1, 1, []*types.Transaction{tx}, true) + blockB := chain.addBlockAtHeight(1, 2, nil, false) + if blockA.Hash() == blockB.Hash() { + t.Fatal("sibling blocks ended up with the same hash") + } + + // Head announces sibling B — emit must contain no peerA inclusions. + chain.sendHeadBlock(blockB) + waitStep(t, tr) + if incl := consumer.last().inclusions["peerA"]; incl != 0 { + t.Fatalf("sibling-B head should emit 0 peerA inclusions, got %d", incl) + } + + // Head announces canonical A — emit must contain 1 peerA inclusion. + chain.sendHeadBlock(blockA) + waitStep(t, tr) + if incl := consumer.last().inclusions["peerA"]; incl != 1 { + t.Fatalf("canonical-A head should emit 1 peerA inclusion, got %d", incl) + } +} + +// TestHandleChainHeadNilConsumer verifies the tracker tolerates a nil +// consumer (useful for tests that only exercise tx-lifecycle behavior). +func TestHandleChainHeadNilConsumer(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain, nil) + defer tr.Stop() + + chain.addBlock(1, nil) + chain.sendHead(1) + waitStep(t, tr) // should not panic +} diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index bcceaff383..10505af4af 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -84,7 +84,7 @@ func fuzz(input []byte) int { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, + nil, nil, nil, clock, func() time.Time { nanoTime := int64(clock.Now())