
Drain polled tasks on shutdown#2261

Open
yuandrew wants to merge 11 commits into temporalio:main from yuandrew:shutdown-refactor

Conversation

Contributor

@yuandrew yuandrew commented Mar 31, 2026

What was changed

Refactored worker shutdown to use a two-stage approach: pollers shut down first, then the task dispatcher drains any remaining tasks before exiting. This ensures tasks polled during shutdown are processed rather than silently dropped.

Key changes:

  • Added pollerWG to baseWorker to track poller goroutines separately from the global stopWG
  • A closer goroutine waits for all pollers to finish, then closes taskQueueCh
  • runTaskDispatcher now ranges over taskQueueCh
  • pollTask always sends to taskQueueCh
  • Removed the 5s timeout hack in doPoll from Fix test flakes (#2253), which is no longer needed

Why?

PR #2199 changed shutdown to let the server complete in-flight polls instead of cancelling them. This exposed a pre-existing race: when a poller received a task during shutdown, the task could be silently dropped. The dispatcher had the same issue — it could exit on stopCh before reading pending tasks from the channel.

This aligns the Go SDK's shutdown with how Core SDK handles it:

  1. Set a flag so pollers stop polling after their current attempt
  2. Close channels from pollers → task processing
  3. Wait for all in-flight tasks to complete & channels to be empty

Checklist

  1. Closes #1197 (Drain polled tasks on shutdown)

  2. How was this tested:

    • New unit test TestTaskNotDroppedDuringShutdown — verifies a task polled during shutdown is processed, not dropped
    • Existing TestDoPollGracefulShutdown — validates both graceful and legacy poll completion
  3. Any docs updates needed?
    No


Note

Medium Risk
Changes worker shutdown ordering and channel lifecycle, which is concurrency-sensitive and could impact task processing/Stop() behavior under load or broken connections.

Overview
Refactors worker shutdown into a two-stage drain: pollers are tracked via a new pollerWG, and once all pollers exit the worker closes taskQueueCh so the dispatcher can range and drain remaining tasks before exiting.

Adjusts polling/shutdown semantics to avoid dropped tasks: pollTask now always sends polled tasks to taskQueueCh (no stop-select drop), runTaskDispatcher processes queued polled tasks even after limiter cancellation, and basePoller.doPoll removes the prior 5s shutdown timer hack and simply waits for the poll to complete when workerPollCompleteOnShutdown is enabled.

Adds TestTaskNotDroppedDuringShutdown to validate a task returned during shutdown is still dispatched/processed, and tweaks an integration-test sleep comment/timing guidance.

Reviewed by Cursor Bugbot for commit 57fdffe.

@yuandrew yuandrew changed the title Allow for task finishing on shutdown Drain polled tasks on shutdown Mar 31, 2026
@yuandrew yuandrew marked this pull request as ready for review March 31, 2026 23:36
@yuandrew yuandrew requested a review from a team as a code owner March 31, 2026 23:36
select {
case bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}:
didSendTask = true
case <-bw.stopCh:
Member


Just checking the stop channel is still used elsewhere since we removed it in two spots

Contributor Author


yep! still used in plenty of other places

Contributor Author

yuandrew commented Apr 1, 2026

recheck

@yuandrew yuandrew force-pushed the shutdown-refactor branch from b930672 to 29b355a Compare April 1, 2026 23:28
bw.limiterContextCancel()

// Wait for pollers to finish. pollTaskServiceTimeOut bounds this if the connection is broken.
bw.pollerWG.Wait()


Unbounded pollerWG.Wait() bypasses user-configured stopTimeout

Medium Severity

bw.pollerWG.Wait() in Stop() blocks without any timeout, and runs before awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout). Combined with doPoll now waiting unconditionally on <-doneC (bounded only by pollTaskServiceTimeOut = 70s), Stop() can block for up to 70 seconds before the user's stopTimeout even begins counting. Previously, a 5-second fallback cancellation bounded this. In failure scenarios (broken gRPC connection, unresponsive server), total Stop() duration becomes ~70s + stopTimeout instead of just stopTimeout.



@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).


Comment thread internal/internal_task_pollers.go Outdated
