Skip to content

Commit c4c8be1

Browse files
Sushisourcejmaeagle99
authored andcommitted
Shutdown rpc initiate shutdown (#1224)
1 parent 24703da commit c4c8be1

14 files changed

Lines changed: 590 additions & 175 deletions

File tree

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[env]
22
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
3-
#CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
3+
CLI_VERSION_OVERRIDE = "v1.6.3-serverless"
44

55
[alias]
66
# Not sure why --all-features doesn't work

crates/sdk-core-c-bridge/src/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,7 @@ pub extern "C" fn temporal_core_worker_request_workflow_eviction(
930930
#[unsafe(no_mangle)]
931931
pub extern "C" fn temporal_core_worker_initiate_shutdown(worker: *mut Worker) {
932932
let worker = unsafe { &*worker };
933+
enter_sync!(worker.runtime);
933934
worker.worker.as_ref().unwrap().initiate_shutdown();
934935
}
935936

crates/sdk-core/src/core_tests/workers.rs

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,15 @@ use crate::{
1313
},
1414
};
1515
use futures_util::{stream, stream::StreamExt};
16+
use std::sync::{
17+
Arc,
18+
atomic::{AtomicBool, Ordering},
19+
};
1620
use std::{cell::RefCell, collections::HashMap, time::Duration};
21+
use temporalio_common::protos::temporal::api::{
22+
namespace::v1::{NamespaceInfo, namespace_info::Capabilities},
23+
workflowservice::v1::DescribeNamespaceResponse,
24+
};
1725
use temporalio_common::{
1826
protos::{
1927
canned_histories,
@@ -51,6 +59,7 @@ use temporalio_common::{
5159
},
5260
worker::WorkerTaskTypes,
5361
};
62+
use tokio::sync::Notify;
5463
use tokio::sync::{Barrier, watch};
5564
use uuid::Uuid;
5665

@@ -1209,3 +1218,106 @@ async fn nexus_start_operation_failure_converts_to_legacy_for_old_server(
12091218
worker.shutdown().await;
12101219
worker.finalize_shutdown().await;
12111220
}
1221+
1222+
/// Verifies that `initiate_shutdown` sends the `ShutdownWorker` RPC so that the server can
1223+
/// complete in-flight polls. Without this, graceful poll shutdown deadlocks: the SDK waits for
1224+
/// polls to drain, but the server was never told to flush them.
1225+
#[tokio::test]
1226+
async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
1227+
let shutdown_rpc_called = Arc::new(AtomicBool::new(false));
1228+
let shutdown_rpc_called_clone = shutdown_rpc_called.clone();
1229+
// When the shutdown_worker RPC fires, it signals polls to complete (simulating server
1230+
// behavior where ShutdownWorker causes the server to return empty poll responses).
1231+
let poll_releaser = Arc::new(Notify::new());
1232+
let poll_releaser_for_rpc = poll_releaser.clone();
1233+
1234+
let mut mock_client = MockWorkerClient::new();
1235+
mock_client
1236+
.expect_capabilities()
1237+
.returning(|| Some(*DEFAULT_TEST_CAPABILITIES));
1238+
mock_client
1239+
.expect_workers()
1240+
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
1241+
mock_client.expect_is_mock().returning(|| true);
1242+
mock_client
1243+
.expect_sdk_name_and_version()
1244+
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
1245+
mock_client
1246+
.expect_identity()
1247+
.returning(|| "test-identity".to_string());
1248+
mock_client
1249+
.expect_worker_grouping_key()
1250+
.returning(Uuid::new_v4);
1251+
mock_client
1252+
.expect_worker_instance_key()
1253+
.returning(Uuid::new_v4);
1254+
mock_client
1255+
.expect_set_heartbeat_client_fields()
1256+
.returning(|hb| {
1257+
hb.sdk_name = "test-core".to_string();
1258+
hb.sdk_version = "0.0.0".to_string();
1259+
hb.worker_identity = "test-identity".to_string();
1260+
hb.heartbeat_time = Some(std::time::SystemTime::now().into());
1261+
});
1262+
// Return the worker_poll_complete_on_shutdown capability so validate() enables graceful mode
1263+
mock_client.expect_describe_namespace().returning(move || {
1264+
Ok(DescribeNamespaceResponse {
1265+
namespace_info: Some(NamespaceInfo {
1266+
capabilities: Some(Capabilities {
1267+
worker_poll_complete_on_shutdown: true,
1268+
..Capabilities::default()
1269+
}),
1270+
..NamespaceInfo::default()
1271+
}),
1272+
..DescribeNamespaceResponse::default()
1273+
})
1274+
});
1275+
// When shutdown_worker RPC is called, mark it and release polls
1276+
mock_client
1277+
.expect_shutdown_worker()
1278+
.returning(move |_, _, _, _| {
1279+
shutdown_rpc_called_clone.store(true, Ordering::SeqCst);
1280+
poll_releaser_for_rpc.notify_waiters();
1281+
Ok(ShutdownWorkerResponse {})
1282+
});
1283+
mock_client
1284+
.expect_complete_workflow_task()
1285+
.returning(|_| Ok(RespondWorkflowTaskCompletedResponse::default()));
1286+
1287+
// Polls block until shutdown_worker RPC releases them (simulating server holding polls
1288+
// open until it receives the ShutdownWorker signal)
1289+
let poll_releaser_for_stream = poll_releaser.clone();
1290+
let stream = stream::unfold(poll_releaser_for_stream, |releaser| async move {
1291+
releaser.notified().await;
1292+
Some((
1293+
Ok(PollWorkflowTaskQueueResponse::default().try_into().unwrap()),
1294+
releaser,
1295+
))
1296+
});
1297+
1298+
let mw = MockWorkerInputs::new(stream.boxed());
1299+
let worker = mock_worker(MocksHolder::from_mock_worker(mock_client, mw));
1300+
1301+
// validate() reads describe_namespace and sets capabilities.graceful_poll_shutdown = true
1302+
worker.validate().await.unwrap();
1303+
1304+
let poll_fut = worker.poll_workflow_activation();
1305+
let shutdown_fut = async {
1306+
// initiate_shutdown must send the ShutdownWorker RPC, which releases the polls
1307+
worker.initiate_shutdown();
1308+
};
1309+
1310+
let (poll_result, _) = tokio::time::timeout(Duration::from_secs(5), async {
1311+
tokio::join!(poll_fut, shutdown_fut)
1312+
})
1313+
.await
1314+
.expect("Shutdown should complete within 5s -- if it hangs, the ShutdownWorker RPC was not sent during initiate_shutdown");
1315+
1316+
assert_matches!(poll_result.unwrap_err(), PollError::ShutDown);
1317+
assert!(
1318+
shutdown_rpc_called.load(Ordering::SeqCst),
1319+
"ShutdownWorker RPC must be called during initiate_shutdown"
1320+
);
1321+
1322+
worker.finalize_shutdown().await;
1323+
}

crates/sdk-core/src/lib.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ pub use temporalio_common::protos::TaskToken;
4242
pub use url::Url;
4343
pub use worker::{
4444
ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
45-
FixedSizeSlotSupplier, LocalActivitySlotKind, NexusSlotKind, PollError, PollerBehavior,
46-
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner,
47-
ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
48-
SlotReleaseContext, SlotReservationContext, SlotSupplier, SlotSupplierOptions,
49-
SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder,
50-
Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner, WorkerValidationError,
51-
WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
45+
FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
46+
PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
47+
ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
48+
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
49+
SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
50+
TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
51+
WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
5252
};
5353

5454
use crate::{

crates/sdk-core/src/pollers/mod.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ where
157157
return match state.poller.poll().await {
158158
Some(Ok((task, permit))) => {
159159
if task == Default::default() {
160+
if state.poller_was_shutdown {
161+
// Server sent an empty response after we initiated
162+
// shutdown — this is the graceful shutdown signal.
163+
return None;
164+
}
160165
// We get the default proto in the event that the long poll
161166
// times out.
162167
debug!("Poll {} task timeout", T::task_name());
@@ -276,3 +281,93 @@ pub(crate) fn new_nexus_task_poller(
276281
)
277282
.into_stream()
278283
}
284+
285+
#[cfg(test)]
286+
mod tests {
287+
use super::*;
288+
use crate::{
289+
abstractions::tests::fixed_size_permit_dealer, pollers::MockPermittedPollBuffer,
290+
test_help::mock_poller, worker::ActivitySlotKind,
291+
};
292+
use futures_util::{StreamExt, pin_mut};
293+
use std::sync::{
294+
Arc,
295+
atomic::{AtomicUsize, Ordering},
296+
};
297+
298+
/// Verify that empty responses after shutdown are not treated as poll timeout and retried
299+
/// indefinitely
300+
#[tokio::test]
301+
async fn empty_response_after_shutdown_terminates_stream() {
302+
let poll_count = Arc::new(AtomicUsize::new(0));
303+
let poll_count_clone = poll_count.clone();
304+
305+
let mut mock_poller = mock_poller();
306+
mock_poller.expect_poll().returning(move || {
307+
poll_count_clone.fetch_add(1, Ordering::SeqCst);
308+
Some(Ok(PollActivityTaskQueueResponse::default()))
309+
});
310+
311+
let sem = Arc::new(fixed_size_permit_dealer::<ActivitySlotKind>(10));
312+
let shutdown_token = CancellationToken::new();
313+
314+
let stream = new_activity_task_poller(
315+
Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
316+
MetricsContext::no_op(),
317+
shutdown_token.clone(),
318+
);
319+
pin_mut!(stream);
320+
321+
shutdown_token.cancel();
322+
323+
let result = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()).await;
324+
assert!(
325+
result.is_ok(),
326+
"Stream should terminate promptly after shutdown, not hang"
327+
);
328+
assert!(
329+
result.unwrap().is_none(),
330+
"Stream should return None (terminated) on empty response after shutdown"
331+
);
332+
333+
let total = poll_count.load(Ordering::SeqCst);
334+
assert!(
335+
total < 5,
336+
"Expected stream to terminate quickly, but poller was called {total} times"
337+
);
338+
}
339+
340+
#[tokio::test]
341+
async fn empty_response_before_shutdown_retries() {
342+
let mut mock_poller = mock_poller();
343+
let call_count = Arc::new(AtomicUsize::new(0));
344+
let call_count_clone = call_count.clone();
345+
mock_poller.expect_poll().returning(move || {
346+
let n = call_count_clone.fetch_add(1, Ordering::SeqCst);
347+
if n < 2 {
348+
Some(Ok(PollActivityTaskQueueResponse::default()))
349+
} else {
350+
None
351+
}
352+
});
353+
354+
let sem = Arc::new(fixed_size_permit_dealer::<ActivitySlotKind>(10));
355+
let shutdown_token = CancellationToken::new();
356+
357+
let stream = new_activity_task_poller(
358+
Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
359+
MetricsContext::no_op(),
360+
shutdown_token,
361+
);
362+
pin_mut!(stream);
363+
364+
// Without shutdown, empty responses should be skipped and the stream terminates
365+
// only when the poller returns None.
366+
let result = stream.next().await;
367+
assert!(
368+
result.is_none(),
369+
"Stream should end when poller returns None"
370+
);
371+
assert_eq!(call_count.load(Ordering::SeqCst), 3);
372+
}
373+
}

0 commit comments

Comments
 (0)