From 47c603388a01341d36616e7ffe21170bb03dd7ec Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Fri, 10 Apr 2026 11:18:25 +0200 Subject: [PATCH] eth/fetcher: add onAccepted callback to fix attribution race MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit enqueueAndTrack used pool.Has() after Enqueue to determine accepted txs. Under concurrent delivery of the same tx from two peers, both could see Has()==true, making attribution non-deterministic. Add an onAccepted callback to the fetcher, called from Enqueue with (peer, acceptedHashes) immediately after pool.Add returns for each batch. Attribution happens atomically inside Enqueue using the per-tx error from addTxs (nil = accepted), before another goroutine can race. Remove the enqueueAndTrack helper from handler_eth.go — the fetcher now handles notification directly. --- eth/fetcher/tx_fetcher.go | 23 ++++++++++++++++------- eth/fetcher/tx_fetcher_test.go | 3 ++- eth/handler.go | 2 +- eth/handler_eth.go | 31 ++----------------------------- 4 files changed, 21 insertions(+), 38 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 5817dfbcf5..6e916747da 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -181,9 +181,10 @@ type TxFetcher struct { // Callbacks validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool - addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation + onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -194,15 +195,15 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { - return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash)) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), @@ -224,6 +225,7 @@ func NewTxFetcherForTests( addTxs: addTxs, fetchTxs: fetchTxs, dropPeer: dropPeer, + onAccepted: onAccepted, clock: clock, realTime: realTime, rand: rand, @@ -344,6 +346,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) ) batch := txs[i:end] + var accepted []common.Hash + for j, err := range f.addTxs(batch) { // Track the transaction hash if the price is too low for us. // Avoid re-request this transaction when we receive another @@ -353,7 +357,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } // Track a few interesting failure types switch { - case err == nil: // Noop, but need to handle to not count these + case err == nil: + accepted = append(accepted, batch[j].Hash()) case errors.Is(err, txpool.ErrAlreadyKnown): duplicate++ @@ -385,6 +390,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) underpricedMeter.Mark(underpriced) otherRejectMeter.Mark(otherreject) + // Notify the tracker which txs from this peer were accepted. + if f.onAccepted != nil && len(accepted) > 0 { + f.onAccepted(peer, accepted) + } // If 'other reject' is >25% of the deliveries in any batch, sleep a bit. if otherreject > int64((len(batch)+3)/4) { log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 6c2719631e..3fe11fda21 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -97,7 +97,7 @@ func newTestTxFetcher() *TxFetcher { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, + nil, nil, ) } @@ -2203,6 +2203,7 @@ func TestTransactionForgotten(t *testing.T) { }, func(string, []common.Hash) error { return nil }, func(string) {}, + nil, mockClock, mockTime, rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior diff --git a/eth/handler.go b/eth/handler.go index 3b6dbec1a5..69469fddd0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -190,8 +190,8 @@ func newHandler(config *handlerConfig) (*handler, error) { } return nil } - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) h.txTracker = txtracker.New() + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted) return h, nil } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 82ecfa1fb6..8704a86af4 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -69,8 +69,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err := handleTransactions(peer, txs, true); err != nil { return fmt.Errorf("Transactions: %v", err) } - h.enqueueAndTrack(peer.ID(), txs, false) - return nil + return h.txFetcher.Enqueue(peer.ID(), txs, false) case *eth.PooledTransactionsPacket: txs, err := packet.List.Items() @@ -80,39 +79,13 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err := handleTransactions(peer, txs, false); err != nil { return fmt.Errorf("PooledTransactions: %v", err) } - h.enqueueAndTrack(peer.ID(), txs, true) - return nil + return h.txFetcher.Enqueue(peer.ID(), txs, true) default: return fmt.Errorf("unexpected eth packet type: %T", packet) } } -// enqueueAndTrack sends transactions to the fetcher for pool submission and -// notifies the tracker for any that were accepted by the pool. -func (h *ethHandler) enqueueAndTrack(peer string, txs []*types.Transaction, direct bool) { - // Collect hashes before enqueue (Enqueue may reorder/filter the slice). - hashes := make([]common.Hash, len(txs)) - for i, tx := range txs { - hashes[i] = tx.Hash() - } - // Enqueue submits to pool via addTxs callback. After return, check - // which txs the pool now knows about (accepted, not rejected). - h.txFetcher.Enqueue(peer, txs, direct) - - // Credit the peer for txs the pool accepted. We check pool.Has - // because Enqueue doesn't return per-tx acceptance status. - var accepted []common.Hash - for _, hash := range hashes { - if h.txpool.Has(hash) { - accepted = append(accepted, hash) - } - } - if len(accepted) > 0 { - h.txTracker.NotifyAccepted(peer, accepted) - } -} - // handleTransactions marks all given transactions as known to the peer // and performs basic validations. func handleTransactions(peer *eth.Peer, list []*types.Transaction, directBroadcast bool) error {