Skip to content

Commit 5bdf128

Browse files
committed
Make test match prod behavior
1 parent 48ebedf commit 5bdf128

2 files changed

Lines changed: 24 additions & 15 deletions

File tree

internal/internal_worker_base.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -579,10 +579,9 @@ func (bw *baseWorker) runTaskDispatcher() {
579579
// is cancelled, so Wait returns immediately — we still process the
580580
// task rather than dropping it.
581581
if _, isPolledTask := task.(*polledTask); isPolledTask {
582-
if err := bw.taskLimiter.Wait(bw.limiterContext); err != nil {
583-
// Context cancelled during shutdown — skip rate limiting
584-
// but still process remaining tasks.
585-
}
582+
// Ignore error: during shutdown the limiter context is
583+
// cancelled, but we still process remaining tasks.
584+
_ = bw.taskLimiter.Wait(bw.limiterContext)
586585
}
587586
bw.processTaskAsync(task)
588587
}

internal/internal_worker_base_test.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ func TestTaskNotDroppedDuringShutdown(t *testing.T) {
322322
taskProcessed := make(chan struct{})
323323
pollStarted := make(chan struct{})
324324

325-
// A poller that blocks until returnTask is closed, then returns a task.
325+
// A poller that blocks until returnTask is closed, then returns a task
326+
// exactly once. Subsequent polls return nil so the poller can exit.
326327
tp := &shutdownTaskPoller{
327328
pollStarted: pollStarted,
328329
returnTask: make(chan struct{}),
@@ -348,26 +349,35 @@ func TestTaskNotDroppedDuringShutdown(t *testing.T) {
348349

349350
bw.Start()
350351

351-
// Wait for the poller to start polling
352+
// Wait for the poller to be actively polling.
352353
<-pollStarted
353354

354-
// Signal the poller to return a task, then stop the worker.
355-
// The task should be processed, not dropped.
356-
bw.noRepoll.Store(true)
357-
close(bw.stopCh)
355+
// Release the poller so it returns a task, then stop the worker.
356+
// The poller returns a task and then nil on subsequent polls,
357+
// allowing it to exit via noRepoll/stopCh during Stop().
358358
close(tp.returnTask)
359-
bw.limiterContextCancel()
359+
360+
// Stop exercises the real shutdown path: noRepoll, close(stopCh),
361+
// limiterContextCancel, and awaitWaitGroup.
362+
stopDone := make(chan struct{})
363+
go func() {
364+
bw.Stop()
365+
close(stopDone)
366+
}()
360367

361368
select {
362369
case <-taskProcessed:
363-
// Success: the task was dispatched and processed
370+
// Success: the task was dispatched and processed during shutdown
364371
case <-time.After(5 * time.Second):
365372
t.Fatal("task polled during shutdown was not processed (dropped)")
366373
}
367374

368-
// Wait for full cleanup. We already closed stopCh manually, so
369-
// replicate the remaining Stop() logic.
370-
awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout)
375+
select {
376+
case <-stopDone:
377+
// Stop completed cleanly
378+
case <-time.After(5 * time.Second):
379+
t.Fatal("Stop() did not return in time")
380+
}
371381
}
372382

373383
// shutdownTaskPoller blocks until returnTask is closed, then returns a task

0 commit comments

Comments
 (0)