eth/filters: fix race in pending tx and new heads subscriptions (#33990)

`TestSubscribePendingTxHashes` hangs indefinitely because pending tx
events are permanently missed due to a race condition in
`NewPendingTransactions` (and `NewHeads`). Both handlers called their
event subscription functions (`SubscribePendingTxs`,
`SubscribeNewHeads`) inside goroutines, so the RPC handler returned the
subscription ID to the client before the filter was installed in the
event loop. When the client then sent a transaction, the event fired but
no filter existed to catch it — the event was silently lost.

- Move `SubscribePendingTxs` and `SubscribeNewHeads` calls out of
goroutines so filters are installed synchronously before the RPC
response is sent, matching the pattern already used by `Logs` and
`TransactionReceipts`

<!-- START COPILOT CODING AGENT TIPS -->
---

💬 We'd love your input! Share your thoughts on Copilot coding agent in
our [2 minute survey](https://gh.io/copilot-coding-agent-survey).

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: s1na <1591639+s1na@users.noreply.github.com>
This commit is contained in:
Copilot 2026-03-12 10:21:45 +08:00 committed by GitHub
parent 7d13acd030
commit de0a452f7d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -187,11 +187,13 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
var (
rpcSub = notifier.CreateSubscription()
txs = make(chan []*types.Transaction, 128)
pendingTxSub = api.events.SubscribePendingTxs(txs)
)
go func() {
txs := make(chan []*types.Transaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)
defer pendingTxSub.Unsubscribe()
chainConfig := api.sys.backend.ChainConfig()
@ -260,11 +262,13 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
var (
rpcSub = notifier.CreateSubscription()
headers = make(chan *types.Header)
headersSub = api.events.SubscribeNewHeads(headers)
)
go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers)
defer headersSub.Unsubscribe()
for {