Commit b505e47

fix(natsrouter,testutil): close Add+Wait race; tighten panic-backstop test fidelity; document best-effort cleanup
CodeRabbit review on PR #157 surfaced three items on the squashed branch:

(MAJOR) pkg/natsrouter/router.go: Shutdown's wg.Wait() phase could race with Add(1) calls from still-draining subscriptions when the closeLoop broke early on ctx.Done(). Per the sync.WaitGroup docs, an Add with a positive delta that starts from a zero counter must happen before Wait; running it concurrently with Wait is misuse and can panic. Fix: track allClosed and only enter the wg.Wait block when every subscription confirmed close. If ctx expired before all subs closed, surface the error and let the caller's deadline take precedence; remaining handler goroutines continue in the background until process exit. Comment block rewritten to explain the invariant.

(NIT) pkg/natsrouter/integration_test.go: TestIntegration_SpawnSitePanicBackstop used WithMaxConcurrency(2). With cap=2, a leaked semaphore slot would still leave capacity for the follow-up "ok" request, masking a cleanup regression. Switched to cap=1 so a leaked slot blocks the second request and the test actually observes slot release. Added a comment explaining why cap=1 is load-bearing.

(NIT) pkg/testutil/minio.go: annotated the two `_ = container.Terminate(ctx)` calls with explicit "best-effort cleanup" comments, per the CLAUDE.md rule "never ignore errors silently — comment if intentionally discarded". The calls themselves are unchanged. The discarded errors are intentional (init already failed; Docker reaps the container on test-process exit either way) and are now documented.
1 parent 3126cd4 commit b505e47

3 files changed

Lines changed: 35 additions & 17 deletions

pkg/natsrouter/integration_test.go

Lines changed: 7 additions & 1 deletion
@@ -281,7 +281,13 @@ func TestIntegration_SpawnSitePanicBackstop(t *testing.T) {
 	nc := setupNATS(t)
 	// Note: NO Recovery middleware installed. We're testing the spawn-site
 	// backstop, not the middleware path.
-	r := natsrouter.New(nc, "integration-panic-backstop", natsrouter.WithMaxConcurrency(2))
+	//
+	// MaxConcurrency=1 is load-bearing: with cap=1, a leaked semaphore slot
+	// would block every subsequent request. cap=2 (or higher) would let
+	// the follow-up "ok" request acquire a slot even if cleanup were
+	// broken, masking the regression. cap=1 forces the test to actually
+	// observe slot release.
+	r := natsrouter.New(nc, "integration-panic-backstop", natsrouter.WithMaxConcurrency(1))
 
 	natsrouter.Register(r, "boom.{id}",
 		func(c *natsrouter.Context, req echoReq) (*echoResp, error) {
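
The cap=1 reasoning in the comment above can be checked in isolation. The following is a minimal sketch, not natsrouter's actual limiter: `slots`, `runPanicking`, and `followUpSucceeds` are hypothetical names, and the semaphore is assumed to be a plain buffered channel. It simulates a handler that panics while holding a slot, with and without the release step, and reports whether a follow-up request can still acquire a slot.

```go
package main

import "fmt"

// slots is a hypothetical counting semaphore backed by a buffered channel.
// It stands in for whatever WithMaxConcurrency uses internally (assumption).
type slots chan struct{}

// tryAcquire takes a slot without blocking and reports whether it succeeded.
func (s slots) tryAcquire() bool {
	select {
	case s <- struct{}{}:
		return true
	default:
		return false
	}
}

// release returns one previously acquired slot.
func (s slots) release() { <-s }

// runPanicking simulates a handler that panics while holding a slot.
// When leak is true the slot is deliberately not released, modelling the
// cleanup regression the integration test is meant to catch.
func runPanicking(s slots, leak bool) {
	if !s.tryAcquire() {
		return
	}
	defer func() {
		_ = recover() // backstop: swallow the simulated panic
		if !leak {
			s.release()
		}
	}()
	panic("boom")
}

// followUpSucceeds reports whether a request issued after the panicking
// one can still obtain a slot.
func followUpSucceeds(capacity int, leak bool) bool {
	s := make(slots, capacity)
	runPanicking(s, leak)
	return s.tryAcquire()
}

func main() {
	for _, capacity := range []int{1, 2} {
		for _, leak := range []bool{false, true} {
			fmt.Printf("cap=%d leak=%t followUpOK=%t\n",
				capacity, leak, followUpSucceeds(capacity, leak))
		}
	}
}
```

In this model only the cap=1, leak=true combination makes the follow-up fail, which is why a test running at cap=2 could pass even with broken slot cleanup.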

pkg/natsrouter/router.go

Lines changed: 24 additions & 16 deletions
@@ -256,32 +256,40 @@ func (r *Router) Shutdown(ctx context.Context) error {
 	}
 
 	// Wait for each subscription's dispatcher to finish. On ctx expiry,
-	// record the error and stop waiting on remaining subscriptions — but
-	// DO NOT return early. We must fall through to the WaitGroup wait
-	// below so in-flight handler goroutines are not abandoned. The
-	// WaitGroup wait itself also respects ctx and will short-circuit.
+	// record the error and stop waiting. allClosed tracks whether every
+	// subscription confirmed close; if not, we MUST NOT enter r.wg.Wait()
+	// below -- a still-draining subscription can fire natsHandler, which
+	// calls r.wg.Add(1) concurrently with our Wait(). Per sync.WaitGroup
+	// docs that's an Add+Wait race (panic if counter was 0 when Wait
+	// started). When ctx expires we instead surface the error and let the
+	// caller's deadline take precedence: any remaining handler goroutines
+	// continue in the background until process exit.
+	allClosed := true
 closeLoop:
 	for i, ch := range closed {
 		select {
 		case <-ch:
 		case <-ctx.Done():
 			errs = append(errs, fmt.Errorf("waiting for %q close: %w", subs[i].Subject, ctx.Err()))
+			allClosed = false
 			break closeLoop
 		}
 	}
 
-	// Subscriptions are drained: no new natsHandler callbacks will fire.
-	// Wait for any in-flight handler goroutines that were already spawned
-	// before drain completed. Use a channel so we can select on ctx.Done().
-	wgDone := make(chan struct{})
-	go func() {
-		r.wg.Wait()
-		close(wgDone)
-	}()
-	select {
-	case <-wgDone:
-	case <-ctx.Done():
-		errs = append(errs, fmt.Errorf("waiting for in-flight handlers: %w", ctx.Err()))
+	// Subscriptions are drained: no new natsHandler callbacks will fire,
+	// so r.wg counter is now stable and Wait() is race-free. Skip this
+	// block when allClosed is false (see comment above).
+	if allClosed {
+		wgDone := make(chan struct{})
+		go func() {
+			r.wg.Wait()
+			close(wgDone)
+		}()
+		select {
+		case <-wgDone:
+		case <-ctx.Done():
+			errs = append(errs, fmt.Errorf("waiting for in-flight handlers: %w", ctx.Err()))
+		}
 	}
 
 	return errors.Join(errs...)

pkg/testutil/minio.go

Lines changed: 4 additions & 0 deletions
@@ -36,6 +36,9 @@ func ensureMinIOClient() (*minio.Client, error) {
 		// already (no scheme). No TrimPrefix needed.
 		endpoint, err := container.ConnectionString(ctx)
 		if err != nil {
+			// Best-effort: the primary error is what callers need to see;
+			// a Terminate failure during init-failure cleanup is noise.
+			// Docker will reap the container on test-process exit either way.
 			_ = container.Terminate(ctx)
 			minioInitErr = fmt.Errorf("get minio endpoint: %w", err)
 			return
@@ -45,6 +48,7 @@ func ensureMinIOClient() (*minio.Client, error) {
 			Secure: false,
 		})
 		if err != nil {
+			// Best-effort cleanup; see comment above.
 			_ = container.Terminate(ctx)
 			minioInitErr = fmt.Errorf("connect minio: %w", err)
 			return
