Skip to content

Commit 8ad3510

Browse files
committed
Add shutdown test to cloud tests
1 parent ffbb284 commit 8ad3510

3 files changed

Lines changed: 128 additions & 110 deletions

File tree

crates/sdk-core/tests/cloud_tests.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,8 @@ async fn grpc_message_too_large_test() {
3232
async fn priority_values_sent_to_server() {
3333
shared_tests::priority::priority_values_sent_to_server().await
3434
}
35+
36+
#[tokio::test]
37+
async fn shutdown_during_active_timer_activity_workflows() {
38+
shared_tests::shutdown_during_active_timer_activity_workflows().await
39+
}

crates/sdk-core/tests/integ_tests/worker_tests.rs

Lines changed: 2 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ use std::{
1818
},
1919
time::Duration,
2020
};
21-
use temporalio_client::{
22-
Connection, UntypedWorkflow, WorkflowFetchHistoryOptions, WorkflowStartOptions,
23-
WorkflowTerminateOptions,
24-
};
21+
use temporalio_client::{Connection, WorkflowStartOptions};
2522
use temporalio_common::{
2623
data_converters::{DataConverter, RawValue},
2724
protos::{
@@ -992,107 +989,7 @@ fn test_default_build_id() {
992989
assert_ne!(o.deployment_options.version.build_id, "undetermined");
993990
}
994991

995-
#[workflow]
996-
#[derive(Default)]
997-
struct ShutdownTimerActivityLoopWf;
998-
999-
#[workflow_methods]
1000-
impl ShutdownTimerActivityLoopWf {
1001-
#[run]
1002-
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
1003-
loop {
1004-
ctx.timer(Duration::from_millis(10)).await;
1005-
ctx.start_activity(
1006-
StdActivities::no_op,
1007-
(),
1008-
ActivityOptions::start_to_close_timeout(Duration::from_secs(10)),
1009-
)
1010-
.await
1011-
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
1012-
}
1013-
}
1014-
}
1015-
1016-
/// Starts 10 workflows that each run a tight timer+activity loop, then shuts down the worker
1017-
/// and verifies:
1018-
/// 1. Shutdown completes rapidly (< 5s)
1019-
/// 2. No workflow task failures or timeouts appear in any workflow's history
1020992
#[tokio::test]
1021993
async fn shutdown_during_active_timer_activity_workflows() {
1022-
let wf_name = "shutdown_during_active_timer_activity_workflows";
1023-
let num_workflows = 10;
1024-
1025-
let mut starter = CoreWfStarter::new(wf_name);
1026-
starter.sdk_config.register_activities(StdActivities);
1027-
let mut worker = starter.worker().await;
1028-
worker.register_workflow::<ShutdownTimerActivityLoopWf>();
1029-
1030-
let core = worker.core_worker();
1031-
core.validate().await.unwrap();
1032-
assert!(
1033-
core.get_namespace_capabilities().graceful_poll_shutdown(),
1034-
"Server must support graceful poll shutdown for this test"
1035-
);
1036-
1037-
let task_queue = starter.get_task_queue().to_owned();
1038-
let mut wf_ids = Vec::with_capacity(num_workflows);
1039-
for i in 0..num_workflows {
1040-
let wf_id = format!("{task_queue}-{i}");
1041-
worker
1042-
.submit_workflow(
1043-
ShutdownTimerActivityLoopWf::run,
1044-
(),
1045-
WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
1046-
)
1047-
.await
1048-
.unwrap();
1049-
wf_ids.push(wf_id);
1050-
}
1051-
// Don't wait for workflow completion — these loop forever
1052-
worker.fetch_results = false;
1053-
1054-
let shutdown_handle = worker.inner_mut().shutdown_handle();
1055-
let run_fut = async { worker.run_until_done().await.unwrap() };
1056-
1057-
let shutdown_fut = async {
1058-
// Let workflows run a few iterations
1059-
tokio::time::sleep(Duration::from_secs(2)).await;
1060-
shutdown_handle();
1061-
};
1062-
1063-
let shutdown_start = std::time::Instant::now();
1064-
tokio::join!(run_fut, shutdown_fut);
1065-
let shutdown_elapsed = shutdown_start.elapsed();
1066-
1067-
assert!(
1068-
shutdown_elapsed < Duration::from_secs(5),
1069-
"Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
1070-
);
1071-
1072-
let client = starter.get_client().await;
1073-
for wf_id in &wf_ids {
1074-
client
1075-
.get_workflow_handle::<UntypedWorkflow>(wf_id)
1076-
.terminate(WorkflowTerminateOptions::default())
1077-
.await
1078-
.unwrap();
1079-
1080-
let history = client
1081-
.get_workflow_handle::<UntypedWorkflow>(wf_id)
1082-
.fetch_history(WorkflowFetchHistoryOptions::default())
1083-
.await
1084-
.unwrap();
1085-
let bad_events: Vec<_> = history
1086-
.events()
1087-
.iter()
1088-
.filter(|e| {
1089-
e.event_type() == EventType::WorkflowTaskFailed
1090-
|| e.event_type() == EventType::WorkflowTaskTimedOut
1091-
})
1092-
.collect();
1093-
assert!(
1094-
bad_events.is_empty(),
1095-
"Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
1096-
);
1097-
}
994+
shared_tests::shutdown_during_active_timer_activity_workflows().await
1098995
}

crates/sdk-core/tests/shared_tests/mod.rs

Lines changed: 121 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
//! Shared tests that are meant to be run against both local dev server and cloud
22
3-
use crate::common::CoreWfStarter;
4-
use std::sync::{
5-
Arc,
6-
atomic::{AtomicBool, Ordering::Relaxed},
3+
use crate::common::{CoreWfStarter, activity_functions::StdActivities};
4+
use std::{
5+
sync::{
6+
Arc,
7+
atomic::{AtomicBool, Ordering::Relaxed},
8+
},
9+
time::Duration,
10+
};
11+
use temporalio_client::{
12+
WorkflowFetchHistoryOptions, WorkflowStartOptions, WorkflowTerminateOptions,
713
};
814
use temporalio_common::{
15+
UntypedWorkflow,
916
protos::temporal::api::{
1017
enums::v1::{EventType, WorkflowTaskFailedCause::GrpcMessageTooLarge},
1118
history::v1::history_event::Attributes::{
@@ -15,7 +22,7 @@ use temporalio_common::{
1522
worker::WorkerTaskTypes,
1623
};
1724
use temporalio_macros::{workflow, workflow_methods};
18-
use temporalio_sdk::{WorkflowContext, WorkflowResult};
25+
use temporalio_sdk::{ActivityOptions, WorkflowContext, WorkflowResult, WorkflowTermination};
1926

2027
pub(crate) mod priority;
2128

@@ -92,3 +99,112 @@ pub(crate) fn is_oversize_grpc_event(
9299
false
93100
}
94101
}
102+
103+
#[workflow]
104+
#[derive(Default)]
105+
struct ShutdownTimerActivityLoopWf;
106+
107+
#[workflow_methods]
108+
impl ShutdownTimerActivityLoopWf {
109+
#[run]
110+
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
111+
loop {
112+
ctx.timer(Duration::from_millis(10)).await;
113+
ctx.start_activity(
114+
StdActivities::no_op,
115+
(),
116+
ActivityOptions::start_to_close_timeout(Duration::from_secs(10)),
117+
)
118+
.await
119+
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
120+
}
121+
}
122+
}
123+
124+
/// Starts 10 workflows that each run a tight timer+activity loop, then shuts down the worker
125+
/// and verifies:
126+
/// 1. Shutdown completes rapidly (< 5s)
127+
/// 2. No workflow task failures or timeouts appear in any workflow's history
128+
pub(crate) async fn shutdown_during_active_timer_activity_workflows() {
129+
let wf_name = "shutdown_during_active_timer_activity_workflows";
130+
let num_workflows = 10;
131+
132+
let mut starter =
133+
if let Some(wfs) = CoreWfStarter::new_cloud_or_local(wf_name, ">=1.6.3-serverless").await {
134+
wfs
135+
} else {
136+
return;
137+
};
138+
starter.sdk_config.register_activities(StdActivities);
139+
let mut worker = starter.worker().await;
140+
worker.register_workflow::<ShutdownTimerActivityLoopWf>();
141+
142+
let core = worker.core_worker();
143+
core.validate().await.unwrap();
144+
assert!(
145+
core.get_namespace_capabilities().graceful_poll_shutdown(),
146+
"Server must support graceful poll shutdown for this test"
147+
);
148+
149+
let task_queue = starter.get_task_queue().to_owned();
150+
let mut wf_ids = Vec::with_capacity(num_workflows);
151+
for i in 0..num_workflows {
152+
let wf_id = format!("{task_queue}-{i}");
153+
worker
154+
.submit_workflow(
155+
ShutdownTimerActivityLoopWf::run,
156+
(),
157+
WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
158+
)
159+
.await
160+
.unwrap();
161+
wf_ids.push(wf_id);
162+
}
163+
// Don't wait for workflow completion — these loop forever
164+
worker.fetch_results = false;
165+
166+
let shutdown_handle = worker.inner_mut().shutdown_handle();
167+
let run_fut = async { worker.run_until_done().await.unwrap() };
168+
169+
let shutdown_fut = async {
170+
// Let workflows run a few iterations
171+
tokio::time::sleep(Duration::from_secs(2)).await;
172+
shutdown_handle();
173+
};
174+
175+
let shutdown_start = std::time::Instant::now();
176+
tokio::join!(run_fut, shutdown_fut);
177+
let shutdown_elapsed = shutdown_start.elapsed();
178+
179+
assert!(
180+
shutdown_elapsed < Duration::from_secs(5),
181+
"Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
182+
);
183+
184+
let client = starter.get_client().await;
185+
for wf_id in &wf_ids {
186+
client
187+
.get_workflow_handle::<UntypedWorkflow>(wf_id)
188+
.terminate(WorkflowTerminateOptions::default())
189+
.await
190+
.unwrap();
191+
192+
let history = client
193+
.get_workflow_handle::<UntypedWorkflow>(wf_id)
194+
.fetch_history(WorkflowFetchHistoryOptions::default())
195+
.await
196+
.unwrap();
197+
let bad_events: Vec<_> = history
198+
.events()
199+
.iter()
200+
.filter(|e| {
201+
e.event_type() == EventType::WorkflowTaskFailed
202+
|| e.event_type() == EventType::WorkflowTaskTimedOut
203+
})
204+
.collect();
205+
assert!(
206+
bad_events.is_empty(),
207+
"Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
208+
);
209+
}
210+
}

0 commit comments

Comments
 (0)