eth: only record deliverers for pool-accepted transactions

NotifyReceived was called before pool validation, allowing a peer
to claim deliverer credit by replaying already-included txs or
sending invalid packets.

Rename to NotifyAccepted (takes hashes, not full txs). Call it from
a new enqueueAndTrack helper in handler_eth.go that runs after
Enqueue and checks pool.Has to identify accepted txs. Only accepted
txs are credited to the delivering peer.
This commit is contained in:
Csaba Kiraly 2026-04-10 10:33:36 +02:00
parent e99330b2bc
commit a1a5d73324
3 changed files with 53 additions and 18 deletions

View file

@ -66,28 +66,53 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err != nil { if err != nil {
return fmt.Errorf("Transactions: %v", err) return fmt.Errorf("Transactions: %v", err)
} }
h.txTracker.NotifyReceived(peer.ID(), txs)
if err := handleTransactions(peer, txs, true); err != nil { if err := handleTransactions(peer, txs, true); err != nil {
return fmt.Errorf("Transactions: %v", err) return fmt.Errorf("Transactions: %v", err)
} }
return h.txFetcher.Enqueue(peer.ID(), txs, false) h.enqueueAndTrack(peer.ID(), txs, false)
return nil
case *eth.PooledTransactionsPacket: case *eth.PooledTransactionsPacket:
txs, err := packet.List.Items() txs, err := packet.List.Items()
if err != nil { if err != nil {
return fmt.Errorf("PooledTransactions: %v", err) return fmt.Errorf("PooledTransactions: %v", err)
} }
h.txTracker.NotifyReceived(peer.ID(), txs)
if err := handleTransactions(peer, txs, false); err != nil { if err := handleTransactions(peer, txs, false); err != nil {
return fmt.Errorf("PooledTransactions: %v", err) return fmt.Errorf("PooledTransactions: %v", err)
} }
return h.txFetcher.Enqueue(peer.ID(), txs, true) h.enqueueAndTrack(peer.ID(), txs, true)
return nil
default: default:
return fmt.Errorf("unexpected eth packet type: %T", packet) return fmt.Errorf("unexpected eth packet type: %T", packet)
} }
} }
// enqueueAndTrack sends transactions to the fetcher for pool submission and
// notifies the tracker for any that were accepted by the pool.
func (h *ethHandler) enqueueAndTrack(peer string, txs []*types.Transaction, direct bool) {
// Collect hashes before enqueue (Enqueue may reorder/filter the slice).
hashes := make([]common.Hash, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
}
// Enqueue submits to pool via addTxs callback. After return, check
// which txs the pool now knows about (accepted, not rejected).
h.txFetcher.Enqueue(peer, txs, direct)
// Credit the peer for txs the pool accepted. We check pool.Has
// because Enqueue doesn't return per-tx acceptance status.
var accepted []common.Hash
for _, hash := range hashes {
if h.txpool.Has(hash) {
accepted = append(accepted, hash)
}
}
if len(accepted) > 0 {
h.txTracker.NotifyAccepted(peer, accepted)
}
}
// handleTransactions marks all given transactions as known to the peer // handleTransactions marks all given transactions as known to the peer
// and performs basic validations. // and performs basic validations.
func handleTransactions(peer *eth.Peer, list []*types.Transaction, directBroadcast bool) error { func handleTransactions(peer *eth.Peer, list []*types.Transaction, directBroadcast bool) error {

View file

@ -1,6 +1,6 @@
// Package txtracker provides minimal per-peer transaction inclusion tracking. // Package txtracker provides minimal per-peer transaction inclusion tracking.
// //
// It records which peer delivered each transaction body (via NotifyReceived) // It records which peer delivered each accepted transaction (via NotifyAccepted)
// and monitors the chain for inclusion and finalization events. When a // and monitors the chain for inclusion and finalization events. When a
// delivered transaction is finalized on chain, the delivering peer is // delivered transaction is finalized on chain, the delivering peer is
// credited. A per-block exponential moving average (EMA) of inclusions // credited. A per-block exponential moving average (EMA) of inclusions
@ -91,14 +91,15 @@ func (t *Tracker) Stop() {
t.wg.Wait() t.wg.Wait()
} }
// NotifyReceived records that a peer delivered transaction bodies. // NotifyAccepted records that a peer delivered transactions that were accepted
// by the pool. Only accepted (not rejected/duplicate) txs should be recorded
// to prevent attribution poisoning from replayed or invalid txs.
// Safe to call from any goroutine. // Safe to call from any goroutine.
func (t *Tracker) NotifyReceived(peer string, txs []*types.Transaction) { func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
for _, tx := range txs { for _, hash := range hashes {
hash := tx.Hash()
if _, ok := t.txs[hash]; ok { if _, ok := t.txs[hash]; ok {
continue // already tracked, keep first deliverer continue // already tracked, keep first deliverer
} }

View file

@ -6,6 +6,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
@ -62,6 +63,14 @@ func (c *mockChain) sendHead(num uint64) {
}) })
} }
func hashTxs(txs []*types.Transaction) []common.Hash {
hashes := make([]common.Hash, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
}
return hashes
}
func makeTx(nonce uint64) *types.Transaction { func makeTx(nonce uint64) *types.Transaction {
return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000}) return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000})
} }
@ -78,7 +87,7 @@ func TestNotifyReceived(t *testing.T) {
defer tr.Stop() defer tr.Stop()
txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)} txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)}
tr.NotifyReceived("peerA", txs) tr.NotifyAccepted("peerA", hashTxs(txs))
// No chain events yet — stats should be empty. // No chain events yet — stats should be empty.
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()
@ -94,7 +103,7 @@ func TestInclusionEMA(t *testing.T) {
defer tr.Stop() defer tr.Stop()
tx := makeTx(1) tx := makeTx(1)
tr.NotifyReceived("peerA", []*types.Transaction{tx}) tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
// Block 1 includes peerA's tx. // Block 1 includes peerA's tx.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
@ -125,7 +134,7 @@ func TestFinalization(t *testing.T) {
defer tr.Stop() defer tr.Stop()
tx := makeTx(1) tx := makeTx(1)
tr.NotifyReceived("peerA", []*types.Transaction{tx}) tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
// Include in block 1. // Include in block 1.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
@ -158,8 +167,8 @@ func TestMultiplePeers(t *testing.T) {
tx1 := makeTx(1) tx1 := makeTx(1)
tx2 := makeTx(2) tx2 := makeTx(2)
tr.NotifyReceived("peerA", []*types.Transaction{tx1}) tr.NotifyAccepted("peerA", []common.Hash{tx1.Hash()})
tr.NotifyReceived("peerB", []*types.Transaction{tx2}) tr.NotifyAccepted("peerB", []common.Hash{tx2.Hash()})
// Both included in block 1. // Both included in block 1.
chain.addBlock(1, []*types.Transaction{tx1, tx2}) chain.addBlock(1, []*types.Transaction{tx1, tx2})
@ -188,8 +197,8 @@ func TestFirstDelivererWins(t *testing.T) {
defer tr.Stop() defer tr.Stop()
tx := makeTx(1) tx := makeTx(1)
tr.NotifyReceived("peerA", []*types.Transaction{tx}) tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
tr.NotifyReceived("peerB", []*types.Transaction{tx}) // duplicate, should be ignored tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()}) // duplicate, should be ignored
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1) chain.sendHead(1)
@ -216,7 +225,7 @@ func TestNoFinalizationCredit(t *testing.T) {
defer tr.Stop() defer tr.Stop()
tx := makeTx(1) tx := makeTx(1)
tr.NotifyReceived("peerA", []*types.Transaction{tx}) tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
// Include but don't finalize. // Include but don't finalize.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
@ -241,7 +250,7 @@ func TestEMADecay(t *testing.T) {
defer tr.Stop() defer tr.Stop()
tx := makeTx(1) tx := makeTx(1)
tr.NotifyReceived("peerA", []*types.Transaction{tx}) tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
// Include in block 1. // Include in block 1.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})