From a1a5d7332445fe4b25f4a72615628225b6de2579 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 10 Apr 2026 10:33:36 +0200 Subject: [PATCH] eth: only record deliverers for pool-accepted transactions NotifyReceived was called before pool validation, allowing a peer to claim deliverer credit by replaying already-included txs or sending invalid packets. Rename to NotifyAccepted (takes hashes, not full txs). Call it from a new enqueueAndTrack helper in handler_eth.go that runs after Enqueue and checks pool.Has to identify accepted txs. Only accepted txs are credited to the delivering peer. --- eth/handler_eth.go | 33 +++++++++++++++++++++++++++++---- eth/txtracker/tracker.go | 11 ++++++----- eth/txtracker/tracker_test.go | 27 ++++++++++++++++++--------- 3 files changed, 53 insertions(+), 18 deletions(-) diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 8974e4b8ab..82ecfa1fb6 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -66,28 +66,53 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err != nil { return fmt.Errorf("Transactions: %v", err) } - h.txTracker.NotifyReceived(peer.ID(), txs) if err := handleTransactions(peer, txs, true); err != nil { return fmt.Errorf("Transactions: %v", err) } - return h.txFetcher.Enqueue(peer.ID(), txs, false) + h.enqueueAndTrack(peer.ID(), txs, false) + return nil case *eth.PooledTransactionsPacket: txs, err := packet.List.Items() if err != nil { return fmt.Errorf("PooledTransactions: %v", err) } - h.txTracker.NotifyReceived(peer.ID(), txs) if err := handleTransactions(peer, txs, false); err != nil { return fmt.Errorf("PooledTransactions: %v", err) } - return h.txFetcher.Enqueue(peer.ID(), txs, true) + h.enqueueAndTrack(peer.ID(), txs, true) + return nil default: return fmt.Errorf("unexpected eth packet type: %T", packet) } } +// enqueueAndTrack sends transactions to the fetcher for pool submission and +// notifies the tracker for any that were accepted by the pool. +func (h *ethHandler) enqueueAndTrack(peer string, txs []*types.Transaction, direct bool) { + // Collect hashes before enqueue (Enqueue may reorder/filter the slice). + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + // Enqueue submits to pool via addTxs callback. After return, check + // which txs the pool now knows about (accepted, not rejected). + h.txFetcher.Enqueue(peer, txs, direct) + + // Credit the peer for txs the pool accepted. We check pool.Has + // because Enqueue doesn't return per-tx acceptance status. + var accepted []common.Hash + for _, hash := range hashes { + if h.txpool.Has(hash) { + accepted = append(accepted, hash) + } + } + if len(accepted) > 0 { + h.txTracker.NotifyAccepted(peer, accepted) + } +} + // handleTransactions marks all given transactions as known to the peer // and performs basic validations. func handleTransactions(peer *eth.Peer, list []*types.Transaction, directBroadcast bool) error { diff --git a/eth/txtracker/tracker.go b/eth/txtracker/tracker.go index 7807c8352a..beaab60977 100644 --- a/eth/txtracker/tracker.go +++ b/eth/txtracker/tracker.go @@ -1,6 +1,6 @@ // Package txtracker provides minimal per-peer transaction inclusion tracking. // -// It records which peer delivered each transaction body (via NotifyReceived) +// It records which peer delivered each accepted transaction (via NotifyAccepted) // and monitors the chain for inclusion and finalization events. When a // delivered transaction is finalized on chain, the delivering peer is // credited. A per-block exponential moving average (EMA) of inclusions @@ -91,14 +91,15 @@ func (t *Tracker) Stop() { t.wg.Wait() } -// NotifyReceived records that a peer delivered transaction bodies. +// NotifyAccepted records that a peer delivered transactions that were accepted +// by the pool. Only accepted (not rejected/duplicate) txs should be recorded +// to prevent attribution poisoning from replayed or invalid txs. // Safe to call from any goroutine. -func (t *Tracker) NotifyReceived(peer string, txs []*types.Transaction) { +func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) { t.mu.Lock() defer t.mu.Unlock() - for _, tx := range txs { - hash := tx.Hash() + for _, hash := range hashes { if _, ok := t.txs[hash]; ok { continue // already tracked, keep first deliverer } diff --git a/eth/txtracker/tracker_test.go b/eth/txtracker/tracker_test.go index dc346f5255..35c315ac8f 100644 --- a/eth/txtracker/tracker_test.go +++ b/eth/txtracker/tracker_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" @@ -62,6 +63,14 @@ func (c *mockChain) sendHead(num uint64) { }) } +func hashTxs(txs []*types.Transaction) []common.Hash { + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + return hashes +} + func makeTx(nonce uint64) *types.Transaction { return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000}) } @@ -78,7 +87,7 @@ func TestNotifyReceived(t *testing.T) { defer tr.Stop() txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)} - tr.NotifyReceived("peerA", txs) + tr.NotifyAccepted("peerA", hashTxs(txs)) // No chain events yet — stats should be empty. stats := tr.GetAllPeerStats() @@ -94,7 +103,7 @@ func TestInclusionEMA(t *testing.T) { defer tr.Stop() tx := makeTx(1) - tr.NotifyReceived("peerA", []*types.Transaction{tx}) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) // Block 1 includes peerA's tx. chain.addBlock(1, []*types.Transaction{tx}) @@ -125,7 +134,7 @@ func TestFinalization(t *testing.T) { defer tr.Stop() tx := makeTx(1) - tr.NotifyReceived("peerA", []*types.Transaction{tx}) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) // Include in block 1. chain.addBlock(1, []*types.Transaction{tx}) @@ -158,8 +167,8 @@ func TestMultiplePeers(t *testing.T) { tx1 := makeTx(1) tx2 := makeTx(2) - tr.NotifyReceived("peerA", []*types.Transaction{tx1}) - tr.NotifyReceived("peerB", []*types.Transaction{tx2}) + tr.NotifyAccepted("peerA", []common.Hash{tx1.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx2.Hash()}) // Both included in block 1. chain.addBlock(1, []*types.Transaction{tx1, tx2}) @@ -188,8 +197,8 @@ func TestFirstDelivererWins(t *testing.T) { defer tr.Stop() tx := makeTx(1) - tr.NotifyReceived("peerA", []*types.Transaction{tx}) - tr.NotifyReceived("peerB", []*types.Transaction{tx}) // duplicate, should be ignored + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) + tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()}) // duplicate, should be ignored chain.addBlock(1, []*types.Transaction{tx}) chain.sendHead(1) @@ -216,7 +225,7 @@ func TestNoFinalizationCredit(t *testing.T) { defer tr.Stop() tx := makeTx(1) - tr.NotifyReceived("peerA", []*types.Transaction{tx}) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) // Include but don't finalize. chain.addBlock(1, []*types.Transaction{tx}) @@ -241,7 +250,7 @@ func TestEMADecay(t *testing.T) { defer tr.Stop() tx := makeTx(1) - tr.NotifyReceived("peerA", []*types.Transaction{tx}) + tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()}) // Include in block 1. chain.addBlock(1, []*types.Transaction{tx})