diff --git a/eth/backend.go b/eth/backend.go index 5dcc2f03f9..d5ec34e21d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -467,7 +467,7 @@ func (s *Ethereum) Start() error { stats := s.handler.txTracker.GetAllPeerStats() result := make(map[string]PeerInclusionStats, len(stats)) for id, ps := range stats { - result[id] = PeerInclusionStats{Included: ps.Included, RecentIncluded: ps.RecentIncluded} + result[id] = PeerInclusionStats{Finalized: ps.Finalized, RecentIncluded: ps.RecentIncluded} } return result }) diff --git a/eth/dropper.go b/eth/dropper.go index 31da9301d8..e8710c3ec7 100644 --- a/eth/dropper.go +++ b/eth/dropper.go @@ -61,7 +61,7 @@ var ( // to decide which peers to protect. Any stats provider (e.g. txtracker) can // implement getPeerInclusionStatsFunc by returning this struct per peer ID. type PeerInclusionStats struct { - Included int64 // Cumulative on-chain inclusions attributed to this peer + Finalized int64 // Cumulative finalized inclusions attributed to this peer RecentIncluded float64 // EMA of per-block inclusions (0 if not tracked) } @@ -79,7 +79,7 @@ type protectionCategory struct { // protectionCategories is the list of protection criteria. Each category // independently selects its top-N peers per pool; the union is protected. var protectionCategories = []protectionCategory{ - {"total-included", func(s PeerInclusionStats) float64 { return float64(s.Included) }, inclusionProtectionFrac}, + {"total-finalized", func(s PeerInclusionStats) float64 { return float64(s.Finalized) }, inclusionProtectionFrac}, {"recent-included", func(s PeerInclusionStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac}, } diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go index 8383970810..5fc479a49c 100644 --- a/eth/txtracker/tracker.go +++ b/eth/txtracker/tracker.go @@ -22,18 +22,19 @@ const ( // PeerStats holds the per-peer inclusion data. type PeerStats struct { - Included int64 // Cumulative on-chain inclusions attributed to this peer - RecentIncluded float64 // EMA of per-block inclusions + Finalized int64 // Cumulative finalized inclusions attributed to this peer + RecentIncluded float64 // EMA of per-block inclusions (at chain head time) } // Chain is the blockchain interface needed by the tracker. type Chain interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription GetBlockByNumber(number uint64) *types.Block + CurrentFinalBlock() *types.Header } type peerStats struct { - included int64 + finalized int64 recentIncluded float64 } @@ -45,9 +46,10 @@ type Tracker struct { peers map[string]*peerStats order []common.Hash // insertion order for LRU eviction - chain Chain - headCh chan core.ChainHeadEvent - sub event.Subscription + chain Chain + lastFinalNum uint64 // last finalized block number processed + headCh chan core.ChainHeadEvent + sub event.Subscription quit chan struct{} wg sync.WaitGroup @@ -109,7 +111,7 @@ func (t *Tracker) GetAllPeerStats() map[string]PeerStats { result := make(map[string]PeerStats, len(t.peers)) for id, ps := range t.peers { result[id] = PeerStats{ - Included: ps.included, + Finalized: ps.finalized, RecentIncluded: ps.recentIncluded, } } @@ -132,6 +134,7 @@ func (t *Tracker) loop() { } func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { + // Update recent-inclusion EMA from the new head block. block := t.chain.GetBlockByNumber(ev.Header.Number.Uint64()) if block == nil { return @@ -139,27 +142,56 @@ func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { t.mu.Lock() defer t.mu.Unlock() - // Credit delivering peers for each included transaction. + // Count per-peer inclusions in this block for the EMA. blockIncl := make(map[string]int) for _, tx := range block.Transactions() { - hash := tx.Hash() - peer, ok := t.txs[hash] - if !ok || peer == "" { - continue + if peer := t.txs[tx.Hash()]; peer != "" { + blockIncl[peer]++ } - ps := t.peers[peer] - if ps == nil { - ps = &peerStats{} - t.peers[peer] = ps - } - ps.included++ - blockIncl[peer]++ } - // Update per-peer recent-inclusion EMA for all tracked peers. + // Update EMA for all tracked peers (decay inactive ones). for peer, ps := range t.peers { ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer]) } - if len(blockIncl) > 0 { - log.Trace("Credited peers for block inclusions", "block", ev.Header.Number, "peers", len(blockIncl)) - } + // Check if the finalized block has advanced. + t.checkFinalization() +} + +// checkFinalization credits peers for transactions in newly finalized blocks. +// Must be called with t.mu held. +func (t *Tracker) checkFinalization() { + finalHeader := t.chain.CurrentFinalBlock() + if finalHeader == nil { + return + } + finalNum := finalHeader.Number.Uint64() + if finalNum <= t.lastFinalNum { + return + } + // Credit peers for all blocks from lastFinalNum+1 to finalNum. + var credited int + for num := t.lastFinalNum + 1; num <= finalNum; num++ { + block := t.chain.GetBlockByNumber(num) + if block == nil { + continue + } + for _, tx := range block.Transactions() { + peer := t.txs[tx.Hash()] + if peer == "" { + continue + } + ps := t.peers[peer] + if ps == nil { + ps = &peerStats{} + t.peers[peer] = ps + } + ps.finalized++ + credited++ + } + } + if credited > 0 { + log.Trace("Credited peers for finalized inclusions", + "from", t.lastFinalNum+1, "to", finalNum, "txs", credited) + } + t.lastFinalNum = finalNum }