diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 912fc80ee..6ae0153e8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -73,7 +73,8 @@ jobs:
           go-version: ${{ matrix.go-version }}
       - name: Integration tests (without cache)
-        run: go run . integration-test -dev-server
+        # TODO: temporary -run filter to test 1s sleep fix for TestMultipleUpdateOrdering flake
+        run: go run . integration-test -dev-server -run "TestIntegrationSuite/TestMultipleUpdateOrdering"
         working-directory: ./internal/cmd/build
         env:
           WORKFLOW_CACHE_SIZE: "0"
@@ -106,7 +107,8 @@ jobs:
           go-version: ${{ matrix.go-version }}
       - name: Integration tests (with cache)
-        run: go run . integration-test -dev-server
+        # TODO: temporary -run filter to test 1s sleep fix for TestMultipleUpdateOrdering flake
+        run: go run . integration-test -dev-server -run "TestIntegrationSuite/TestMultipleUpdateOrdering"
         working-directory: ./internal/cmd/build

   docker-compose-test:
diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go
index bc148edf9..97d04b1ca 100644
--- a/internal/internal_task_pollers.go
+++ b/internal/internal_task_pollers.go
@@ -310,24 +310,13 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
 	}()

 	if bp.workerPollCompleteOnShutdown != nil && bp.workerPollCompleteOnShutdown.Load() {
-		// Don't kill the gRPC stream. After ShutdownWorker, the server returns empty responses.
-		select {
-		case <-doneC:
-			return result, err
-		case <-bp.stopC:
-			// TEMP FIX: Give the server a reasonable window to complete the poll after
-			// ShutdownWorker. Fall back to cancelling the poll if it takes too
-			// long, e.g. when the gRPC connection was closed before Stop().
-			timer := time.NewTimer(5 * time.Second)
-			defer timer.Stop()
-			select {
-			case <-doneC:
-			case <-timer.C:
-				cancel()
-				<-doneC
-			}
-			return result, err
-		}
+		// Don't cancel the gRPC stream. After ShutdownWorker, the server
+		// completes the poll with an empty response. The poll is bounded by
+		// the gRPC timeout (pollTaskServiceTimeOut). The worker's stop
+		// timeout (WorkerStopTimeout) controls how long Stop() blocks;
+		// goroutines clean up in the background within the gRPC deadline.
+		<-doneC
+		return result, err
 	}

 	// Legacy: cancel in-flight polls immediately on shutdown
diff --git a/internal/internal_worker.go b/internal/internal_worker.go
index 985ffaf6b..2aae4d04f 100644
--- a/internal/internal_worker.go
+++ b/internal/internal_worker.go
@@ -1484,6 +1484,10 @@ func (aw *AggregatedWorker) Stop() {
 	close(aw.stopC)

+	// TODO: temporary sleep to test if the CancelOutstandingWorkerPolls race
+	// is caused by polls still being registered when ShutdownWorker runs.
+	time.Sleep(1 * time.Second)
+
 	aw.shutdownWorker()

 	// Issue stop through plugins
diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go
index fc3ae45dd..f856380e1 100644
--- a/internal/internal_worker_base.go
+++ b/internal/internal_worker_base.go
@@ -216,6 +216,7 @@ type (
 		lastPollTaskErrLock sync.Mutex

 		noRepoll atomic.Bool
+		pollerWG sync.WaitGroup
 	}

 	eagerOrPolledTask interface {
@@ -391,6 +392,7 @@ func (bw *baseWorker) Start() {

 		for i := 0; i < taskWorker.pollerCount; i++ {
 			bw.stopWG.Add(1)
+			bw.pollerWG.Add(1)
 			go bw.runPoller(taskWorker)
 		}

@@ -403,6 +405,15 @@
 		}
 	}

+	// When all pollers have exited, close taskQueueCh so the dispatcher
+	// knows no more polled tasks will arrive and can drain what remains.
+	bw.stopWG.Add(1)
+	go func() {
+		defer bw.stopWG.Done()
+		bw.pollerWG.Wait()
+		close(bw.taskQueueCh)
+	}()
+
 	bw.stopWG.Add(1)
 	go bw.runTaskDispatcher()

@@ -428,6 +439,10 @@ func (bw *baseWorker) isStop() bool {

 func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
 	defer bw.stopWG.Done()
+	defer func() {
+		bw.logger.Info("Poller exiting", "pollerType", taskWorker.taskPollerType)
+		bw.pollerWG.Done()
+	}()
 	// Note: With poller autoscaling, this metric doesn't make a lot of sense since the number of pollers can go up and down.
 	bw.metricsHandler.Counter(metrics.PollerStartCounter).Inc(1)

@@ -561,24 +576,17 @@ func (bw *baseWorker) processTaskAsync(eagerOrPolled eagerOrPolledTask) {

 func (bw *baseWorker) runTaskDispatcher() {
 	defer bw.stopWG.Done()

-	for {
-		// wait for new task or worker stop
-		select {
-		case <-bw.stopCh:
-			// Currently we can drop any tasks received when closing.
-			// https://github.com/temporalio/sdk-go/issues/1197
-			return
-		case task := <-bw.taskQueueCh:
-			// for non-polled-task (local activity result as task or eager task), we don't need to rate limit
-			_, isPolledTask := task.(*polledTask)
-			if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil {
-				if bw.isStop() {
-					bw.releaseSlot(task.getPermit(), SlotReleaseReasonUnused)
-					return
-				}
-			}
-			bw.processTaskAsync(task)
+	for task := range bw.taskQueueCh {
+		// For non-polled tasks (a local activity result dispatched as a task,
+		// or an eager task) we don't need to rate limit. Polled tasks go
+		// through the limiter, but its error is deliberately ignored so that
+		// tasks polled during shutdown are still processed, not dropped.
+		if _, isPolledTask := task.(*polledTask); isPolledTask {
+			// During shutdown the limiter context is cancelled and Wait
+			// returns immediately with an error; process the task anyway.
+			_ = bw.taskLimiter.Wait(bw.limiterContext)
 		}
+		bw.processTaskAsync(task)
 	}
 }

@@ -639,11 +647,10 @@ func (bw *baseWorker) pollTask(taskWorker scalableTaskPoller, slotPermit *SlotPe
 			taskWorker.pollerAutoscalerReportHandle.handleTask(task)
 		}

-		select {
-		case bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}:
-			didSendTask = true
-		case <-bw.stopCh:
-		}
+		// The dispatcher is guaranteed to be alive: it only exits after
+		// taskQueueCh is closed, which happens after all pollers finish.
+		bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}
+		didSendTask = true
 	}
 }

diff --git a/internal/internal_worker_base_test.go b/internal/internal_worker_base_test.go
index 81918a33a..f52319a55 100644
--- a/internal/internal_worker_base_test.go
+++ b/internal/internal_worker_base_test.go
@@ -315,6 +315,94 @@ type noopTaskProcessor struct{}

 func (noopTaskProcessor) ProcessTask(any) error { return nil }

+// TestTaskNotDroppedDuringShutdown verifies the two-stage shutdown: when a
+// poller receives a task during shutdown, the task is still dispatched and
+// processed rather than silently dropped.
+func TestTaskNotDroppedDuringShutdown(t *testing.T) {
+	taskProcessed := make(chan struct{}, 1) // buffered so the non-blocking signal send is never lost
+	pollStarted := make(chan struct{}, 1)   // buffered so the non-blocking signal send is never lost
+
+	// A poller that blocks until returnTask is closed, then returns a task.
+	tp := &shutdownTaskPoller{
+		pollStarted: pollStarted,
+		returnTask:  make(chan struct{}),
+		task:        &testTask{},
+	}
+
+	processor := &recordingTaskProcessor{
+		processed: taskProcessed,
+	}
+
+	bw := newBaseWorker(baseWorkerOptions{
+		slotSupplier:     &testSlotSupplier{},
+		maxTaskPerSecond: 1000,
+		taskPollers: []scalableTaskPoller{
+			{taskPollerType: "test", pollerCount: 1, taskPoller: tp},
+		},
+		taskProcessor:  processor,
+		workerType:     "ShutdownTest",
+		logger:         ilog.NewNopLogger(),
+		stopTimeout:    5 * time.Second,
+		metricsHandler: metrics.NopHandler,
+	})
+
+	bw.Start()
+
+	// Wait for the poller to start polling
+	<-pollStarted
+
+	// Signal the poller to return a task, then stop the worker.
+	// The task should be processed, not dropped.
+	bw.noRepoll.Store(true)
+	close(bw.stopCh)
+	close(tp.returnTask)
+	bw.limiterContextCancel()
+
+	select {
+	case <-taskProcessed:
+		// Success: the task was dispatched and processed
+	case <-time.After(5 * time.Second):
+		t.Fatal("task polled during shutdown was not processed (dropped)")
+	}
+
+	// Wait for full cleanup. We already closed stopCh manually, so
+	// replicate the remaining Stop() logic.
+	awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout)
+}
+
+// shutdownTaskPoller blocks until returnTask is closed, then returns a task
+// exactly once. Subsequent polls return nil.
+type shutdownTaskPoller struct {
+	pollStarted chan struct{}
+	returnTask  chan struct{}
+	task        taskForWorker
+	returned    atomic.Bool
+}
+
+func (p *shutdownTaskPoller) PollTask() (taskForWorker, error) {
+	select {
+	case p.pollStarted <- struct{}{}:
+	default:
+	}
+	<-p.returnTask
+	if p.returned.CompareAndSwap(false, true) {
+		return p.task, nil
+	}
+	return nil, nil
+}
+
+type recordingTaskProcessor struct {
+	processed chan struct{}
+}
+
+func (p *recordingTaskProcessor) ProcessTask(any) error {
+	select {
+	case p.processed <- struct{}{}:
+	default:
+	}
+	return nil
+}
+
 func (s *PollScalerReportHandleSuite) TestAutoscaleDownOnTimeoutWithCapability() {
 	targetSuggestion := 0
 	ps := newPollScalerReportHandle(pollScalerReportHandleOptions{
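
For reference, the heart of the internal_worker_base.go change is the ordering between pollers, the close of taskQueueCh, and the dispatcher. Below is a minimal, self-contained sketch of that ordering in plain Go; it is not SDK code, and every identifier in it is an illustrative stand-in rather than the worker's actual fields.

// shutdown_drain_sketch.go
//
// Illustrates the shutdown ordering the diff introduces:
//   1. pollers are tracked by their own WaitGroup,
//   2. the task channel is closed only after the last poller exits,
//   3. the dispatcher ranges over the channel, so every task handed off
//      before or during shutdown is processed instead of dropped.
package main

import (
	"fmt"
	"sync"
)

func main() {
	taskQueueCh := make(chan int)
	stopCh := make(chan struct{})

	var pollerWG sync.WaitGroup // tracks only the pollers (producers)
	var stopWG sync.WaitGroup   // tracks everything started here

	// Pollers: hand tasks to the dispatcher until asked to stop.
	for i := 0; i < 3; i++ {
		pollerWG.Add(1)
		stopWG.Add(1)
		go func(id int) {
			defer stopWG.Done()
			defer pollerWG.Done()
			for {
				select {
				case <-stopCh:
					return
				default:
				}
				// Safe even during shutdown: the dispatcher keeps receiving
				// until the channel is closed, and the channel is closed only
				// after this poller (and every other one) has returned.
				taskQueueCh <- id
			}
		}(i)
	}

	// Close taskQueueCh once every poller has exited, so the dispatcher knows
	// no more tasks can arrive and can drain whatever remains.
	stopWG.Add(1)
	go func() {
		defer stopWG.Done()
		pollerWG.Wait()
		close(taskQueueCh)
	}()

	// Dispatcher: drains the channel until it is closed; nothing is dropped.
	var processed int
	stopWG.Add(1)
	go func() {
		defer stopWG.Done()
		for range taskQueueCh {
			processed++
		}
	}()

	close(stopCh) // begin shutdown
	stopWG.Wait() // all goroutines exit cleanly
	fmt.Println("tasks processed before clean shutdown:", processed)
}

The invariant is the same one the diff's comments state: the channel is closed only after every sender has exited, so the range-based consumer can neither block forever nor drop a task that was already handed off.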