eth: use finalized count for total protection, keep EMA on inclusions

Change the long-term protection category from total inclusions to
total finalized inclusions. Finalized txs are harder to game (require
actual block finality, not just inclusion) and represent confirmed
on-chain value.

The recent-inclusion EMA stays on chain head inclusions for
responsiveness — a peer delivering txs that appear in the latest
blocks gets quick protection without waiting for finalization.

The tracker now checks CurrentFinalBlock() on each chain head event
and credits delivering peers for all newly finalized blocks since
the last check.
This commit is contained in:
Csaba Kiraly 2026-04-10 08:56:32 +02:00
parent 9f2575efeb
commit 98ffc7bd37
3 changed files with 58 additions and 26 deletions

View file

@ -467,7 +467,7 @@ func (s *Ethereum) Start() error {
stats := s.handler.txTracker.GetAllPeerStats() stats := s.handler.txTracker.GetAllPeerStats()
result := make(map[string]PeerInclusionStats, len(stats)) result := make(map[string]PeerInclusionStats, len(stats))
for id, ps := range stats { for id, ps := range stats {
result[id] = PeerInclusionStats{Included: ps.Included, RecentIncluded: ps.RecentIncluded} result[id] = PeerInclusionStats{Finalized: ps.Finalized, RecentIncluded: ps.RecentIncluded}
} }
return result return result
}) })

View file

@ -61,7 +61,7 @@ var (
// to decide which peers to protect. Any stats provider (e.g. txtracker) can // to decide which peers to protect. Any stats provider (e.g. txtracker) can
// implement getPeerInclusionStatsFunc by returning this struct per peer ID. // implement getPeerInclusionStatsFunc by returning this struct per peer ID.
type PeerInclusionStats struct { type PeerInclusionStats struct {
Included int64 // Cumulative on-chain inclusions attributed to this peer Finalized int64 // Cumulative finalized inclusions attributed to this peer
RecentIncluded float64 // EMA of per-block inclusions (0 if not tracked) RecentIncluded float64 // EMA of per-block inclusions (0 if not tracked)
} }
@ -79,7 +79,7 @@ type protectionCategory struct {
// protectionCategories is the list of protection criteria. Each category // protectionCategories is the list of protection criteria. Each category
// independently selects its top-N peers per pool; the union is protected. // independently selects its top-N peers per pool; the union is protected.
var protectionCategories = []protectionCategory{ var protectionCategories = []protectionCategory{
{"total-included", func(s PeerInclusionStats) float64 { return float64(s.Included) }, inclusionProtectionFrac}, {"total-finalized", func(s PeerInclusionStats) float64 { return float64(s.Finalized) }, inclusionProtectionFrac},
{"recent-included", func(s PeerInclusionStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, {"recent-included", func(s PeerInclusionStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac},
} }

View file

@ -22,18 +22,19 @@ const (
// PeerStats holds the per-peer inclusion data. // PeerStats holds the per-peer inclusion data.
type PeerStats struct { type PeerStats struct {
Included int64 // Cumulative on-chain inclusions attributed to this peer Finalized int64 // Cumulative finalized inclusions attributed to this peer
RecentIncluded float64 // EMA of per-block inclusions RecentIncluded float64 // EMA of per-block inclusions (at chain head time)
} }
// Chain is the blockchain interface needed by the tracker. // Chain is the blockchain interface needed by the tracker.
type Chain interface { type Chain interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
GetBlockByNumber(number uint64) *types.Block GetBlockByNumber(number uint64) *types.Block
CurrentFinalBlock() *types.Header
} }
type peerStats struct { type peerStats struct {
included int64 finalized int64
recentIncluded float64 recentIncluded float64
} }
@ -45,9 +46,10 @@ type Tracker struct {
peers map[string]*peerStats peers map[string]*peerStats
order []common.Hash // insertion order for LRU eviction order []common.Hash // insertion order for LRU eviction
chain Chain chain Chain
headCh chan core.ChainHeadEvent lastFinalNum uint64 // last finalized block number processed
sub event.Subscription headCh chan core.ChainHeadEvent
sub event.Subscription
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -109,7 +111,7 @@ func (t *Tracker) GetAllPeerStats() map[string]PeerStats {
result := make(map[string]PeerStats, len(t.peers)) result := make(map[string]PeerStats, len(t.peers))
for id, ps := range t.peers { for id, ps := range t.peers {
result[id] = PeerStats{ result[id] = PeerStats{
Included: ps.included, Finalized: ps.finalized,
RecentIncluded: ps.recentIncluded, RecentIncluded: ps.recentIncluded,
} }
} }
@ -132,6 +134,7 @@ func (t *Tracker) loop() {
} }
func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) {
// Update recent-inclusion EMA from the new head block.
block := t.chain.GetBlockByNumber(ev.Header.Number.Uint64()) block := t.chain.GetBlockByNumber(ev.Header.Number.Uint64())
if block == nil { if block == nil {
return return
@ -139,27 +142,56 @@ func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
// Credit delivering peers for each included transaction. // Count per-peer inclusions in this block for the EMA.
blockIncl := make(map[string]int) blockIncl := make(map[string]int)
for _, tx := range block.Transactions() { for _, tx := range block.Transactions() {
hash := tx.Hash() if peer := t.txs[tx.Hash()]; peer != "" {
peer, ok := t.txs[hash] blockIncl[peer]++
if !ok || peer == "" {
continue
} }
ps := t.peers[peer]
if ps == nil {
ps = &peerStats{}
t.peers[peer] = ps
}
ps.included++
blockIncl[peer]++
} }
// Update per-peer recent-inclusion EMA for all tracked peers. // Update EMA for all tracked peers (decay inactive ones).
for peer, ps := range t.peers { for peer, ps := range t.peers {
ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer]) ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer])
} }
if len(blockIncl) > 0 { // Check if the finalized block has advanced.
log.Trace("Credited peers for block inclusions", "block", ev.Header.Number, "peers", len(blockIncl)) t.checkFinalization()
} }
// checkFinalization credits peers for transactions in newly finalized blocks.
// Must be called with t.mu held.
func (t *Tracker) checkFinalization() {
finalHeader := t.chain.CurrentFinalBlock()
if finalHeader == nil {
return
}
finalNum := finalHeader.Number.Uint64()
if finalNum <= t.lastFinalNum {
return
}
// Credit peers for all blocks from lastFinalNum+1 to finalNum.
var credited int
for num := t.lastFinalNum + 1; num <= finalNum; num++ {
block := t.chain.GetBlockByNumber(num)
if block == nil {
continue
}
for _, tx := range block.Transactions() {
peer := t.txs[tx.Hash()]
if peer == "" {
continue
}
ps := t.peers[peer]
if ps == nil {
ps = &peerStats{}
t.peers[peer] = ps
}
ps.finalized++
credited++
}
}
if credited > 0 {
log.Trace("Credited peers for finalized inclusions",
"from", t.lastFinalNum+1, "to", finalNum, "txs", credited)
}
t.lastFinalNum = finalNum
} }