diff --git a/eth/dropper.go b/eth/dropper.go index 591e3c4277..3855439ca5 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -73,7 +73,7 @@ type protectionCategory struct { // protectionCategories is the list of protection criteria. Each category // independently selects its top-N peers per pool; the union is protected. var protectionCategories = []protectionCategory{ - {"total-finalized", func(s txtracker.PeerStats) float64 { return float64(s.Finalized) }, inclusionProtectionFrac}, + {"recent-finalized", func(s txtracker.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac}, {"recent-included", func(s txtracker.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, } @@ -89,12 +89,14 @@ var protectionCategories = []protectionCategory{ // - Trusted and static peers are never dropped. // - Recently connected peers are also protected from dropping to give them time // to prove their value before being at risk of disconnection. -// - Some peers are protected from dropping based on their role. This is not based -// on a unified score function, but rather on the concept of protected peer pools. -// Each pool independently selects its top fraction of peers by a specific score -// (e.g. total finalized inclusions, recent inclusion EMA); the union of all -// protected sets is shielded from random dropping, and the drop target is chosen -// randomly from the remainder. +// - Some peers are protected from dropping based on their contribution +// to the tx pool. Each pool (inbound/dialed) independently selects its +// top fraction of peers by a per-peer EMA score — a slow EMA of +// finalized inclusions (~1-day half-life, rewards sustained long-term +// contribution) and a fast EMA of recent block inclusions (rewards +// current activity). The union of all protected sets is shielded from +// random dropping, and the drop target is chosen randomly from the +// remainder. type dropper struct { maxDialPeers int // maximum number of dialed peers maxInboundPeers int // maximum number of inbound peers diff --git a/eth/dropper_test.go b/eth/dropper_test.go index a85eb0011b..fd2ed9b611 100644 --- a/eth/dropper_test.go +++ b/eth/dropper_test.go @@ -64,7 +64,7 @@ func TestProtectedPeersTopPeer(t *testing.T) { peers := makePeers(20) stats := make(map[string]txtracker.PeerStats) - stats[peers[0].ID().String()] = txtracker.PeerStats{Finalized: 100} + stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100} stats[peers[1].ID().String()] = txtracker.PeerStats{RecentIncluded: 5.0} cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } @@ -74,7 +74,7 @@ func TestProtectedPeersTopPeer(t *testing.T) { t.Fatalf("expected 2 protected peers, got %d", len(protected)) } if !protected[peers[0]] { - t.Fatal("peer 0 should be protected (top Finalized)") + t.Fatal("peer 0 should be protected (top RecentFinalized)") } if !protected[peers[1]] { t.Fatal("peer 1 should be protected (top RecentIncluded)") @@ -103,7 +103,7 @@ func TestProtectedPeersOverlap(t *testing.T) { peers := makePeers(20) stats := make(map[string]txtracker.PeerStats) - stats[peers[0].ID().String()] = txtracker.PeerStats{Finalized: 100, RecentIncluded: 5.0} + stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0} cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats } @@ -128,7 +128,7 @@ func TestProtectedPeersNilFunc(t *testing.T) { // independently in each of the inbound and dialed pools, not globally. // With 10 peers per pool and inclusionProtectionFrac=0.1, exactly 1 peer // is protected per pool per category — so 2 total (one per pool), both -// for the Finalized category since we don't set RecentIncluded. +// for the RecentFinalized category since we don't set RecentIncluded. func TestProtectedByPoolPerPoolTopN(t *testing.T) { inbound := makePeers(10) dialed := makePeers(10) @@ -140,10 +140,10 @@ func TestProtectedByPoolPerPoolTopN(t *testing.T) { // Strictly increasing scores: highest wins in each pool. stats := make(map[string]txtracker.PeerStats) for i, p := range inbound { - stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)} + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} } for i, p := range dialed { - stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)} + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} } protected := protectedPeersByPool(inbound, dialed, stats) @@ -168,15 +168,15 @@ func TestProtectedByPoolPerPoolTopN(t *testing.T) { func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) { // 20 dialed peers so 0.1 * 20 = 2 protected per category. dialed := makePeers(20) - // P0: high Finalized only. P1: high RecentIncluded only. P2: high both. + // P0: high RecentFinalized only. P1: high RecentIncluded only. P2: high both. // With n=2 per category: - // Finalized winners: P2 (tie-broken-ok), P0 + // RecentFinalized winners: P2 (tie-broken-ok), P0 // RecentIncluded winners: P2, P1 // Union: {P0, P1, P2}. stats := make(map[string]txtracker.PeerStats) - stats[dialed[0].ID().String()] = txtracker.PeerStats{Finalized: 100, RecentIncluded: 0} - stats[dialed[1].ID().String()] = txtracker.PeerStats{Finalized: 0, RecentIncluded: 5.0} - stats[dialed[2].ID().String()] = txtracker.PeerStats{Finalized: 200, RecentIncluded: 10.0} + stats[dialed[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 0} + stats[dialed[1].ID().String()] = txtracker.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0} + stats[dialed[2].ID().String()] = txtracker.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0} protected := protectedPeersByPool(nil, dialed, stats) @@ -196,7 +196,7 @@ func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) { // top-N must still protect the top dialed peers. func TestProtectedByPoolPerPoolIndependence(t *testing.T) { // 20 inbound, 20 dialed — frac=0.1 → 2 protected per pool per category. - // Global top-4 of Finalized would be inbound[16..19] — zero dialed. + // Global top-4 of RecentFinalized would be inbound[16..19] — zero dialed. inbound := makePeers(20) dialed := make([]*p2p.Peer, 20) for i := range dialed { @@ -206,15 +206,15 @@ func TestProtectedByPoolPerPoolIndependence(t *testing.T) { stats := make(map[string]txtracker.PeerStats) // Every inbound peer outscores every dialed peer. for i, p := range inbound { - stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1000 + i)} + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1000 + i)} } for i, p := range dialed { - stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)} + stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)} } protected := protectedPeersByPool(inbound, dialed, stats) - // Per-pool top-2 of Finalized: + // Per-pool top-2 of RecentFinalized: // inbound: inbound[18], inbound[19] // dialed: dialed[18], dialed[19] // Global top-N would contain zero dialed peers, so asserting the top diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go index b5ebc5ba7e..93797cad9d 100644 --- a/eth/txtracker/tracker.go +++ b/eth/txtracker/tracker.go @@ -41,12 +41,17 @@ const ( maxTracked = 262144 // EMA smoothing factor for per-block inclusion rate. emaAlpha = 0.05 + // EMA smoothing factor for per-block finalization rate. Very slow on + // purpose: finalization is permanent, and the score should reflect + // sustained contribution over long windows, not recent bursts. + // Half-life ≈ 6930 chain heads (~23 hours on 12s blocks). + finalizedEMAAlpha = 0.0001 ) // PeerStats holds the per-peer inclusion data. type PeerStats struct { - Finalized int64 // Cumulative finalized inclusions attributed to this peer - RecentIncluded float64 // EMA of per-block inclusions (at chain head time) + RecentFinalized float64 // EMA of per-block finalization credits (slow) + RecentIncluded float64 // EMA of per-block inclusions (fast) } // Chain is the blockchain interface needed by the tracker. @@ -58,8 +63,8 @@ type Chain interface { } type peerStats struct { - finalized int64 - recentIncluded float64 + recentFinalized float64 + recentIncluded float64 } // Tracker records which peer delivered each transaction and credits peers @@ -159,8 +164,8 @@ func (t *Tracker) GetAllPeerStats() map[string]PeerStats { result := make(map[string]PeerStats, len(t.peers)) for id, ps := range t.peers { result[id] = PeerStats{ - Finalized: ps.finalized, - RecentIncluded: ps.recentIncluded, + RecentFinalized: ps.recentFinalized, + RecentIncluded: ps.recentIncluded, } } return result @@ -195,37 +200,41 @@ func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { t.mu.Lock() defer t.mu.Unlock() - // Count per-peer inclusions in this block for the EMA. + // Count per-peer inclusions in this block for the inclusion EMA. blockIncl := make(map[string]int) for _, tx := range block.Transactions() { if peer := t.txs[tx.Hash()]; peer != "" { blockIncl[peer]++ } } - // Only credit peers that are still tracked (not disconnected). + // Accumulate per-peer finalization credits over the newly-finalized + // range (possibly zero blocks). Only counts peers still tracked. + blockFinal := t.collectFinalizationCredits() + + // Update both EMAs for all tracked peers (decays inactive ones). // Don't create entries for unknown peers — they may have been // removed by NotifyPeerDrop and should not be resurrected. - // Update EMA for all tracked peers (decay inactive ones). for peer, ps := range t.peers { ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer]) + ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(blockFinal[peer]) } - // Check if the finalized block has advanced. - t.checkFinalization() } -// checkFinalization credits peers for transactions in newly finalized blocks. -// Must be called with t.mu held. -func (t *Tracker) checkFinalization() { +// collectFinalizationCredits accumulates per-peer finalization credits for +// blocks newly finalized since lastFinalNum. Returns a (possibly empty) map +// keyed by peer ID; advances lastFinalNum. Must be called with t.mu held. +// Peers that have already been removed by NotifyPeerDrop are skipped so +// dropped peers are not resurrected by old on-chain data. +func (t *Tracker) collectFinalizationCredits() map[string]int { + credits := make(map[string]int) finalHeader := t.chain.CurrentFinalBlock() if finalHeader == nil { - return + return credits } finalNum := finalHeader.Number.Uint64() if finalNum <= t.lastFinalNum { - return + return credits } - // Credit peers for all blocks from lastFinalNum+1 to finalNum. - var credited int for num := t.lastFinalNum + 1; num <= finalNum; num++ { block := t.chain.GetBlockByNumber(num) if block == nil { @@ -236,17 +245,24 @@ func (t *Tracker) checkFinalization() { if peer == "" { continue } - ps := t.peers[peer] - if ps == nil { + if _, ok := t.peers[peer]; !ok { continue // peer disconnected, skip credit } - ps.finalized++ - credited++ + credits[peer]++ } } - if credited > 0 { - log.Trace("Credited peers for finalized inclusions", - "from", t.lastFinalNum+1, "to", finalNum, "txs", credited) + if total := sumCounts(credits); total > 0 { + log.Trace("Accumulated finalization credits", + "from", t.lastFinalNum+1, "to", finalNum, "txs", total) } t.lastFinalNum = finalNum + return credits +} + +func sumCounts(m map[string]int) int { + var sum int + for _, v := range m { + sum += v + } + return sum } diff --git a/eth/txtracker/tracker_test.go b/eth/txtracker/tracker_test.go index 013a0068b8..b572597e6e 100644 --- a/eth/txtracker/tracker_test.go +++ b/eth/txtracker/tracker_test.go @@ -174,7 +174,7 @@ func TestNotifyReceived(t *testing.T) { if !ok { t.Fatal("expected peerA entry, not found") } - if ps.Finalized != 0 || ps.RecentIncluded != 0 { + if ps.RecentFinalized != 0 || ps.RecentIncluded != 0 { t.Fatalf("expected zero stats before chain events, got %+v", ps) } @@ -245,19 +245,19 @@ func TestFinalization(t *testing.T) { // Not finalized yet. stats := tr.GetAllPeerStats() - if stats["peerA"].Finalized != 0 { - t.Fatalf("expected Finalized=0 before finalization, got %d", stats["peerA"].Finalized) + if stats["peerA"].RecentFinalized != 0 { + t.Fatalf("expected RecentFinalized=0 before finalization, got %f", stats["peerA"].RecentFinalized) } - // Finalize block 1, then send head 2 to trigger checkFinalization. + // Finalize block 1, then send head 2 to trigger the finalization EMA update. chain.setFinalBlock(1) chain.addBlock(2, nil) chain.sendHead(2) waitStep(t, tr) stats = tr.GetAllPeerStats() - if stats["peerA"].Finalized != 1 { - t.Fatalf("expected Finalized=1 after finalization, got %d", stats["peerA"].Finalized) + if stats["peerA"].RecentFinalized <= 0 { + t.Fatalf("expected RecentFinalized>0 after finalization, got %f", stats["peerA"].RecentFinalized) } } @@ -284,11 +284,11 @@ func TestMultiplePeers(t *testing.T) { waitStep(t, tr) stats := tr.GetAllPeerStats() - if stats["peerA"].Finalized != 1 { - t.Fatalf("peerA: expected Finalized=1, got %d", stats["peerA"].Finalized) + if stats["peerA"].RecentFinalized <= 0 { + t.Fatalf("peerA: expected RecentFinalized>0, got %f", stats["peerA"].RecentFinalized) } - if stats["peerB"].Finalized != 1 { - t.Fatalf("peerB: expected Finalized=1, got %d", stats["peerB"].Finalized) + if stats["peerB"].RecentFinalized <= 0 { + t.Fatalf("peerB: expected RecentFinalized>0, got %f", stats["peerB"].RecentFinalized) } } @@ -312,11 +312,11 @@ func TestFirstDelivererWins(t *testing.T) { waitStep(t, tr) stats := tr.GetAllPeerStats() - if stats["peerA"].Finalized != 1 { - t.Fatalf("peerA should be credited, got Finalized=%d", stats["peerA"].Finalized) + if stats["peerA"].RecentFinalized <= 0 { + t.Fatalf("peerA should be credited, got RecentFinalized=%f", stats["peerA"].RecentFinalized) } - if stats["peerB"].Finalized != 0 { - t.Fatalf("peerB should NOT be credited, got Finalized=%d", stats["peerB"].Finalized) + if stats["peerB"].RecentFinalized != 0 { + t.Fatalf("peerB should NOT be credited, got RecentFinalized=%f", stats["peerB"].RecentFinalized) } } @@ -340,8 +340,8 @@ func TestNoFinalizationCredit(t *testing.T) { waitStep(t, tr) stats := tr.GetAllPeerStats() - if stats["peerA"].Finalized != 0 { - t.Fatalf("expected Finalized=0 without finalization, got %d", stats["peerA"].Finalized) + if stats["peerA"].RecentFinalized != 0 { + t.Fatalf("expected RecentFinalized=0 without finalization, got %f", stats["peerA"].RecentFinalized) } } @@ -413,3 +413,44 @@ func TestReorgSafety(t *testing.T) { t.Fatalf("expected RecentIncluded>0 after canonical-A head event, got %f", got) } } + +// TestRecentFinalizedDecays verifies that the finalization EMA decays +// for a peer that earned credits in the past but has no new +// finalization activity. The decay is slow (α=0.0001), so we +// just assert monotonic decrease, not convergence to zero. +func TestRecentFinalizedDecays(t *testing.T) { + tr := New() + chain := newMockChain() + tr.Start(chain) + defer tr.Stop() + + tx := makeTx(1) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + + // Include and finalize in block 1. + chain.addBlock(1, []*types.Transaction{tx}) + chain.sendHead(1) + waitStep(t, tr) + chain.setFinalBlock(1) + chain.addBlock(2, nil) + chain.sendHead(2) + waitStep(t, tr) + + peak := tr.GetAllPeerStats()["peerA"].RecentFinalized + if peak <= 0 { + t.Fatalf("expected RecentFinalized>0 after finalization, got %f", peak) + } + + // Send many empty heads — peer contributes zero each block, + // EMA should decay monotonically. + for i := uint64(3); i <= 50; i++ { + chain.addBlock(i, nil) + chain.sendHead(i) + waitStep(t, tr) + } + + after := tr.GetAllPeerStats()["peerA"].RecentFinalized + if after >= peak { + t.Fatalf("expected RecentFinalized to decay, got %f >= peak %f", after, peak) + } +}