eth/fetcher: add onAccepted callback to fix attribution race

enqueueAndTrack used pool.Has() after Enqueue to determine accepted
txs. Under concurrent delivery of the same tx from two peers, both
could see Has()==true, making attribution non-deterministic.

Add an onAccepted callback to the fetcher, called from Enqueue with
(peer, acceptedHashes) immediately after pool.Add returns for each
batch. Attribution happens atomically inside Enqueue using the per-tx
error from addTxs (nil = accepted), before another goroutine can
race.

Remove the enqueueAndTrack helper from handler_eth.go — the fetcher
now handles notification directly.
This commit is contained in:
Csaba Kiraly 2026-04-10 11:18:25 +02:00
parent f66323d768
commit 47c603388a
4 changed files with 21 additions and 38 deletions

View file

@ -181,9 +181,10 @@ type TxFetcher struct {
// Callbacks
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -194,15 +195,15 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
// Chain can be nil to disable on-chain checks.
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, mclock.System{}, time.Now, nil)
}
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
// Chain can be nil to disable on-chain checks.
func NewTxFetcherForTests(
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
@ -224,6 +225,7 @@ func NewTxFetcherForTests(
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
onAccepted: onAccepted,
clock: clock,
realTime: realTime,
rand: rand,
@ -344,6 +346,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
)
batch := txs[i:end]
var accepted []common.Hash
for j, err := range f.addTxs(batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
@ -353,7 +357,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
}
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these
case err == nil:
accepted = append(accepted, batch[j].Hash())
case errors.Is(err, txpool.ErrAlreadyKnown):
duplicate++
@ -385,6 +390,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
underpricedMeter.Mark(underpriced)
otherRejectMeter.Mark(otherreject)
// Notify the tracker which txs from this peer were accepted.
if f.onAccepted != nil && len(accepted) > 0 {
f.onAccepted(peer, accepted)
}
// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
if otherreject > int64((len(batch)+3)/4) {
log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject)

View file

@ -97,7 +97,7 @@ func newTestTxFetcher() *TxFetcher {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
nil, nil,
)
}
@ -2203,6 +2203,7 @@ func TestTransactionForgotten(t *testing.T) {
},
func(string, []common.Hash) error { return nil },
func(string) {},
nil,
mockClock,
mockTime,
rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior

View file

@ -190,8 +190,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return nil
}
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)
h.txTracker = txtracker.New()
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted)
return h, nil
}

View file

@ -69,8 +69,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err := handleTransactions(peer, txs, true); err != nil {
return fmt.Errorf("Transactions: %v", err)
}
h.enqueueAndTrack(peer.ID(), txs, false)
return nil
return h.txFetcher.Enqueue(peer.ID(), txs, false)
case *eth.PooledTransactionsPacket:
txs, err := packet.List.Items()
@ -80,39 +79,13 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err := handleTransactions(peer, txs, false); err != nil {
return fmt.Errorf("PooledTransactions: %v", err)
}
h.enqueueAndTrack(peer.ID(), txs, true)
return nil
return h.txFetcher.Enqueue(peer.ID(), txs, true)
default:
return fmt.Errorf("unexpected eth packet type: %T", packet)
}
}
// enqueueAndTrack sends transactions to the fetcher for pool submission and
// notifies the tracker for any that were accepted by the pool.
func (h *ethHandler) enqueueAndTrack(peer string, txs []*types.Transaction, direct bool) {
// Collect hashes before enqueue (Enqueue may reorder/filter the slice).
hashes := make([]common.Hash, len(txs))
for i, tx := range txs {
hashes[i] = tx.Hash()
}
// Enqueue submits to pool via addTxs callback. After return, check
// which txs the pool now knows about (accepted, not rejected).
h.txFetcher.Enqueue(peer, txs, direct)
// Credit the peer for txs the pool accepted. We check pool.Has
// because Enqueue doesn't return per-tx acceptance status.
var accepted []common.Hash
for _, hash := range hashes {
if h.txpool.Has(hash) {
accepted = append(accepted, hash)
}
}
if len(accepted) > 0 {
h.txTracker.NotifyAccepted(peer, accepted)
}
}
// handleTransactions marks all given transactions as known to the peer
// and performs basic validations.
func handleTransactions(peer *eth.Peer, list []*types.Transaction, directBroadcast bool) error {