Skip to content

Commit 4708950

Browse files
allformlessCopilots1na
authored
eth/filters: fix race in pending tx and new heads subscriptions (#33990) (#3686)
`TestSubscribePendingTxHashes` hangs indefinitely because pending tx events are permanently missed due to a race condition in `NewPendingTransactions` (and `NewHeads`). Both handlers called their event subscription functions (`SubscribePendingTxs`, `SubscribeNewHeads`) inside goroutines, so the RPC handler returned the subscription ID to the client before the filter was installed in the event loop. When the client then sent a transaction, the event fired but no filter existed to catch it — the event was silently lost. - Move `SubscribePendingTxs` and `SubscribeNewHeads` calls out of goroutines so filters are installed synchronously before the RPC response is sent, matching the pattern already used by `Logs` and `TransactionReceipts` <!-- START COPILOT CODING AGENT TIPS --> --- 💬 We'd love your input! Share your thoughts on Copilot coding agent in our [2 minute survey](https://gh.io/copilot-coding-agent-survey). --------- Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> Co-authored-by: s1na <1591639+s1na@users.noreply.github.com>
1 parent 3020997 commit 4708950

1 file changed

Lines changed: 10 additions & 6 deletions

File tree

eth/filters/api.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,13 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
175175
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
176176
}
177177

178-
rpcSub := notifier.CreateSubscription()
178+
var (
179+
rpcSub = notifier.CreateSubscription()
180+
txs = make(chan []*types.Transaction, 128)
181+
pendingTxSub = api.events.SubscribePendingTxs(txs)
182+
)
179183

180184
gopool.Submit(func() {
181-
txs := make(chan []*types.Transaction, 128)
182-
pendingTxSub := api.events.SubscribePendingTxs(txs)
183185
defer pendingTxSub.Unsubscribe()
184186

185187
chainConfig := api.sys.backend.ChainConfig()
@@ -309,11 +311,13 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
309311
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
310312
}
311313

312-
rpcSub := notifier.CreateSubscription()
314+
var (
315+
rpcSub = notifier.CreateSubscription()
316+
headers = make(chan *types.Header)
317+
headersSub = api.events.SubscribeNewHeads(headers)
318+
)
313319

314320
gopool.Submit(func() {
315-
headers := make(chan *types.Header)
316-
headersSub := api.events.SubscribeNewHeads(headers)
317321
defer headersSub.Unsubscribe()
318322

319323
for {

0 commit comments

Comments
 (0)