Skip to content

Commit 6dad670

Browse files
DavidS-ovmcursoragent
authored and committed
[ENG-3715] Fix ExecuteQuery responses channel close race (#4684)
<!-- CURSOR_AGENT_PR_BODY_BEGIN -->
## Summary

- Fixes a race where `ExecuteQuery` could return (especially after the post-cancel stuck wait) and `close(responses)` ran while pool workers were still sending, causing send-on-closed-channel panics and noisy Source WaitGroups stuck logs in prod.
- Workers now send to an internal proxy channel; a forwarder is the only goroutine that writes to `responses` and closes it. When `wg.Wait` completes, the proxy is closed and the forwarder drains the proxy, then closes `responses`. If work never finishes, `executeQuerySafetyTimeout` (default 10 minutes, overridable in tests) makes the forwarder close `responses` and keep draining the proxy, so readers unblock and workers can still finish and call `wg.Done()`.
- `executeQueryLongRunningAdaptersTimeout` (default 2 minutes) remains the stuck-adapter log window after cancel; both timeouts are package-level variables so tests can use short durations.
- Tests: `TestExecuteQuery_CancelledContextDoesNotPanicOnChannelClose` (embedded NATS), `TestExecuteQuery_SafetyTimeoutClosesResponsesWithoutPanic`.
- Follow-ups from review: `time.NewTimer` in the forwarder instead of `time.After` for the safety window (avoids leaking a 10-minute timer on every query); the safety path no longer closes `responses` while workers send on the same channel, because workers now send only on the proxy.

## Ticket

ENG-3715 — Fix WaitGroup send-on-closed-channel panic in ExecuteQuery

## Changes

- `go/discovery/enginerequests.go` — proxy channel, forwarder goroutine, dual timeouts, explicit close on early-return paths.
- `go/discovery/enginerequests_test.go` — regression tests listed above.

## Approved plan

- Plan approver: Elliot
- Approval ticket: ENG-3716

Deviation analysis and reviewer assignment are handled automatically by the pre-approved PR review automation (see docs/PREAPPROVED_CHANGES.md).
<!-- CURSOR_AGENT_PR_BODY_END --> <div><a href="https://cursor.com/agents/bc-1ee118b8-43b0-4b5c-8931-0904b36917b3"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cursor.com/assets/images/open-in-web-dark.png"><source media="(prefers-color-scheme: light)" srcset="https://cursor.com/assets/images/open-in-web-light.png"><img alt="Open in Web" width="114" height="28" src="https://cursor.com/assets/images/open-in-web-dark.png"></picture></a>&nbsp;<a href="https://cursor.com/background-agent?bcId=bc-1ee118b8-43b0-4b5c-8931-0904b36917b3"><picture><source media="(prefers-color-scheme: dark)" srcset="https://cursor.com/assets/images/open-in-cursor-dark.png"><source media="(prefers-color-scheme: light)" srcset="https://cursor.com/assets/images/open-in-cursor-light.png"><img alt="Open in Cursor" width="131" height="28" src="https://cursor.com/assets/images/open-in-cursor-dark.png"></picture></a>&nbsp;</div> --------- Co-authored-by: Cursor Agent <cursoragent@cursor.com> GitOrigin-RevId: e88b0a97f3a0929d71d600e9fa8972befaa6b634
1 parent d0b385d commit 6dad670

2 files changed

Lines changed: 259 additions & 11 deletions

File tree

go/discovery/enginerequests.go

Lines changed: 65 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -204,6 +204,18 @@ func captureGoroutineSummary(maxBytes int) string {
204204
var (
205205
listExecutionPoolCount atomic.Int32
206206
getExecutionPoolCount atomic.Int32
207+
208+
// executeQueryLongRunningAdaptersTimeout is how long ExecuteQuery waits after
209+
// ctx cancellation before giving up on the per-query WaitGroup. It is a
210+
// package-level variable so tests can set a shorter duration without
211+
// waiting two minutes. Production uses the default.
212+
executeQueryLongRunningAdaptersTimeout = 2 * time.Minute
213+
214+
// executeQuerySafetyTimeout is the absolute upper bound on how long
215+
// ExecuteQuery waits for all workers before closing the responses
216+
// channel. It is a package-level variable so tests can override it
217+
// without waiting 10 minutes.
218+
executeQuerySafetyTimeout = 10 * time.Minute
207219
)
208220

209221
// ExecuteQuery Executes a single Query and returns the results without any
@@ -216,12 +228,14 @@ var (
216228
func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, responses chan<- *sdp.QueryResponse) error {
217229
span := trace.SpanFromContext(ctx)
218230

219-
// Make sure we close channels once we're done
220-
if responses != nil {
221-
defer close(responses)
222-
}
231+
// responses is closed after all pool workers finish (see waitGroupDone below),
232+
// not when this function returns. Deferring close here races with workers that
233+
// are still running after the stuck-timeout path returns.
223234

224235
if ctx.Err() != nil {
236+
if responses != nil {
237+
close(responses)
238+
}
225239
return ctx.Err()
226240
}
227241

@@ -232,11 +246,14 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, responses c
232246
)
233247

234248
if len(expanded) == 0 {
235-
responses <- sdp.NewQueryResponseFromError(&sdp.QueryError{
236-
ErrorType: sdp.QueryError_NOSCOPE,
237-
ErrorString: "no matching adapters found",
238-
Scope: query.GetScope(),
239-
})
249+
if responses != nil {
250+
responses <- sdp.NewQueryResponseFromError(&sdp.QueryError{
251+
ErrorType: sdp.QueryError_NOSCOPE,
252+
ErrorString: "no matching adapters found",
253+
Scope: query.GetScope(),
254+
})
255+
close(responses)
256+
}
240257

241258
return errors.New("no matching adapters found")
242259
}
@@ -247,6 +264,40 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, responses c
247264
expandedMutex := sync.RWMutex{}
248265
totalQueries := len(expanded)
249266
var poolWaitMaxNs atomic.Int64
267+
268+
// Workers send to workerCh (never closed by safety timeout). A forwarder
269+
// goroutine copies workerCh → responses, and is the sole closer of
270+
// responses. This eliminates send-on-closed-channel races: when the
271+
// safety timeout fires we close responses (unblocking the reader) and
272+
// then drain workerCh so late-finishing workers can still call wg.Done()
273+
// instead of blocking on a full/unread channel.
274+
var workerCh chan<- *sdp.QueryResponse
275+
if responses != nil {
276+
proxy := make(chan *sdp.QueryResponse, cap(responses))
277+
workerCh = proxy
278+
279+
go func() {
280+
timer := time.NewTimer(executeQuerySafetyTimeout)
281+
defer timer.Stop()
282+
283+
for {
284+
select {
285+
case r, ok := <-proxy:
286+
if !ok {
287+
close(responses)
288+
return
289+
}
290+
responses <- r
291+
case <-timer.C:
292+
close(responses)
293+
for range proxy {
294+
}
295+
return
296+
}
297+
}
298+
}()
299+
}
300+
250301
expandedMutex.RLock()
251302
for q, adapter := range expanded {
252303
wg.Add(1)
@@ -311,7 +362,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, responses c
311362
}
312363

313364
// Execute the query against the adapter
314-
e.Execute(ctx, localQ, localAdapter, responses)
365+
e.Execute(ctx, localQ, localAdapter, workerCh)
315366
})
316367
}()
317368
}
@@ -320,6 +371,9 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, responses c
320371
waitGroupDone := make(chan struct{})
321372
go func() {
322373
wg.Wait()
374+
if workerCh != nil {
375+
close(workerCh)
376+
}
323377
close(waitGroupDone)
324378
}()
325379

@@ -332,7 +386,7 @@ func (e *Engine) ExecuteQuery(ctx context.Context, query *sdp.Query, responses c
332386
// quickly now. We will check this though to make sure. This will wait
333387
// until we reach Change Analysis SLO violation territory. If this is
334388
// too quick, we are only spamming logs for nothing.
335-
longRunningAdaptersTimeout := 2 * time.Minute
389+
longRunningAdaptersTimeout := executeQueryLongRunningAdaptersTimeout
336390

337391
// Wait for the wait group, but ping the logs if it's taking
338392
// too long

go/discovery/enginerequests_test.go

Lines changed: 194 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,9 @@ package discovery
22

33
import (
44
"context"
5+
"errors"
56
"reflect"
7+
"sync"
68
"testing"
79
"time"
810

@@ -41,6 +43,198 @@ func (e *Engine) executeQuerySync(ctx context.Context, q *sdp.Query) ([]*sdp.Ite
4143
return items, edges, errs, err
4244
}
4345

46+
// cancelBlockingGetAdapter blocks in Get until the query context is cancelled.
47+
// Used to exercise ExecuteQuery returning after the stuck-timeout path while
48+
// a worker may still send on responses (must not close the channel until
49+
// wg.Done).
50+
type cancelBlockingGetAdapter struct {
51+
ready sync.Once
52+
// started is closed the first time Get begins waiting on ctx.Done().
53+
started chan struct{}
54+
}
55+
56+
func newCancelBlockingGetAdapter() *cancelBlockingGetAdapter {
57+
return &cancelBlockingGetAdapter{
58+
started: make(chan struct{}),
59+
}
60+
}
61+
62+
func (a *cancelBlockingGetAdapter) Type() string {
63+
return "blockingcancel"
64+
}
65+
66+
func (a *cancelBlockingGetAdapter) Name() string {
67+
return "cancelBlockingGetAdapter"
68+
}
69+
70+
func (a *cancelBlockingGetAdapter) Scopes() []string {
71+
return []string{"test"}
72+
}
73+
74+
func (a *cancelBlockingGetAdapter) Metadata() *sdp.AdapterMetadata {
75+
return &sdp.AdapterMetadata{
76+
Type: a.Type(),
77+
DescriptiveName: "Blocking cancel test",
78+
}
79+
}
80+
81+
func (a *cancelBlockingGetAdapter) Get(ctx context.Context, scope, query string, _ bool) (*sdp.Item, error) {
82+
a.ready.Do(func() { close(a.started) })
83+
<-ctx.Done()
84+
return nil, ctx.Err()
85+
}
86+
87+
func TestExecuteQuery_CancelledContextDoesNotPanicOnChannelClose(t *testing.T) {
88+
natsURL := startEmbeddedNATSServer(t)
89+
90+
prev := executeQueryLongRunningAdaptersTimeout
91+
executeQueryLongRunningAdaptersTimeout = 50 * time.Millisecond
92+
t.Cleanup(func() { executeQueryLongRunningAdaptersTimeout = prev })
93+
94+
adapter := newCancelBlockingGetAdapter()
95+
e := newStartedEngine(t, "TestExecuteQueryCancelClose",
96+
&auth.NATSOptions{
97+
Servers: []string{natsURL},
98+
ConnectionName: "test-connection",
99+
ConnectionTimeout: time.Second,
100+
MaxReconnects: 5,
101+
},
102+
nil,
103+
adapter,
104+
)
105+
106+
ctx, cancel := context.WithCancel(context.Background())
107+
u := uuid.New()
108+
q := &sdp.Query{
109+
UUID: u[:],
110+
Type: adapter.Type(),
111+
Method: sdp.QueryMethod_GET,
112+
Query: "q",
113+
Scope: "test",
114+
Deadline: timestamppb.New(time.Now().Add(10 * time.Minute)),
115+
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
116+
LinkDepth: 0,
117+
},
118+
}
119+
120+
responses := make(chan *sdp.QueryResponse, 10)
121+
errCh := make(chan error, 1)
122+
go func() {
123+
errCh <- e.ExecuteQuery(ctx, q, responses)
124+
}()
125+
126+
<-adapter.started
127+
cancel()
128+
129+
err := <-errCh
130+
if !errors.Is(err, context.Canceled) {
131+
t.Fatalf("ExecuteQuery() err = %v, want %v", err, context.Canceled)
132+
}
133+
134+
for range responses {
135+
}
136+
}
137+
138+
// foreverBlockingGetAdapter ignores context cancellation and blocks in Get
139+
// until an external signal. Used to exercise the safety timeout path.
140+
type foreverBlockingGetAdapter struct {
141+
ready sync.Once
142+
// started is closed when Get begins blocking.
143+
started chan struct{}
144+
// release is closed by the test to let Get return.
145+
release chan struct{}
146+
}
147+
148+
func newForeverBlockingGetAdapter() *foreverBlockingGetAdapter {
149+
return &foreverBlockingGetAdapter{
150+
started: make(chan struct{}),
151+
release: make(chan struct{}),
152+
}
153+
}
154+
155+
func (a *foreverBlockingGetAdapter) Type() string { return "foreverblocking" }
156+
func (a *foreverBlockingGetAdapter) Name() string { return "foreverBlockingGetAdapter" }
157+
func (a *foreverBlockingGetAdapter) Scopes() []string { return []string{"test"} }
158+
func (a *foreverBlockingGetAdapter) Metadata() *sdp.AdapterMetadata {
159+
return &sdp.AdapterMetadata{
160+
Type: a.Type(),
161+
DescriptiveName: "Forever blocking test",
162+
}
163+
}
164+
165+
func (a *foreverBlockingGetAdapter) Get(_ context.Context, _, _ string, _ bool) (*sdp.Item, error) {
166+
a.ready.Do(func() { close(a.started) })
167+
<-a.release
168+
return nil, errors.New("released")
169+
}
170+
171+
func TestExecuteQuery_SafetyTimeoutClosesResponsesWithoutPanic(t *testing.T) {
172+
natsURL := startEmbeddedNATSServer(t)
173+
174+
prevLong := executeQueryLongRunningAdaptersTimeout
175+
executeQueryLongRunningAdaptersTimeout = 10 * time.Millisecond
176+
prevSafety := executeQuerySafetyTimeout
177+
executeQuerySafetyTimeout = 100 * time.Millisecond
178+
t.Cleanup(func() {
179+
executeQueryLongRunningAdaptersTimeout = prevLong
180+
executeQuerySafetyTimeout = prevSafety
181+
})
182+
183+
adapter := newForeverBlockingGetAdapter()
184+
t.Cleanup(func() { close(adapter.release) })
185+
186+
e := newStartedEngine(t, "TestExecuteQuerySafetyTimeout",
187+
&auth.NATSOptions{
188+
Servers: []string{natsURL},
189+
ConnectionName: "test-connection",
190+
ConnectionTimeout: time.Second,
191+
MaxReconnects: 5,
192+
},
193+
nil,
194+
adapter,
195+
)
196+
197+
ctx, cancel := context.WithCancel(context.Background())
198+
defer cancel()
199+
200+
u := uuid.New()
201+
q := &sdp.Query{
202+
UUID: u[:],
203+
Type: adapter.Type(),
204+
Method: sdp.QueryMethod_GET,
205+
Query: "q",
206+
Scope: "test",
207+
Deadline: timestamppb.New(time.Now().Add(10 * time.Minute)),
208+
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
209+
LinkDepth: 0,
210+
},
211+
}
212+
213+
responses := make(chan *sdp.QueryResponse, 10)
214+
errCh := make(chan error, 1)
215+
go func() {
216+
errCh <- e.ExecuteQuery(ctx, q, responses)
217+
}()
218+
219+
<-adapter.started
220+
cancel()
221+
222+
// Drain responses — the safety timeout should close the channel without
223+
// panicking, even though the worker is still blocked in Get.
224+
for range responses {
225+
}
226+
227+
// ExecuteQuery should have returned after the stuck-timeout path.
228+
select {
229+
case err := <-errCh:
230+
if !errors.Is(err, context.Canceled) {
231+
t.Fatalf("ExecuteQuery() err = %v, want %v", err, context.Canceled)
232+
}
233+
case <-time.After(5 * time.Second):
234+
t.Fatal("timed out waiting for ExecuteQuery to return")
235+
}
236+
}
237+
44238
func TestExecuteQuery(t *testing.T) {
45239
adapter := TestAdapter{
46240
ReturnType: "person",

0 commit comments

Comments
 (0)