diff --git a/eth/backend.go b/eth/backend.go index af8b04bda6..b82587a6da 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -456,8 +456,11 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) - // Start the connection manager - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) + // Start the transaction tracker (records tx deliveries, credits peer inclusions). + s.handler.txTracker.Start(s.blockchain) + + // Start the connection manager with inclusion-based peer protection. + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, s.handler.txTracker.GetAllPeerStats) // start log indexer s.filterMaps.Start() @@ -581,6 +584,7 @@ func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.discmix.Close() s.dropper.Stop() + s.handler.txTracker.Stop() s.handler.Stop() // Then stop everything else. diff --git a/eth/dropper.go b/eth/dropper.go index dada5d07c0..3855439ca5 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -17,6 +17,7 @@ package eth import ( + "cmp" mrand "math/rand" "slices" "sync" @@ -24,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" @@ -40,6 +42,10 @@ const ( // dropping when no more peers can be added. Larger numbers result in more // aggressive drop behavior. peerDropThreshold = 0 + // Fraction of inbound/dialed peers to protect based on inclusion stats. + // The top inclusionProtectionFrac of each category (by score) are + // shielded from random dropping. 0.1 = top 10%. + inclusionProtectionFrac = 0.1 ) var ( @@ -47,18 +53,56 @@ var ( droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil) // droppedOutbound is the number of outbound peers dropped droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil) + // dropSkipped counts times a drop was attempted but no peer was dropped, + // for any reason (pool has headroom, all candidates trusted/static/young, + // or protected by inclusion stats). + dropSkipped = metrics.NewRegisteredMeter("eth/dropper/skipped", nil) ) -// dropper monitors the state of the peer pool and makes changes as follows: -// - during sync the Downloader handles peer connections, so dropper is disabled -// - if not syncing and the peer count is close to the limit, it drops peers -// randomly every peerDropInterval to make space for new peers -// - peers are dropped separately from the inboud pool and from the dialed pool +// Callback type to get per-peer inclusion statistics. +type getPeerStatsFunc func() map[string]txtracker.PeerStats + +// protectionCategory defines a peer scoring function and the fraction of peers +// to protect per inbound/dialed category. Multiple categories are unioned. +type protectionCategory struct { + name string + score func(txtracker.PeerStats) float64 + frac float64 // fraction of max peers to protect (0.0–1.0) +} + +// protectionCategories is the list of protection criteria. Each category +// independently selects its top-N peers per pool; the union is protected. +var protectionCategories = []protectionCategory{ + {"recent-finalized", func(s txtracker.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac}, + {"recent-included", func(s txtracker.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, +} + +// dropper monitors the state of the peer pool and introduces churn by +// periodically disconnecting a random peer to make room for new connections. +// The main goal is to allow new peers to join the network and to facilitate +// continuous topology adaptation. +// +// Behavior: +// - During sync the Downloader handles peer connections, so dropper is disabled. +// - When not syncing and a peer category (inbound or dialed) is close to its +// limit, a random peer from that category is disconnected every 3–7 minutes. +// - Trusted and static peers are never dropped. +// - Recently connected peers are also protected from dropping to give them time +// to prove their value before being at risk of disconnection. +// - Some peers are protected from dropping based on their contribution +// to the tx pool. Each pool (inbound/dialed) independently selects its +// top fraction of peers by a per-peer EMA score — a slow EMA of +// finalized inclusions (~1-day half-life, rewards sustained long-term +// contribution) and a fast EMA of recent block inclusions (rewards +// current activity). The union of all protected sets is shielded from +// random dropping, and the drop target is chosen randomly from the +// remainder. type dropper struct { maxDialPeers int // maximum number of dialed peers maxInboundPeers int // maximum number of inbound peers peersFunc getPeersFunc syncingFunc getSyncingFunc + peerStatsFunc getPeerStatsFunc // optional: inclusion stats for protection // peerDropTimer introduces churn if we are close to limit capacity. // We handle Dialed and Inbound connections separately @@ -88,10 +132,12 @@ func newDropper(maxDialPeers, maxInboundPeers int) *dropper { return cm } -// Start the dropper. -func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) { +// Start the dropper. peerStatsFunc is optional (nil disables inclusion +// protection). +func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerStatsFunc) { cm.peersFunc = srv.Peers cm.syncingFunc = syncingFunc + cm.peerStatsFunc = peerStatsFunc cm.wg.Add(1) go cm.loop() } @@ -114,30 +160,101 @@ func (cm *dropper) dropRandomPeer() bool { } numDialed := len(peers) - numInbound + // Fast path: if neither pool is near capacity, every non-trusted/non-static + // peer is already do-not-drop by pool-threshold rules. No point computing + // inclusion protection. + if cm.maxDialPeers-numDialed > peerDropThreshold && + cm.maxInboundPeers-numInbound > peerDropThreshold { + dropSkipped.Mark(1) + return false + } + + // Compute the set of inclusion-protected peers before filtering. + protected := cm.protectedPeers(peers) + selectDoNotDrop := func(p *p2p.Peer) bool { - // Avoid dropping trusted and static peers, or recent peers. - // Only drop peers if their respective category (dialed/inbound) - // is close to limit capacity. return p.Trusted() || p.StaticDialed() || p.Lifetime() < mclock.AbsTime(doNotDropBefore) || (p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) || - (p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) + (p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) || + protected[p] } droppable := slices.DeleteFunc(peers, selectDoNotDrop) - if len(droppable) > 0 { - p := droppable[mrand.Intn(len(droppable))] - log.Debug("Dropping random peer", "inbound", p.Inbound(), - "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) - p.Disconnect(p2p.DiscUselessPeer) - if p.Inbound() { - droppedInbound.Mark(1) - } else { - droppedOutbound.Mark(1) - } - return true + if len(droppable) == 0 { + dropSkipped.Mark(1) + return false } - return false + p := droppable[mrand.Intn(len(droppable))] + log.Debug("Dropping random peer", "inbound", p.Inbound(), + "id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers)) + p.Disconnect(p2p.DiscUselessPeer) + if p.Inbound() { + droppedInbound.Mark(1) + } else { + droppedOutbound.Mark(1) + } + return true +} + +// protectedPeers computes the set of peers that should not be dropped based +// on inclusion stats. Each protection category independently selects its +// top-N peers per inbound/dialed pool; the union is returned. +func (cm *dropper) protectedPeers(peers []*p2p.Peer) map[*p2p.Peer]bool { + if cm.peerStatsFunc == nil { + return nil + } + stats := cm.peerStatsFunc() + if len(stats) == 0 { + return nil + } + // Split peers by direction. + var inbound, dialed []*p2p.Peer + for _, p := range peers { + if p.Inbound() { + inbound = append(inbound, p) + } else { + dialed = append(dialed, p) + } + } + result := protectedPeersByPool(inbound, dialed, stats) + if len(result) > 0 { + log.Debug("Protecting high-value peers from drop", "protected", len(result)) + } + return result +} + +// protectedPeersByPool selects the union of top-N peers per protection +// category across the given already-split inbound and dialed pools. +// Factored from protectedPeers so tests can exercise the per-pool +// selection logic without needing to construct direction-flagged +// *p2p.Peer instances (which require unexported p2p types). +func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]txtracker.PeerStats) map[*p2p.Peer]bool { + result := make(map[*p2p.Peer]bool) + // protectPool selects the top-frac peers from pool by score and adds them to result. + protectPool := func(pool []*p2p.Peer, score func(*p2p.Peer) float64, frac float64) { + n := int(float64(len(pool)) * frac) + if n == 0 { + return + } + sorted := slices.SortedFunc(slices.Values(pool), func(a, b *p2p.Peer) int { + return cmp.Compare(score(b), score(a)) // descending + }) + top := slices.DeleteFunc(sorted[:min(n, len(sorted))], func(p *p2p.Peer) bool { + return score(p) <= 0 + }) + for _, p := range top { + result[p] = true + } + } + for _, cat := range protectionCategories { + score := func(p *p2p.Peer) float64 { + return cat.score(stats[p.ID().String()]) + } + protectPool(inbound, score, cat.frac) + protectPool(dialed, score, cat.frac) + } + return result } // randomDuration generates a random duration between min and max. diff --git a/eth/dropper_test.go b/eth/dropper_test.go new file mode 100644 index 0000000000..fd2ed9b611 --- /dev/null +++ b/eth/dropper_test.go @@ -0,0 +1,234 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package eth + +import ( + "fmt" + "testing" + + "github.com/ethereum/go-ethereum/eth/txtracker" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +func makePeers(n int) []*p2p.Peer { + peers := make([]*p2p.Peer, n) + for i := range peers { + id := enode.ID{byte(i)} + peers[i] = p2p.NewPeer(id, fmt.Sprintf("peer%d", i), nil) + } + return peers +} + +func TestProtectedPeersNoStats(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return nil } + + peers := makePeers(10) + protected := cm.protectedPeers(peers) + if len(protected) != 0 { + t.Fatalf("expected no protected peers with nil stats, got %d", len(protected)) + } +} + +func TestProtectedPeersEmptyStats(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { + return map[string]txtracker.PeerStats{} + } + + peers := makePeers(10) + protected := cm.protectedPeers(peers) + if len(protected) != 0 { + t.Fatalf("expected no protected peers with empty stats, got %d", len(protected)) + } +} + +func TestProtectedPeersTopPeer(t *testing.T) { + // 20 peers, 10% of 20 = 2 protected per category. + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + + peers := makePeers(20) + stats := make(map[string]txtracker.PeerStats) + stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100} + stats[peers[1].ID().String()] = txtracker.PeerStats{RecentIncluded: 5.0} + + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } + + protected := cm.protectedPeers(peers) + if len(protected) != 2 { + t.Fatalf("expected 2 protected peers, got %d", len(protected)) + } + if !protected[peers[0]] { + t.Fatal("peer 0 should be protected (top RecentFinalized)") + } + if !protected[peers[1]] { + t.Fatal("peer 1 should be protected (top RecentIncluded)") + } +} + +func TestProtectedPeersZeroScore(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + + peers := makePeers(10) + stats := make(map[string]txtracker.PeerStats) + for _, p := range peers { + stats[p.ID().String()] = txtracker.PeerStats{} + } + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } + + protected := cm.protectedPeers(peers) + if len(protected) != 0 { + t.Fatalf("expected no protection with zero scores, got %d", len(protected)) + } +} + +func TestProtectedPeersOverlap(t *testing.T) { + // One peer is top in both categories — counted once. + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + + peers := makePeers(20) + stats := make(map[string]txtracker.PeerStats) + stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0} + + cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } + + protected := cm.protectedPeers(peers) + if len(protected) != 1 { + t.Fatalf("expected 1 protected peer (overlap), got %d", len(protected)) + } +} + +func TestProtectedPeersNilFunc(t *testing.T) { + cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30} + // peerStatsFunc is nil (default). + + peers := makePeers(10) + protected := cm.protectedPeers(peers) + if protected != nil { + t.Fatalf("expected nil with nil stats func, got %v", protected) + } +} + +// TestProtectedByPoolPerPoolTopN verifies that the top-N selection runs +// independently in each of the inbound and dialed pools, not globally. +// With 10 peers per pool and inclusionProtectionFrac=0.1, exactly 1 peer +// is protected per pool per category — so 2 total (one per pool), both +// for the RecentFinalized category since we don't set RecentIncluded. +func TestProtectedByPoolPerPoolTopN(t *testing.T) { + inbound := makePeers(10) + dialed := makePeers(10) + // Distinguish dialed peer IDs from inbound so stats maps don't collide. + for i := range dialed { + id := enode.ID{byte(100 + i)} + dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) + } + // Strictly increasing scores: highest wins in each pool. + stats := make(map[string]txtracker.PeerStats) + for i, p := range inbound { + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} + } + for i, p := range dialed { + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} + } + + protected := protectedPeersByPool(inbound, dialed, stats) + + // Expect top 1 of inbound (inbound[9]) and top 1 of dialed (dialed[9]). + if len(protected) != 2 { + t.Fatalf("expected 2 protected peers (1 per pool), got %d", len(protected)) + } + if !protected[inbound[9]] { + t.Error("expected top inbound peer to be protected") + } + if !protected[dialed[9]] { + t.Error("expected top dialed peer to be protected") + } +} + +// TestProtectedByPoolCrossCategoryOverlap verifies that the union across +// protection categories is correctly deduplicated: a peer that wins in +// multiple categories appears once, and category winners are all +// protected. Uses a pool large enough that frac*len yields n=2 per +// category, so cross-category overlap is observable. +func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) { + // 20 dialed peers so 0.1 * 20 = 2 protected per category. + dialed := makePeers(20) + // P0: high RecentFinalized only. P1: high RecentIncluded only. P2: high both. + // With n=2 per category: + // RecentFinalized winners: P2 (tie-broken-ok), P0 + // RecentIncluded winners: P2, P1 + // Union: {P0, P1, P2}. + stats := make(map[string]txtracker.PeerStats) + stats[dialed[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 0} + stats[dialed[1].ID().String()] = txtracker.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0} + stats[dialed[2].ID().String()] = txtracker.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0} + + protected := protectedPeersByPool(nil, dialed, stats) + + if len(protected) != 3 { + t.Fatalf("expected 3 protected peers (union of category winners), got %d", len(protected)) + } + for _, idx := range []int{0, 1, 2} { + if !protected[dialed[idx]] { + t.Errorf("peer %d should be protected", idx) + } + } +} + +// TestProtectedByPoolPerPoolIndependence locks in that selection runs +// per-pool, not globally. Every inbound peer scores higher than every +// dialed peer, so a global top-N would pick only inbound peers. Per-pool +// top-N must still protect the top dialed peers. +func TestProtectedByPoolPerPoolIndependence(t *testing.T) { + // 20 inbound, 20 dialed — frac=0.1 → 2 protected per pool per category. + // Global top-4 of RecentFinalized would be inbound[16..19] — zero dialed. + inbound := makePeers(20) + dialed := make([]*p2p.Peer, 20) + for i := range dialed { + id := enode.ID{byte(100 + i)} + dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil) + } + stats := make(map[string]txtracker.PeerStats) + // Every inbound peer outscores every dialed peer. + for i, p := range inbound { + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1000 + i)} + } + for i, p := range dialed { + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} + } + + protected := protectedPeersByPool(inbound, dialed, stats) + + // Per-pool top-2 of RecentFinalized: + // inbound: inbound[18], inbound[19] + // dialed: dialed[18], dialed[19] + // Global top-N would contain zero dialed peers, so asserting the top + // dialed peers are protected enforces per-pool independence. + if !protected[dialed[19]] { + t.Fatal("top dialed peer must be protected regardless of globally-higher inbound peers") + } + if !protected[dialed[18]] { + t.Fatal("second-top dialed peer must be protected regardless of globally-higher inbound peers") + } + if !protected[inbound[19]] || !protected[inbound[18]] { + t.Fatal("top inbound peers must also be protected") + } + if len(protected) != 4 { + t.Fatalf("expected 4 protected peers (top-2 of each pool), got %d", len(protected)) + } +} diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 20621c531d..8b917413ec 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -180,10 +180,11 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool - addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation + validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation + onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -194,15 +195,15 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { - return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash)) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), @@ -224,6 +225,7 @@ func NewTxFetcherForTests( addTxs: addTxs, fetchTxs: fetchTxs, dropPeer: dropPeer, + onAccepted: onAccepted, clock: clock, realTime: realTime, rand: rand, @@ -344,6 +346,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) ) batch := txs[i:end] + var accepted []common.Hash + for j, err := range f.addTxs(batch) { // Track the transaction hash if the price is too low for us. // Avoid re-request this transaction when we receive another @@ -353,7 +357,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } // Track a few interesting failure types switch { - case err == nil: // Noop, but need to handle to not count these + case err == nil: + accepted = append(accepted, batch[j].Hash()) case errors.Is(err, txpool.ErrAlreadyKnown): duplicate++ @@ -385,6 +390,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) underpricedMeter.Mark(underpriced) otherRejectMeter.Mark(otherreject) + // Notify the tracker which txs from this peer were accepted. + if f.onAccepted != nil && len(accepted) > 0 { + f.onAccepted(peer, accepted) + } // If 'other reject' is >25% of the deliveries in any batch, sleep a bit. if otherreject > int64((len(batch)+3)/4) { log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 6c2719631e..3fe11fda21 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -97,7 +97,7 @@ func newTestTxFetcher() *TxFetcher { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, + nil, nil, ) } @@ -2203,6 +2203,7 @@ func TestTransactionForgotten(t *testing.T) { }, func(string, []common.Hash) error { return nil }, func(string) {}, + nil, mockClock, mockTime, rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior diff --git a/eth/handler.go b/eth/handler.go index 76df635fb0..80d49aa66d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" @@ -122,6 +123,7 @@ type handler struct { downloader *downloader.Downloader txFetcher *fetcher.TxFetcher + txTracker *txtracker.Tracker peers *peerSet txBroadcastKey [16]byte @@ -181,7 +183,8 @@ func newHandler(config *handlerConfig) (*handler, error) { } return nil } - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) + h.txTracker = txtracker.New() + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted) return h, nil } @@ -396,6 +399,7 @@ func (h *handler) unregisterPeer(id string) { } h.downloader.UnregisterPeer(id) h.txFetcher.Drop(id) + h.txTracker.NotifyPeerDrop(id) if err := h.peers.unregisterPeer(id); err != nil { logger.Error("Ethereum peer removal failed", "err", err) diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go new file mode 100644 index 0000000000..93797cad9d --- /dev/null +++ b/eth/txtracker/tracker.go @@ -0,0 +1,268 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package txtracker provides minimal per-peer transaction inclusion tracking. +// +// It records which peer delivered each accepted transaction (via NotifyAccepted) +// and monitors the chain for inclusion and finalization events. When a +// delivered transaction is finalized on chain, the delivering peer is +// credited. A per-block exponential moving average (EMA) of inclusions +// tracks recent peer productivity. +// +// The primary consumer is the peer dropper (eth/dropper.go), which uses +// these stats to protect high-value peers from random disconnection. +package txtracker + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +const ( + // Maximum number of tx→deliverer mappings to retain. + maxTracked = 262144 + // EMA smoothing factor for per-block inclusion rate. + emaAlpha = 0.05 + // EMA smoothing factor for per-block finalization rate. Very slow on + // purpose: finalization is permanent, and the score should reflect + // sustained contribution over long windows, not recent bursts. + // Half-life ≈ 6930 chain heads (~23 hours on 12s blocks). + finalizedEMAAlpha = 0.0001 +) + +// PeerStats holds the per-peer inclusion data. +type PeerStats struct { + RecentFinalized float64 // EMA of per-block finalization credits (slow) + RecentIncluded float64 // EMA of per-block inclusions (fast) +} + +// Chain is the blockchain interface needed by the tracker. +type Chain interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + GetBlockByNumber(number uint64) *types.Block + GetBlock(hash common.Hash, number uint64) *types.Block + CurrentFinalBlock() *types.Header +} + +type peerStats struct { + recentFinalized float64 + recentIncluded float64 +} + +// Tracker records which peer delivered each transaction and credits peers +// when their transactions appear on chain. +type Tracker struct { + mu sync.Mutex + txs map[common.Hash]string // hash → deliverer peer ID + peers map[string]*peerStats + order []common.Hash // insertion order for LRU eviction + + chain Chain + lastFinalNum uint64 // last finalized block number processed + headCh chan core.ChainHeadEvent + sub event.Subscription + + quit chan struct{} + step chan struct{} // test sync: sent after each event is processed + wg sync.WaitGroup +} + +// New creates a new tracker. +func New() *Tracker { + return &Tracker{ + txs: make(map[common.Hash]string), + peers: make(map[string]*peerStats), + quit: make(chan struct{}), + step: make(chan struct{}, 1), + } +} + +// Start begins listening for chain head events. +func (t *Tracker) Start(chain Chain) { + t.chain = chain + // Seed lastFinalNum so checkFinalization doesn't backfill from genesis. + if fh := chain.CurrentFinalBlock(); fh != nil { + t.lastFinalNum = fh.Number.Uint64() + } + t.headCh = make(chan core.ChainHeadEvent, 128) + t.sub = chain.SubscribeChainHeadEvent(t.headCh) + t.wg.Add(1) + go t.loop() +} + +// NotifyPeerDrop removes a disconnected peer's stats to prevent unbounded +// growth. Safe to call from any goroutine. +func (t *Tracker) NotifyPeerDrop(peer string) { + t.mu.Lock() + defer t.mu.Unlock() + delete(t.peers, peer) +} + +// Stop shuts down the tracker. +func (t *Tracker) Stop() { + t.sub.Unsubscribe() + close(t.quit) + t.wg.Wait() +} + +// NotifyAccepted records that a peer delivered transactions that were accepted +// by the pool. Only accepted (not rejected/duplicate) txs should be recorded +// to prevent attribution poisoning from replayed or invalid txs. +// Safe to call from any goroutine. +func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) { + t.mu.Lock() + defer t.mu.Unlock() + + for _, hash := range hashes { + if _, ok := t.txs[hash]; ok { + continue // already tracked, keep first deliverer + } + t.txs[hash] = peer + t.order = append(t.order, hash) + } + // Ensure the delivering peer has a stats entry. + if len(hashes) > 0 && t.peers[peer] == nil { + t.peers[peer] = &peerStats{} + } + // Evict oldest entries if over capacity. + for len(t.txs) > maxTracked { + oldest := t.order[0] + t.order = t.order[1:] + delete(t.txs, oldest) + } + // Compact the backing array when it grows too large. Reslicing + // with order[1:] doesn't free earlier slots in the array. + if cap(t.order) > 2*maxTracked { + t.order = append([]common.Hash(nil), t.order...) + } +} + +// GetAllPeerStats returns a snapshot of per-peer inclusion statistics. +// Safe to call from any goroutine. +func (t *Tracker) GetAllPeerStats() map[string]PeerStats { + t.mu.Lock() + defer t.mu.Unlock() + + result := make(map[string]PeerStats, len(t.peers)) + for id, ps := range t.peers { + result[id] = PeerStats{ + RecentFinalized: ps.recentFinalized, + RecentIncluded: ps.recentIncluded, + } + } + return result +} + +func (t *Tracker) loop() { + defer t.wg.Done() + + for { + select { + case ev := <-t.headCh: + t.handleChainHead(ev) + select { + case t.step <- struct{}{}: + default: + } + case <-t.sub.Err(): + return + case <-t.quit: + return + } + } +} + +func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { + // Fetch the head block by hash (not just number) to avoid using a + // reorged block if the tracker goroutine lags behind the chain. + block := t.chain.GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64()) + if block == nil { + return + } + t.mu.Lock() + defer t.mu.Unlock() + + // Count per-peer inclusions in this block for the inclusion EMA. + blockIncl := make(map[string]int) + for _, tx := range block.Transactions() { + if peer := t.txs[tx.Hash()]; peer != "" { + blockIncl[peer]++ + } + } + // Accumulate per-peer finalization credits over the newly-finalized + // range (possibly zero blocks). Only counts peers still tracked. + blockFinal := t.collectFinalizationCredits() + + // Update both EMAs for all tracked peers (decays inactive ones). + // Don't create entries for unknown peers — they may have been + // removed by NotifyPeerDrop and should not be resurrected. + for peer, ps := range t.peers { + ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer]) + ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(blockFinal[peer]) + } +} + +// collectFinalizationCredits accumulates per-peer finalization credits for +// blocks newly finalized since lastFinalNum. Returns a (possibly empty) map +// keyed by peer ID; advances lastFinalNum. Must be called with t.mu held. +// Peers that have already been removed by NotifyPeerDrop are skipped so +// dropped peers are not resurrected by old on-chain data. +func (t *Tracker) collectFinalizationCredits() map[string]int { + credits := make(map[string]int) + finalHeader := t.chain.CurrentFinalBlock() + if finalHeader == nil { + return credits + } + finalNum := finalHeader.Number.Uint64() + if finalNum <= t.lastFinalNum { + return credits + } + for num := t.lastFinalNum + 1; num <= finalNum; num++ { + block := t.chain.GetBlockByNumber(num) + if block == nil { + continue + } + for _, tx := range block.Transactions() { + peer := t.txs[tx.Hash()] + if peer == "" { + continue + } + if _, ok := t.peers[peer]; !ok { + continue // peer disconnected, skip credit + } + credits[peer]++ + } + } + if total := sumCounts(credits); total > 0 { + log.Trace("Accumulated finalization credits", + "from", t.lastFinalNum+1, "to", finalNum, "txs", total) + } + t.lastFinalNum = finalNum + return credits +} + +func sumCounts(m map[string]int) int { + var sum int + for _, v := range m { + sum += v + } + return sum +} diff --git a/eth/txtracker/tracker_test.go b/eth/txtracker/tracker_test.go new file mode 100644 index 0000000000..b572597e6e --- /dev/null +++ b/eth/txtracker/tracker_test.go @@ -0,0 +1,456 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package txtracker + +import ( + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/trie" +) + +// mockChain implements the Chain interface for testing. +// +// Blocks are stored by hash to exercise the reorg-safe lookup path in +// tracker.handleChainHead (which calls GetBlock(hash, number)). A separate +// canonicalByNum index maps each height to its canonical block hash, used +// by GetBlockByNumber (the finalization path). +type mockChain struct { + mu sync.Mutex + headFeed event.Feed + blocksByHash map[common.Hash]*types.Block + canonicalByNum map[uint64]common.Hash + finalNum uint64 +} + +func newMockChain() *mockChain { + return &mockChain{ + blocksByHash: make(map[common.Hash]*types.Block), + canonicalByNum: make(map[uint64]common.Hash), + } +} + +func (c *mockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return c.headFeed.Subscribe(ch) +} + +func (c *mockChain) GetBlockByNumber(number uint64) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + hash, ok := c.canonicalByNum[number] + if !ok { + return nil + } + return c.blocksByHash[hash] +} + +func (c *mockChain) GetBlock(hash common.Hash, number uint64) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + return c.blocksByHash[hash] +} + +func (c *mockChain) CurrentFinalBlock() *types.Header { + c.mu.Lock() + defer c.mu.Unlock() + if c.finalNum == 0 { + return nil + } + return &types.Header{Number: new(big.Int).SetUint64(c.finalNum)} +} + +// addBlock adds a canonical block at the given height. Overwrites any +// prior canonical block at that height. +func (c *mockChain) addBlock(num uint64, txs []*types.Transaction) *types.Block { + return c.addBlockAtHeight(num, num, txs, true) +} + +// addBlockAtHeight adds a block at the given height. The salt parameter +// ensures distinct block hashes for two blocks at the same height (used +// for reorg tests). If canonical is true, the block becomes the canonical +// block for that height (looked up by GetBlockByNumber). +func (c *mockChain) addBlockAtHeight(num, salt uint64, txs []*types.Transaction, canonical bool) *types.Block { + c.mu.Lock() + defer c.mu.Unlock() + // Mix salt into Extra so siblings at the same height get distinct hashes. + header := &types.Header{ + Number: new(big.Int).SetUint64(num), + Extra: big.NewInt(int64(salt)).Bytes(), + } + block := types.NewBlock(header, &types.Body{Transactions: txs}, nil, trie.NewListHasher()) + c.blocksByHash[block.Hash()] = block + if canonical { + c.canonicalByNum[num] = block.Hash() + } + return block +} + +func (c *mockChain) setFinalBlock(num uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.finalNum = num +} + +// sendHead emits a chain head event for the canonical block at the given +// height. The emitted header carries the real block's hash so the +// tracker's GetBlock(hash, number) lookup resolves correctly. +func (c *mockChain) sendHead(num uint64) { + c.mu.Lock() + hash := c.canonicalByNum[num] + block := c.blocksByHash[hash] + c.mu.Unlock() + if block == nil { + panic("sendHead: no canonical block at height") + } + c.headFeed.Send(core.ChainHeadEvent{Header: block.Header()}) +} + +// sendHeadBlock emits a chain head event for the given block (may be +// non-canonical). Used for reorg tests. +func (c *mockChain) sendHeadBlock(block *types.Block) { + c.headFeed.Send(core.ChainHeadEvent{Header: block.Header()}) +} + +func hashTxs(txs []*types.Transaction) []common.Hash { + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + return hashes +} + +func makeTx(nonce uint64) *types.Transaction { + return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000}) +} + +// waitStep blocks until the tracker has processed one event. +func waitStep(t *testing.T, tr *Tracker) { + t.Helper() + select { + case <-tr.step: + case <-time.After(time.Second): + t.Fatal("timeout waiting for tracker step") + } +} + +func TestNotifyReceived(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)} + hashes := hashTxs(txs) + tr.NotifyAccepted("peerA", hashes) + + // Public surface: peer entry was created with zero stats before any + // chain events. Map lookups would return a zero value for a missing + // key, so assert presence explicitly. + stats := tr.GetAllPeerStats() + if len(stats) != 1 { + t.Fatalf("expected 1 peer entry, got %d", len(stats)) + } + ps, ok := stats["peerA"] + if !ok { + t.Fatal("expected peerA entry, not found") + } + if ps.RecentFinalized != 0 || ps.RecentIncluded != 0 { + t.Fatalf("expected zero stats before chain events, got %+v", ps) + } + + // Internal state: all tx→deliverer mappings recorded, insertion order + // preserved in the FIFO slice. + tr.mu.Lock() + defer tr.mu.Unlock() + if len(tr.txs) != 3 { + t.Fatalf("expected 3 tracked txs, got %d", len(tr.txs)) + } + if len(tr.order) != 3 { + t.Fatalf("expected order length 3, got %d", len(tr.order)) + } + for i, h := range hashes { + if got := tr.txs[h]; got != "peerA" { + t.Fatalf("tx %d: expected deliverer=peerA, got %q", i, got) + } + if tr.order[i] != h { + t.Fatalf("order[%d] mismatch", i) + } + } +} + +func TestInclusionEMA(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Block 1 includes peerA's tx. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + stats := tr.GetAllPeerStats() + if stats["peerA"].RecentIncluded <= 0 { + t.Fatalf("expected RecentIncluded > 0 after inclusion, got %f", stats["peerA"].RecentIncluded) + } + ema1 := stats["peerA"].RecentIncluded + + // Block 2 has no txs from peerA — EMA should decay. + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + stats = tr.GetAllPeerStats() + if stats["peerA"].RecentIncluded >= ema1 { + t.Fatalf("expected EMA to decay, got %f >= %f", stats["peerA"].RecentIncluded, ema1) + } +} + +func TestFinalization(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include in block 1. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + // Not finalized yet. + stats := tr.GetAllPeerStats() + if stats["peerA"].RecentFinalized != 0 { + t.Fatalf("expected RecentFinalized=0 before finalization, got %f", stats["peerA"].RecentFinalized) + } + + // Finalize block 1, then send head 2 to trigger the finalization EMA update. + chain.setFinalBlock(1) + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + stats = tr.GetAllPeerStats() + if stats["peerA"].RecentFinalized <= 0 { + t.Fatalf("expected RecentFinalized>0 after finalization, got %f", stats["peerA"].RecentFinalized) + } +} + +func TestMultiplePeers(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx1 := makeTx(1) + tx2 := makeTx(2) + tr.NotifyAccepted("peerA", []common.Hash{tx1.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx2.Hash()}) + + // Both included in block 1. + chain.addBlock(1, []*types.Transaction{tx1, tx2}) + chain.sendHead(1) + waitStep(t, tr) + + // Finalize. + chain.setFinalBlock(1) + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + stats := tr.GetAllPeerStats() + if stats["peerA"].RecentFinalized <= 0 { + t.Fatalf("peerA: expected RecentFinalized>0, got %f", stats["peerA"].RecentFinalized) + } + if stats["peerB"].RecentFinalized <= 0 { + t.Fatalf("peerB: expected RecentFinalized>0, got %f", stats["peerB"].RecentFinalized) + } +} + +func TestFirstDelivererWins(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()}) // duplicate, should be ignored + + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + chain.setFinalBlock(1) + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + stats := tr.GetAllPeerStats() + if stats["peerA"].RecentFinalized <= 0 { + t.Fatalf("peerA should be credited, got RecentFinalized=%f", stats["peerA"].RecentFinalized) + } + if stats["peerB"].RecentFinalized != 0 { + t.Fatalf("peerB should NOT be credited, got RecentFinalized=%f", stats["peerB"].RecentFinalized) + } +} + +func TestNoFinalizationCredit(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include but don't finalize. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + // Send more heads without finalization. + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + stats := tr.GetAllPeerStats() + if stats["peerA"].RecentFinalized != 0 { + t.Fatalf("expected RecentFinalized=0 without finalization, got %f", stats["peerA"].RecentFinalized) + } +} + +func TestEMADecay(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include in block 1. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + + // Send 30 empty blocks — EMA should decay close to zero. + for i := uint64(2); i <= 31; i++ { + chain.addBlock(i, nil) + chain.sendHead(i) + waitStep(t, tr) + } + + stats := tr.GetAllPeerStats() + if stats["peerA"].RecentIncluded > 0.02 { + t.Fatalf("expected RecentIncluded near zero after 30 empty blocks, got %f", stats["peerA"].RecentIncluded) + } +} + +// TestReorgSafety verifies that handleChainHead resolves the head block by +// HASH (not just by number), so a head event announcing a sibling block at +// the same height does not credit transactions from the canonical block. +// +// Regression check: if the tracker were changed to use GetBlockByNumber, +// it would always fetch the canonical block A and credit peerA even when +// the head points to sibling B. +func TestReorgSafety(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Two blocks at height 1: canonical A contains tx; sibling B does not. + blockA := chain.addBlockAtHeight(1, 1, []*types.Transaction{tx}, true) + blockB := chain.addBlockAtHeight(1, 2, nil, false) + if blockA.Hash() == blockB.Hash() { + t.Fatal("sibling blocks ended up with the same hash") + } + + // Head announces sibling B. A hash-aware tracker fetches B, sees no + // peerA txs, and leaves the EMA at zero. A number-only tracker would + // instead fetch A and credit peerA. + chain.sendHeadBlock(blockB) + waitStep(t, tr) + + if got := tr.GetAllPeerStats()["peerA"].RecentIncluded; got != 0 { + t.Fatalf("expected RecentIncluded=0 after sibling-B head event, got %f (tracker followed the wrong block)", got) + } + + // Now announce canonical A; peerA should be credited. + chain.sendHeadBlock(blockA) + waitStep(t, tr) + + if got := tr.GetAllPeerStats()["peerA"].RecentIncluded; got <= 0 { + t.Fatalf("expected RecentIncluded>0 after canonical-A head event, got %f", got) + } +} + +// TestRecentFinalizedDecays verifies that the finalization EMA decays +// for a peer that earned credits in the past but has no new +// finalization activity. The decay is slow (α=0.0001), so we +// just assert monotonic decrease, not convergence to zero. +func TestRecentFinalizedDecays(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include and finalize in block 1. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + chain.setFinalBlock(1) + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + peak := tr.GetAllPeerStats()["peerA"].RecentFinalized + if peak <= 0 { + t.Fatalf("expected RecentFinalized>0 after finalization, got %f", peak) + } + + // Send many empty heads — peer contributes zero each block, + // EMA should decay monotonically. + for i := uint64(3); i <= 50; i++ { + chain.addBlock(i, nil) + chain.sendHead(i) + waitStep(t, tr) + } + + after := tr.GetAllPeerStats()["peerA"].RecentFinalized + if after >= peak { + t.Fatalf("expected RecentFinalized to decay, got %f >= peak %f", after, peak) + } +} diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index bcceaff383..69674d0e62 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -84,7 +84,7 @@ func fuzz(input []byte) int { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, + nil, nil, clock, func() time.Time { nanoTime := int64(clock.Now())