From 9f2575efeb7fbc4e6d1b1d22004081d396ac1ab2 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 10 Apr 2026 08:30:21 +0200 Subject: [PATCH] eth/txtracker: add minimal tracker as inclusion stats provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Minimal txtracker that records which peer delivered each transaction and credits peers when their transactions appear on chain. Provides the PeerInclusionStats needed by the dropper's protection logic. Design: - NotifyReceived(peer, txs): records deliverer per tx hash (called from handler_eth.go when tx bodies arrive via P2P) - Subscribes to ChainHeadEvent, fetches block txs, credits the delivering peer for each included tx - Per-peer EMA of recent inclusions (alpha=0.05), updated every block - LRU eviction at 262K entries to bound memory - Mutex-based (not channel-based) for simplicity — the hot path (NotifyReceived) is a fast map insert Wired into the dropper via an adapter callback in backend.go that converts txtracker.PeerStats to the dropper's PeerInclusionStats. --- eth/backend.go | 15 +++- eth/handler.go | 3 + eth/handler_eth.go | 2 + eth/txtracker/tracker.go | 165 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 eth/txtracker/tracker.go diff --git a/eth/backend.go b/eth/backend.go index 98aab2de00..5dcc2f03f9 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -459,8 +459,18 @@ func (s *Ethereum) Start() error { // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) - // Start the connection manager - s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, nil) + // Start the transaction tracker (records tx deliveries, credits peer inclusions). + s.handler.txTracker.Start(s.blockchain) + + // Start the connection manager with inclusion-based peer protection. + s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, func() map[string]PeerInclusionStats { + stats := s.handler.txTracker.GetAllPeerStats() + result := make(map[string]PeerInclusionStats, len(stats)) + for id, ps := range stats { + result[id] = PeerInclusionStats{Included: ps.Included, RecentIncluded: ps.RecentIncluded} + } + return result + }) // start log indexer s.filterMaps.Start() @@ -584,6 +594,7 @@ func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.discmix.Close() s.dropper.Stop() + s.handler.txTracker.Stop() s.handler.Stop() // Then stop everything else. diff --git a/eth/handler.go b/eth/handler.go index 27b5e60697..90d74a71bf 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/eth/txtracker" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/ethdb" @@ -123,6 +124,7 @@ type handler struct { downloader *downloader.Downloader txFetcher *fetcher.TxFetcher + txTracker *txtracker.Tracker peers *peerSet txBroadcastKey [16]byte @@ -189,6 +191,7 @@ func newHandler(config *handlerConfig) (*handler, error) { return nil } h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) + h.txTracker = txtracker.New() return h, nil } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 8704a86af4..8974e4b8ab 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -66,6 +66,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err != nil { return fmt.Errorf("Transactions: %v", err) } + h.txTracker.NotifyReceived(peer.ID(), txs) if err := handleTransactions(peer, txs, true); err != nil { return fmt.Errorf("Transactions: %v", err) } @@ -76,6 +77,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err != nil { return fmt.Errorf("PooledTransactions: %v", err) } + h.txTracker.NotifyReceived(peer.ID(), txs) if err := handleTransactions(peer, txs, false); err != nil { return fmt.Errorf("PooledTransactions: %v", err) } diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go new file mode 100644 index 0000000000..8383970810 --- /dev/null +++ b/eth/txtracker/tracker.go @@ -0,0 +1,165 @@ +// Package txtracker provides minimal per-peer transaction inclusion tracking. +// It records which peer delivered each transaction and credits peers when +// their delivered transactions are included on chain. +package txtracker + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" +) + +const ( + // Maximum number of tx→deliverer mappings to retain. + maxTracked = 262144 + // EMA smoothing factor for per-block inclusion rate. + emaAlpha = 0.05 +) + +// PeerStats holds the per-peer inclusion data. +type PeerStats struct { + Included int64 // Cumulative on-chain inclusions attributed to this peer + RecentIncluded float64 // EMA of per-block inclusions +} + +// Chain is the blockchain interface needed by the tracker. +type Chain interface { + SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + GetBlockByNumber(number uint64) *types.Block +} + +type peerStats struct { + included int64 + recentIncluded float64 +} + +// Tracker records which peer delivered each transaction and credits peers +// when their transactions appear on chain. +type Tracker struct { + mu sync.Mutex + txs map[common.Hash]string // hash → deliverer peer ID + peers map[string]*peerStats + order []common.Hash // insertion order for LRU eviction + + chain Chain + headCh chan core.ChainHeadEvent + sub event.Subscription + + quit chan struct{} + wg sync.WaitGroup +} + +// New creates a new tracker. +func New() *Tracker { + return &Tracker{ + txs: make(map[common.Hash]string), + peers: make(map[string]*peerStats), + quit: make(chan struct{}), + } +} + +// Start begins listening for chain head events. +func (t *Tracker) Start(chain Chain) { + t.chain = chain + t.headCh = make(chan core.ChainHeadEvent, 128) + t.sub = chain.SubscribeChainHeadEvent(t.headCh) + t.wg.Add(1) + go t.loop() +} + +// Stop shuts down the tracker. +func (t *Tracker) Stop() { + t.sub.Unsubscribe() + close(t.quit) + t.wg.Wait() +} + +// NotifyReceived records that a peer delivered transaction bodies. +// Safe to call from any goroutine. +func (t *Tracker) NotifyReceived(peer string, txs []*types.Transaction) { + t.mu.Lock() + defer t.mu.Unlock() + + for _, tx := range txs { + hash := tx.Hash() + if _, ok := t.txs[hash]; ok { + continue // already tracked, keep first deliverer + } + t.txs[hash] = peer + t.order = append(t.order, hash) + } + // Evict oldest entries if over capacity. + for len(t.txs) > maxTracked { + oldest := t.order[0] + t.order = t.order[1:] + delete(t.txs, oldest) + } +} + +// GetAllPeerStats returns a snapshot of per-peer inclusion statistics. +// Safe to call from any goroutine. +func (t *Tracker) GetAllPeerStats() map[string]PeerStats { + t.mu.Lock() + defer t.mu.Unlock() + + result := make(map[string]PeerStats, len(t.peers)) + for id, ps := range t.peers { + result[id] = PeerStats{ + Included: ps.included, + RecentIncluded: ps.recentIncluded, + } + } + return result +} + +func (t *Tracker) loop() { + defer t.wg.Done() + + for { + select { + case ev := <-t.headCh: + t.handleChainHead(ev) + case <-t.sub.Err(): + return + case <-t.quit: + return + } + } +} + +func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) { + block := t.chain.GetBlockByNumber(ev.Header.Number.Uint64()) + if block == nil { + return + } + t.mu.Lock() + defer t.mu.Unlock() + + // Credit delivering peers for each included transaction. + blockIncl := make(map[string]int) + for _, tx := range block.Transactions() { + hash := tx.Hash() + peer, ok := t.txs[hash] + if !ok || peer == "" { + continue + } + ps := t.peers[peer] + if ps == nil { + ps = &peerStats{} + t.peers[peer] = ps + } + ps.included++ + blockIncl[peer]++ + } + // Update per-peer recent-inclusion EMA for all tracked peers. + for peer, ps := range t.peers { + ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer]) + } + if len(blockIncl) > 0 { + log.Trace("Credited peers for block inclusions", "block", ev.Header.Number, "peers", len(blockIncl)) + } +}