eth/txtracker: add minimal tracker as inclusion stats provider

Minimal txtracker that records which peer delivered each transaction
and credits peers when their transactions appear on chain. Provides
the PeerInclusionStats needed by the dropper's protection logic.

Design:
- NotifyReceived(peer, txs): records deliverer per tx hash (called
  from handler_eth.go when tx bodies arrive via P2P)
- Subscribes to ChainHeadEvent, fetches block txs, credits the
  delivering peer for each included tx
- Per-peer EMA of recent inclusions (alpha=0.05), updated every block
- LRU eviction at 262K entries to bound memory
- Mutex-based (not channel-based) for simplicity — the hot path
  (NotifyReceived) is a fast map insert

Wired into the dropper via an adapter callback in backend.go that
converts txtracker.PeerStats to the dropper's PeerInclusionStats.
This commit is contained in:
Csaba Kiraly 2026-04-10 08:30:21 +02:00
parent 5a918be50d
commit 9f2575efeb
4 changed files with 183 additions and 2 deletions

View file

@ -459,8 +459,18 @@ func (s *Ethereum) Start() error {
// Start the networking layer // Start the networking layer
s.handler.Start(s.p2pServer.MaxPeers) s.handler.Start(s.p2pServer.MaxPeers)
// Start the connection manager // Start the transaction tracker (records tx deliveries, credits peer inclusions).
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, nil) s.handler.txTracker.Start(s.blockchain)
// Start the connection manager with inclusion-based peer protection.
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, func() map[string]PeerInclusionStats {
stats := s.handler.txTracker.GetAllPeerStats()
result := make(map[string]PeerInclusionStats, len(stats))
for id, ps := range stats {
result[id] = PeerInclusionStats{Included: ps.Included, RecentIncluded: ps.RecentIncluded}
}
return result
})
// start log indexer // start log indexer
s.filterMaps.Start() s.filterMaps.Start()
@ -584,6 +594,7 @@ func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first. // Stop all the peer-related stuff first.
s.discmix.Close() s.discmix.Close()
s.dropper.Stop() s.dropper.Stop()
s.handler.txTracker.Stop()
s.handler.Stop() s.handler.Stop()
// Then stop everything else. // Then stop everything else.

View file

@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/txtracker"
"github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
@ -123,6 +124,7 @@ type handler struct {
downloader *downloader.Downloader downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher txFetcher *fetcher.TxFetcher
txTracker *txtracker.Tracker
peers *peerSet peers *peerSet
txBroadcastKey [16]byte txBroadcastKey [16]byte
@ -189,6 +191,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return nil return nil
} }
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)
h.txTracker = txtracker.New()
return h, nil return h, nil
} }

View file

@ -66,6 +66,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err != nil { if err != nil {
return fmt.Errorf("Transactions: %v", err) return fmt.Errorf("Transactions: %v", err)
} }
h.txTracker.NotifyReceived(peer.ID(), txs)
if err := handleTransactions(peer, txs, true); err != nil { if err := handleTransactions(peer, txs, true); err != nil {
return fmt.Errorf("Transactions: %v", err) return fmt.Errorf("Transactions: %v", err)
} }
@ -76,6 +77,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err != nil { if err != nil {
return fmt.Errorf("PooledTransactions: %v", err) return fmt.Errorf("PooledTransactions: %v", err)
} }
h.txTracker.NotifyReceived(peer.ID(), txs)
if err := handleTransactions(peer, txs, false); err != nil { if err := handleTransactions(peer, txs, false); err != nil {
return fmt.Errorf("PooledTransactions: %v", err) return fmt.Errorf("PooledTransactions: %v", err)
} }

165
eth/txtracker/tracker.go Normal file
View file

@ -0,0 +1,165 @@
// Package txtracker provides minimal per-peer transaction inclusion tracking.
// It records which peer delivered each transaction and credits peers when
// their delivered transactions are included on chain.
package txtracker
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
const (
// Maximum number of tx→deliverer mappings to retain.
maxTracked = 262144
// EMA smoothing factor for per-block inclusion rate.
emaAlpha = 0.05
)
// PeerStats holds the per-peer inclusion data.
type PeerStats struct {
Included int64 // Cumulative on-chain inclusions attributed to this peer
RecentIncluded float64 // EMA of per-block inclusions
}
// Chain is the blockchain interface needed by the tracker.
type Chain interface {
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
GetBlockByNumber(number uint64) *types.Block
}
type peerStats struct {
included int64
recentIncluded float64
}
// Tracker records which peer delivered each transaction and credits peers
// when their transactions appear on chain.
type Tracker struct {
mu sync.Mutex
txs map[common.Hash]string // hash → deliverer peer ID
peers map[string]*peerStats
order []common.Hash // insertion order for LRU eviction
chain Chain
headCh chan core.ChainHeadEvent
sub event.Subscription
quit chan struct{}
wg sync.WaitGroup
}
// New creates a new tracker.
func New() *Tracker {
return &Tracker{
txs: make(map[common.Hash]string),
peers: make(map[string]*peerStats),
quit: make(chan struct{}),
}
}
// Start begins listening for chain head events.
func (t *Tracker) Start(chain Chain) {
t.chain = chain
t.headCh = make(chan core.ChainHeadEvent, 128)
t.sub = chain.SubscribeChainHeadEvent(t.headCh)
t.wg.Add(1)
go t.loop()
}
// Stop shuts down the tracker.
func (t *Tracker) Stop() {
t.sub.Unsubscribe()
close(t.quit)
t.wg.Wait()
}
// NotifyReceived records that a peer delivered transaction bodies.
// Safe to call from any goroutine.
func (t *Tracker) NotifyReceived(peer string, txs []*types.Transaction) {
t.mu.Lock()
defer t.mu.Unlock()
for _, tx := range txs {
hash := tx.Hash()
if _, ok := t.txs[hash]; ok {
continue // already tracked, keep first deliverer
}
t.txs[hash] = peer
t.order = append(t.order, hash)
}
// Evict oldest entries if over capacity.
for len(t.txs) > maxTracked {
oldest := t.order[0]
t.order = t.order[1:]
delete(t.txs, oldest)
}
}
// GetAllPeerStats returns a snapshot of per-peer inclusion statistics.
// Safe to call from any goroutine.
func (t *Tracker) GetAllPeerStats() map[string]PeerStats {
t.mu.Lock()
defer t.mu.Unlock()
result := make(map[string]PeerStats, len(t.peers))
for id, ps := range t.peers {
result[id] = PeerStats{
Included: ps.included,
RecentIncluded: ps.recentIncluded,
}
}
return result
}
func (t *Tracker) loop() {
defer t.wg.Done()
for {
select {
case ev := <-t.headCh:
t.handleChainHead(ev)
case <-t.sub.Err():
return
case <-t.quit:
return
}
}
}
func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) {
block := t.chain.GetBlockByNumber(ev.Header.Number.Uint64())
if block == nil {
return
}
t.mu.Lock()
defer t.mu.Unlock()
// Credit delivering peers for each included transaction.
blockIncl := make(map[string]int)
for _, tx := range block.Transactions() {
hash := tx.Hash()
peer, ok := t.txs[hash]
if !ok || peer == "" {
continue
}
ps := t.peers[peer]
if ps == nil {
ps = &peerStats{}
t.peers[peer] = ps
}
ps.included++
blockIncl[peer]++
}
// Update per-peer recent-inclusion EMA for all tracked peers.
for peer, ps := range t.peers {
ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(blockIncl[peer])
}
if len(blockIncl) > 0 {
log.Trace("Credited peers for block inclusions", "block", ev.Header.Number, "peers", len(blockIncl))
}
}