eth/txtracker: replace cumulative Finalized with slow RecentFinalized EMA

The total-finalized protection category ranked peers by a monotonic
cumulative count, so a peer that had been productive in the past kept
a high score forever — even if they had since gone silent — and held
a protected slot without contributing.

Replace txtracker.PeerStats.Finalized (int64 cumulative) with
RecentFinalized (float64 EMA). On each chain head, finalization
credits accumulated over the newly-finalized range are folded into a
slow EMA (alpha=0.0001, half-life ~6930 blocks ≈ 23 hours on 12s
mainnet blocks). Peers that continue contributing keep a high score;
peers that stop decay toward zero over roughly a day.

The dropper category renames to "recent-finalized" accordingly. The
type's docstring is rewritten to describe both categories as EMAs
with different time horizons (slow finalized, fast included).

Refactors checkFinalization to return a per-peer credits map rather
than mutating state directly, so both EMAs update in the same loop
over tracked peers.
This commit is contained in:
Csaba Kiraly 2026-04-19 12:14:23 +02:00
parent 1f2ebc5d59
commit f24161de71
4 changed files with 122 additions and 63 deletions

View file

@ -73,7 +73,7 @@ type protectionCategory struct {
// protectionCategories is the list of protection criteria. Each category
// independently selects its top-N peers per pool; the union is protected.
var protectionCategories = []protectionCategory{
{"total-finalized", func(s txtracker.PeerStats) float64 { return float64(s.Finalized) }, inclusionProtectionFrac},
{"recent-finalized", func(s txtracker.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac},
{"recent-included", func(s txtracker.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac},
}
@ -89,12 +89,14 @@ var protectionCategories = []protectionCategory{
// - Trusted and static peers are never dropped.
// - Recently connected peers are also protected from dropping to give them time
// to prove their value before being at risk of disconnection.
// - Some peers are protected from dropping based on their role. This is not based
// on a unified score function, but rather on the concept of protected peer pools.
// Each pool independently selects its top fraction of peers by a specific score
// (e.g. total finalized inclusions, recent inclusion EMA); the union of all
// protected sets is shielded from random dropping, and the drop target is chosen
// randomly from the remainder.
// - Some peers are protected from dropping based on their contribution
// to the tx pool. Each pool (inbound/dialed) independently selects its
// top fraction of peers by a per-peer EMA score — a slow EMA of
// finalized inclusions (~1-day half-life, rewards sustained long-term
// contribution) and a fast EMA of recent block inclusions (rewards
// current activity). The union of all protected sets is shielded from
// random dropping, and the drop target is chosen randomly from the
// remainder.
type dropper struct {
maxDialPeers int // maximum number of dialed peers
maxInboundPeers int // maximum number of inbound peers

View file

@ -64,7 +64,7 @@ func TestProtectedPeersTopPeer(t *testing.T) {
peers := makePeers(20)
stats := make(map[string]txtracker.PeerStats)
stats[peers[0].ID().String()] = txtracker.PeerStats{Finalized: 100}
stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100}
stats[peers[1].ID().String()] = txtracker.PeerStats{RecentIncluded: 5.0}
cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats }
@ -74,7 +74,7 @@ func TestProtectedPeersTopPeer(t *testing.T) {
t.Fatalf("expected 2 protected peers, got %d", len(protected))
}
if !protected[peers[0]] {
t.Fatal("peer 0 should be protected (top Finalized)")
t.Fatal("peer 0 should be protected (top RecentFinalized)")
}
if !protected[peers[1]] {
t.Fatal("peer 1 should be protected (top RecentIncluded)")
@ -103,7 +103,7 @@ func TestProtectedPeersOverlap(t *testing.T) {
peers := makePeers(20)
stats := make(map[string]txtracker.PeerStats)
stats[peers[0].ID().String()] = txtracker.PeerStats{Finalized: 100, RecentIncluded: 5.0}
stats[peers[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0}
cm.peerStatsFunc = func() map[string]txtracker.PeerStats { return stats }
@ -128,7 +128,7 @@ func TestProtectedPeersNilFunc(t *testing.T) {
// independently in each of the inbound and dialed pools, not globally.
// With 10 peers per pool and inclusionProtectionFrac=0.1, exactly 1 peer
// is protected per pool per category — so 2 total (one per pool), both
// for the Finalized category since we don't set RecentIncluded.
// for the RecentFinalized category since we don't set RecentIncluded.
func TestProtectedByPoolPerPoolTopN(t *testing.T) {
inbound := makePeers(10)
dialed := makePeers(10)
@ -140,10 +140,10 @@ func TestProtectedByPoolPerPoolTopN(t *testing.T) {
// Strictly increasing scores: highest wins in each pool.
stats := make(map[string]txtracker.PeerStats)
for i, p := range inbound {
stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)}
stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)}
}
for i, p := range dialed {
stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)}
stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)}
}
protected := protectedPeersByPool(inbound, dialed, stats)
@ -168,15 +168,15 @@ func TestProtectedByPoolPerPoolTopN(t *testing.T) {
func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) {
// 20 dialed peers so 0.1 * 20 = 2 protected per category.
dialed := makePeers(20)
// P0: high Finalized only. P1: high RecentIncluded only. P2: high both.
// P0: high RecentFinalized only. P1: high RecentIncluded only. P2: high both.
// With n=2 per category:
// Finalized winners: P2 (tie-broken-ok), P0
// RecentFinalized winners: P2 (tie-broken-ok), P0
// RecentIncluded winners: P2, P1
// Union: {P0, P1, P2}.
stats := make(map[string]txtracker.PeerStats)
stats[dialed[0].ID().String()] = txtracker.PeerStats{Finalized: 100, RecentIncluded: 0}
stats[dialed[1].ID().String()] = txtracker.PeerStats{Finalized: 0, RecentIncluded: 5.0}
stats[dialed[2].ID().String()] = txtracker.PeerStats{Finalized: 200, RecentIncluded: 10.0}
stats[dialed[0].ID().String()] = txtracker.PeerStats{RecentFinalized: 100, RecentIncluded: 0}
stats[dialed[1].ID().String()] = txtracker.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0}
stats[dialed[2].ID().String()] = txtracker.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0}
protected := protectedPeersByPool(nil, dialed, stats)
@ -196,7 +196,7 @@ func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) {
// top-N must still protect the top dialed peers.
func TestProtectedByPoolPerPoolIndependence(t *testing.T) {
// 20 inbound, 20 dialed — frac=0.1 → 2 protected per pool per category.
// Global top-4 of Finalized would be inbound[16..19] — zero dialed.
// Global top-4 of RecentFinalized would be inbound[16..19] — zero dialed.
inbound := makePeers(20)
dialed := make([]*p2p.Peer, 20)
for i := range dialed {
@ -206,15 +206,15 @@ func TestProtectedByPoolPerPoolIndependence(t *testing.T) {
stats := make(map[string]txtracker.PeerStats)
// Every inbound peer outscores every dialed peer.
for i, p := range inbound {
stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1000 + i)}
stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1000 + i)}
}
for i, p := range dialed {
stats[p.ID().String()] = txtracker.PeerStats{Finalized: int64(1 + i)}
stats[p.ID().String()] = txtracker.PeerStats{RecentFinalized: float64(1 + i)}
}
protected := protectedPeersByPool(inbound, dialed, stats)
// Per-pool top-2 of Finalized:
// Per-pool top-2 of RecentFinalized:
// inbound: inbound[18], inbound[19]
// dialed: dialed[18], dialed[19]
// Global top-N would contain zero dialed peers, so asserting the top

View file

@ -41,12 +41,17 @@ const (
maxTracked = 262144
// EMA smoothing factor for per-block inclusion rate.
emaAlpha = 0.05
// EMA smoothing factor for per-block finalization rate. Very slow on
// purpose: finalization is permanent, and the score should reflect
// sustained contribution over long windows, not recent bursts.
// Half-life ≈ 6930 chain heads (~23 hours on 12s blocks).
finalizedEMAAlpha = 0.0001
)
// PeerStats holds the per-peer inclusion data.
type PeerStats struct {
Finalized int64 // Cumulative finalized inclusions attributed to this peer
RecentIncluded float64 // EMA of per-block inclusions (at chain head time)
RecentFinalized float64 // EMA of per-block finalization credits (slow)
RecentIncluded float64 // EMA of per-block inclusions (fast)
}
// Chain is the blockchain interface needed by the tracker.
@ -58,8 +63,8 @@ type Chain interface {
}
type peerStats struct {
finalized int64
recentIncluded float64
recentFinalized float64
recentIncluded float64
}
// Tracker records which peer delivered each transaction and credits peers
@ -159,8 +164,8 @@ func (t *Tracker) GetAllPeerStats() map[string]PeerStats {
result := make(map[string]PeerStats, len(t.peers))
for id, ps := range t.peers {
result[id] = PeerStats{
Finalized: ps.finalized,
RecentIncluded: ps.recentIncluded,
RecentFinalized: ps.recentFinalized,
RecentIncluded: ps.recentIncluded,
}
}
return result
@ -195,37 +200,41 @@ func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) {
t.mu.Lock()
defer t.mu.Unlock()
// Count per-peer inclusions in this block for the EMA.
// Count per-peer inclusions in this block for the inclusion EMA.
blockIncl := make(map[string]int)
for _, tx := range block.Transactions() {
if peer := t.txs[tx.Hash()]; peer != "" {
blockIncl[peer]++
}
}
// Only credit peers that are still tracked (not disconnected).
// Accumulate per-peer finalization credits over the newly-finalized
// range (possibly zero blocks). Only counts peers still tracked.
blockFinal := t.collectFinalizationCredits()
// Update both EMAs for all tracked peers (decays inactive ones).
// Don't create entries for unknown peers — they may have been
// removed by NotifyPeerDrop and should not be resurrected.
// Update EMA for all tracked peers (decay inactive ones).
for peer, ps := range t.peers {
ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer])
ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(blockFinal[peer])
}
// Check if the finalized block has advanced.
t.checkFinalization()
}
// checkFinalization credits peers for transactions in newly finalized blocks.
// Must be called with t.mu held.
func (t *Tracker) checkFinalization() {
// collectFinalizationCredits accumulates per-peer finalization credits for
// blocks newly finalized since lastFinalNum. Returns a (possibly empty) map
// keyed by peer ID; advances lastFinalNum. Must be called with t.mu held.
// Peers that have already been removed by NotifyPeerDrop are skipped so
// dropped peers are not resurrected by old on-chain data.
func (t *Tracker) collectFinalizationCredits() map[string]int {
credits := make(map[string]int)
finalHeader := t.chain.CurrentFinalBlock()
if finalHeader == nil {
return
return credits
}
finalNum := finalHeader.Number.Uint64()
if finalNum <= t.lastFinalNum {
return
return credits
}
// Credit peers for all blocks from lastFinalNum+1 to finalNum.
var credited int
for num := t.lastFinalNum + 1; num <= finalNum; num++ {
block := t.chain.GetBlockByNumber(num)
if block == nil {
@ -236,17 +245,24 @@ func (t *Tracker) checkFinalization() {
if peer == "" {
continue
}
ps := t.peers[peer]
if ps == nil {
if _, ok := t.peers[peer]; !ok {
continue // peer disconnected, skip credit
}
ps.finalized++
credited++
credits[peer]++
}
}
if credited > 0 {
log.Trace("Credited peers for finalized inclusions",
"from", t.lastFinalNum+1, "to", finalNum, "txs", credited)
if total := sumCounts(credits); total > 0 {
log.Trace("Accumulated finalization credits",
"from", t.lastFinalNum+1, "to", finalNum, "txs", total)
}
t.lastFinalNum = finalNum
return credits
}
func sumCounts(m map[string]int) int {
var sum int
for _, v := range m {
sum += v
}
return sum
}

View file

@ -174,7 +174,7 @@ func TestNotifyReceived(t *testing.T) {
if !ok {
t.Fatal("expected peerA entry, not found")
}
if ps.Finalized != 0 || ps.RecentIncluded != 0 {
if ps.RecentFinalized != 0 || ps.RecentIncluded != 0 {
t.Fatalf("expected zero stats before chain events, got %+v", ps)
}
@ -245,19 +245,19 @@ func TestFinalization(t *testing.T) {
// Not finalized yet.
stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 0 {
t.Fatalf("expected Finalized=0 before finalization, got %d", stats["peerA"].Finalized)
if stats["peerA"].RecentFinalized != 0 {
t.Fatalf("expected RecentFinalized=0 before finalization, got %f", stats["peerA"].RecentFinalized)
}
// Finalize block 1, then send head 2 to trigger checkFinalization.
// Finalize block 1, then send head 2 to trigger the finalization EMA update.
chain.setFinalBlock(1)
chain.addBlock(2, nil)
chain.sendHead(2)
waitStep(t, tr)
stats = tr.GetAllPeerStats()
if stats["peerA"].Finalized != 1 {
t.Fatalf("expected Finalized=1 after finalization, got %d", stats["peerA"].Finalized)
if stats["peerA"].RecentFinalized <= 0 {
t.Fatalf("expected RecentFinalized>0 after finalization, got %f", stats["peerA"].RecentFinalized)
}
}
@ -284,11 +284,11 @@ func TestMultiplePeers(t *testing.T) {
waitStep(t, tr)
stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 1 {
t.Fatalf("peerA: expected Finalized=1, got %d", stats["peerA"].Finalized)
if stats["peerA"].RecentFinalized <= 0 {
t.Fatalf("peerA: expected RecentFinalized>0, got %f", stats["peerA"].RecentFinalized)
}
if stats["peerB"].Finalized != 1 {
t.Fatalf("peerB: expected Finalized=1, got %d", stats["peerB"].Finalized)
if stats["peerB"].RecentFinalized <= 0 {
t.Fatalf("peerB: expected RecentFinalized>0, got %f", stats["peerB"].RecentFinalized)
}
}
@ -312,11 +312,11 @@ func TestFirstDelivererWins(t *testing.T) {
waitStep(t, tr)
stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 1 {
t.Fatalf("peerA should be credited, got Finalized=%d", stats["peerA"].Finalized)
if stats["peerA"].RecentFinalized <= 0 {
t.Fatalf("peerA should be credited, got RecentFinalized=%f", stats["peerA"].RecentFinalized)
}
if stats["peerB"].Finalized != 0 {
t.Fatalf("peerB should NOT be credited, got Finalized=%d", stats["peerB"].Finalized)
if stats["peerB"].RecentFinalized != 0 {
t.Fatalf("peerB should NOT be credited, got RecentFinalized=%f", stats["peerB"].RecentFinalized)
}
}
@ -340,8 +340,8 @@ func TestNoFinalizationCredit(t *testing.T) {
waitStep(t, tr)
stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 0 {
t.Fatalf("expected Finalized=0 without finalization, got %d", stats["peerA"].Finalized)
if stats["peerA"].RecentFinalized != 0 {
t.Fatalf("expected RecentFinalized=0 without finalization, got %f", stats["peerA"].RecentFinalized)
}
}
@ -413,3 +413,44 @@ func TestReorgSafety(t *testing.T) {
t.Fatalf("expected RecentIncluded>0 after canonical-A head event, got %f", got)
}
}
// TestRecentFinalizedDecays verifies that the finalization EMA decays
// for a peer that earned credits in the past but has no new
// finalization activity. The decay is slow (α=0.0001), so we
// just assert monotonic decrease, not convergence to zero.
func TestRecentFinalizedDecays(t *testing.T) {
tr := New()
chain := newMockChain()
tr.Start(chain)
defer tr.Stop()
tx := makeTx(1)
tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
// Include and finalize in block 1.
chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1)
waitStep(t, tr)
chain.setFinalBlock(1)
chain.addBlock(2, nil)
chain.sendHead(2)
waitStep(t, tr)
peak := tr.GetAllPeerStats()["peerA"].RecentFinalized
if peak <= 0 {
t.Fatalf("expected RecentFinalized>0 after finalization, got %f", peak)
}
// Send many empty heads — peer contributes zero each block,
// EMA should decay monotonically.
for i := uint64(3); i <= 50; i++ {
chain.addBlock(i, nil)
chain.sendHead(i)
waitStep(t, tr)
}
after := tr.GetAllPeerStats()["peerA"].RecentFinalized
if after >= peak {
t.Fatalf("expected RecentFinalized to decay, got %f >= peak %f", after, peak)
}
}