
Activity poll can hang during shutdown when ShutdownWorker returns UNIMPLEMENTED #1297

@noqcks


Summary

An sdk-core worker can hang during shutdown if:

  • namespace capabilities report worker_poll_complete_on_shutdown=true
  • an activity long poll is in flight
  • shutdown starts
  • ShutdownWorker returns UNIMPLEMENTED or UNAVAILABLE

Our stuck worker, which connects to Temporal Cloud, reports this metric:

temporal_request_failure{operation="ShutdownWorker",status_code="UNIMPLEMENTED"} 2

In that case, sdk-core appears to wait for the server-driven graceful poll shutdown path (the server completing the in-flight poll), even though the server has already indicated that ShutdownWorker is unavailable. As a result, the pending activity poll never resolves with PollError::ShutDown.
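The hang can be reduced to a small model: if the poll's only wake-up source is the server, and the server never sends because ShutdownWorker failed, the poll stays pending. The sketch below is a std-only simulation, not sdk-core code; `ShutdownRpc`, `PollOutcome`, and `server_driven_only` are hypothetical names, and the bounded `wait` exists only so the demo terminates.

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Hypothetical model of the ShutdownWorker RPC outcome (not an sdk-core type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShutdownRpc {
    Accepted,
    Unimplemented,
}

/// What the in-flight poll observes (hypothetical model type).
#[derive(Debug, PartialEq)]
enum PollOutcome {
    ShutDown,
    StillPending,
}

/// Models the described behavior: with `worker_poll_complete_on_shutdown`
/// advertised, the in-flight poll waits only for the server to complete it.
/// `wait` bounds the demo; the real poll has no such timeout.
fn server_driven_only(rpc: ShutdownRpc, wait: Duration) -> PollOutcome {
    // Stands in for "the server completed the long poll".
    let (server_done_tx, server_done_rx) = mpsc::channel::<()>();

    // The simulated server completes in-flight polls only if ShutdownWorker
    // was accepted. On UNIMPLEMENTED it does nothing.
    if rpc == ShutdownRpc::Accepted {
        server_done_tx.send(()).unwrap();
    }

    // Nothing local can wake the poll, and `server_done_tx` stays alive until
    // the end of this function, so on UNIMPLEMENTED the wait can only time out.
    match server_done_rx.recv_timeout(wait) {
        Ok(()) => PollOutcome::ShutDown,
        Err(_) => PollOutcome::StillPending,
    }
}
```

Calling `server_driven_only(ShutdownRpc::Unimplemented, Duration::from_millis(50))` returns `PollOutcome::StillPending`, mirroring the `Elapsed(())` failure in the repro.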

This surfaced with temporalio==1.27.2 in a Python worker running on Kubernetes: the Python worker had no active workflow/activity code left, but Worker.run() never returned after SIGTERM because the activity worker task was still awaiting the Rust bridge call to poll_activity_task().

I have opened a potential fix in #1298 (note: I am not very familiar with Rust).

Why I think this is sdk-core

Live process introspection of a stuck Python worker showed:

  • Python main task waiting in temporalio.worker._worker.Worker._run() at await asyncio.wait(tasks.values())
  • child _ActivityWorker.run() still pending
  • _ActivityWorker.run() pending at await self._bridge_worker().poll_activity_task()
  • no Python-level running activities or workflows
  • workflow worker task had already failed/shut down
  • metrics included temporal_request_failure{operation="ShutdownWorker",status_code="UNIMPLEMENTED"}

So the immediate blocker was: sdk-core did not return activity poll shutdown to Python.

Deterministic repro

This test uses a mock server/client. It does not require Kubernetes, Python, workflows, or real application code.

#[tokio::test]
async fn graceful_activity_poll_shutdown_handles_unimplemented_shutdown_worker() {
    let activity_poll_started = Arc::new(Notify::new());
    let activity_poll_started_clone = activity_poll_started.clone();
    let shutdown_worker_called = Arc::new(Notify::new());
    let shutdown_worker_called_clone = shutdown_worker_called.clone();

    let mut mock_client = mock_manual_worker_client();
    mock_client
        .expect_describe_namespace()
        .times(1)
        .returning(|| {
            async {
                Ok(DescribeNamespaceResponse {
                    namespace_info: Some(NamespaceInfo {
                        capabilities: Some(Capabilities {
                            worker_poll_complete_on_shutdown: true,
                            ..Default::default()
                        }),
                        ..Default::default()
                    }),
                    ..Default::default()
                })
            }
            .boxed()
        });
    mock_client
        .expect_shutdown_worker()
        .times(1)
        .returning(move |_, _, _, _| {
            let shutdown_worker_called = shutdown_worker_called_clone.clone();
            async move {
                shutdown_worker_called.notify_one();
                Err(tonic::Status::unimplemented(
                    "ShutdownWorker disabled by server",
                ))
            }
            .boxed()
        });
    mock_client
        .expect_poll_activity_task()
        .times(1)
        .returning(move |_, _| {
            let activity_poll_started = activity_poll_started_clone.clone();
            async move {
                activity_poll_started.notify_one();
                future::pending::<Result<PollActivityTaskQueueResponse, tonic::Status>>().await
            }
            .boxed()
        });
    mock_client
        .expect_record_activity_heartbeat()
        .returning(|_, _| async { Ok(RecordActivityTaskHeartbeatResponse::default()) }.boxed());

    let mut cfg = test_worker_cfg()
        .activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize))
        .build()
        .unwrap();
    cfg.task_types = WorkerTaskTypes::activity_only();
    let worker = Worker::new_test(cfg, mock_client);
    worker.validate().await.unwrap();

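    // Start the activity poll; the mocked PollActivityTaskQueue never completes on its own.
    let poll_fut = async { worker.poll_activity_task().await };
    // Once the poll is in flight, begin shutdown, then wait until the mocked
    // ShutdownWorker RPC (which returns UNIMPLEMENTED) has been called.
    let shutdown_fut = async {
        activity_poll_started.notified().await;
        worker.initiate_shutdown();
        shutdown_worker_called.notified().await;
    };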

    let (poll_result, _) = tokio::time::timeout(Duration::from_millis(500), async {
        tokio::join!(poll_fut, shutdown_fut)
    })
    .await
    .expect("activity poll remained pending after shutdown_worker returned UNIMPLEMENTED");

    assert_matches!(poll_result.unwrap_err(), PollError::ShutDown);
}

Required imports (in addition to those already in scope in the core_tests module):

use std::{future, sync::Arc, time::Duration};
use temporalio_common::protos::temporal::api::{
    namespace::v1::{NamespaceInfo, namespace_info::Capabilities},
    workflowservice::v1::{
        DescribeNamespaceResponse, PollActivityTaskQueueResponse,
        RecordActivityTaskHeartbeatResponse,
    },
};
use temporalio_common::worker::WorkerTaskTypes;
use tokio::sync::Notify;

Actual result

The test fails deterministically:

thread 'core_tests::graceful_activity_poll_shutdown_handles_unimplemented_shutdown_worker' panicked:
activity poll remained pending after shutdown_worker returned UNIMPLEMENTED: Elapsed(())
test core_tests::graceful_activity_poll_shutdown_handles_unimplemented_shutdown_worker ... FAILED

Expected result

If ShutdownWorker returns UNIMPLEMENTED, sdk-core should not wait forever for server-driven graceful poll completion. It should fall back to locally interrupting the in-flight poll, or otherwise ensure poll_activity_task() resolves with PollError::ShutDown.
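One way to express the expected fallback, sketched as a std-only model under the assumption that a single wake-up channel can be shared by the server path and the local shutdown path: the poll resolves with a shutdown error no matter who wakes it, and the shutdown path chooses the local interrupt whenever ShutdownWorker fails. `ShutdownRpc`, `WakeReason`, `shutdown_with_fallback`, and the local `PollError` enum are hypothetical and do not reflect sdk-core's actual poller internals.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical model of the ShutdownWorker RPC outcome (not an sdk-core type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShutdownRpc {
    Accepted,
    Unimplemented,
    Unavailable,
}

/// Who woke the in-flight poll (hypothetical model type).
#[derive(Debug, PartialEq)]
enum WakeReason {
    ServerCompletedPoll,
    LocalInterrupt,
}

/// Local stand-in for sdk-core's `PollError::ShutDown` in this model.
#[derive(Debug, PartialEq)]
enum PollError {
    ShutDown,
}

/// Simulates one shutdown with a local fallback. Returns the poll's result and
/// who woke it, or `None` if the poll was still pending after `wait` (the hang).
fn shutdown_with_fallback(
    rpc: ShutdownRpc,
    wait: Duration,
) -> Option<(Result<(), PollError>, WakeReason)> {
    // One wake-up channel shared by the server path and the local shutdown path.
    let (wake_tx, wake_rx) = mpsc::channel::<WakeReason>();

    // Simulated in-flight long poll: any wake-up during shutdown resolves it
    // with a shutdown error.
    let poll = thread::spawn(move || {
        wake_rx.recv_timeout(wait).ok().map(|reason| {
            let result: Result<(), PollError> = Err(PollError::ShutDown);
            (result, reason)
        })
    });

    // Simulated shutdown path, after the ShutdownWorker call has returned.
    let reason = match rpc {
        // The server accepted the call; the model sends on its behalf to stand
        // in for the server completing the poll.
        ShutdownRpc::Accepted => WakeReason::ServerCompletedPoll,
        // The server will not complete the poll, so interrupt it locally.
        ShutdownRpc::Unimplemented | ShutdownRpc::Unavailable => WakeReason::LocalInterrupt,
    };
    // A send error would only mean the poll already stopped waiting.
    let _ = wake_tx.send(reason);

    poll.join().unwrap()
}
```

The design choice here is that the RPC outcome, rather than the namespace capability alone, decides who is responsible for waking the poll, so a failed ShutdownWorker never leaves the poll without a wake-up source.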

Notes

I saw PR #1224, which moved/scheduled ShutdownWorker earlier so the server can complete in-flight polls. This issue is about the fallback case where ShutdownWorker is called but returns UNIMPLEMENTED.

From the observed behavior, UNIMPLEMENTED is currently treated as acceptable/best-effort for the RPC itself, but the graceful poll shutdown path still appears to depend on that RPC succeeding.
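The distinction between tolerating the RPC error and acting on it can be written as a pure decision function. This is a hypothetical model: the enum and function names are illustrative, `observed_strategy` only restates the behavior described in this issue (it assumes namespaces without the capability already interrupt polls locally), and treating every ShutdownWorker failure, not just UNIMPLEMENTED and UNAVAILABLE, as a reason to interrupt locally is an assumption.

```rust
/// Hypothetical summary of the ShutdownWorker RPC outcome (not an sdk-core type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShutdownRpcOutcome {
    Succeeded,
    Unimplemented,
    Unavailable,
    OtherError,
}

/// How the in-flight poll should be ended (hypothetical model type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum PollShutdown {
    /// Rely on the server to complete the long poll.
    WaitForServer,
    /// Interrupt the long poll locally.
    InterruptLocally,
}

/// Behavior as described in this issue: the RPC error is tolerated
/// (best effort), but only the namespace capability picks the strategy.
fn observed_strategy(poll_complete_on_shutdown: bool, _rpc: ShutdownRpcOutcome) -> PollShutdown {
    if poll_complete_on_shutdown {
        PollShutdown::WaitForServer
    } else {
        PollShutdown::InterruptLocally
    }
}

/// Suggested behavior: rely on the server only when the capability is
/// advertised and ShutdownWorker actually succeeded. Any failure falls back
/// to a local interrupt.
fn suggested_strategy(poll_complete_on_shutdown: bool, rpc: ShutdownRpcOutcome) -> PollShutdown {
    match (poll_complete_on_shutdown, rpc) {
        (true, ShutdownRpcOutcome::Succeeded) => PollShutdown::WaitForServer,
        _ => PollShutdown::InterruptLocally,
    }
}
```

With the capability advertised and an UNIMPLEMENTED response, `observed_strategy` still picks `WaitForServer` (the hang), while `suggested_strategy` picks `InterruptLocally`.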

Code Locations

The following code locations are relevant to this issue:

  • crates/sdk-core/src/worker/mod.rs:516
  • crates/sdk-core/src/worker/mod.rs:658
  • crates/sdk-core/src/worker/mod.rs:698
  • crates/sdk-core/src/worker/workflow/wft_poller.rs:49
  • crates/sdk-core/src/worker/mod.rs:1378
  • crates/sdk-core/src/worker/mod.rs:1442
  • crates/sdk-core/src/worker/mod.rs:967
  • crates/sdk-core/src/pollers/mod.rs:152
  • crates/sdk-core/src/pollers/poll_buffer.rs:389
  • crates/sdk-core/src/pollers/poll_buffer.rs:460
