Skip to content

Commit b1c7dcf

Browse files
committed
Add shutdown test to cloud tests
1 parent ffbb284 commit b1c7dcf

3 files changed

Lines changed: 123 additions & 110 deletions

File tree

crates/sdk-core/tests/cloud_tests.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,8 @@ async fn grpc_message_too_large_test() {
3232
async fn priority_values_sent_to_server() {
3333
shared_tests::priority::priority_values_sent_to_server().await
3434
}
35+
36+
#[tokio::test]
37+
async fn shutdown_during_active_timer_activity_workflows() {
38+
shared_tests::shutdown_during_active_timer_activity_workflows().await
39+
}

crates/sdk-core/tests/integ_tests/worker_tests.rs

Lines changed: 2 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@ use std::{
1818
},
1919
time::Duration,
2020
};
21-
use temporalio_client::{
22-
Connection, UntypedWorkflow, WorkflowFetchHistoryOptions, WorkflowStartOptions,
23-
WorkflowTerminateOptions,
24-
};
21+
use temporalio_client::{Connection, WorkflowStartOptions};
2522
use temporalio_common::{
2623
data_converters::{DataConverter, RawValue},
2724
protos::{
@@ -992,107 +989,7 @@ fn test_default_build_id() {
992989
assert_ne!(o.deployment_options.version.build_id, "undetermined");
993990
}
994991

995-
#[workflow]
996-
#[derive(Default)]
997-
struct ShutdownTimerActivityLoopWf;
998-
999-
#[workflow_methods]
1000-
impl ShutdownTimerActivityLoopWf {
1001-
#[run]
1002-
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
1003-
loop {
1004-
ctx.timer(Duration::from_millis(10)).await;
1005-
ctx.start_activity(
1006-
StdActivities::no_op,
1007-
(),
1008-
ActivityOptions::start_to_close_timeout(Duration::from_secs(10)),
1009-
)
1010-
.await
1011-
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
1012-
}
1013-
}
1014-
}
1015-
1016-
/// Starts 10 workflows that each run a tight timer+activity loop, then shuts down the worker
1017-
/// and verifies:
1018-
/// 1. Shutdown completes rapidly (< 5s)
1019-
/// 2. No workflow task failures or timeouts appear in any workflow's history
1020992
#[tokio::test]
1021993
async fn shutdown_during_active_timer_activity_workflows() {
1022-
let wf_name = "shutdown_during_active_timer_activity_workflows";
1023-
let num_workflows = 10;
1024-
1025-
let mut starter = CoreWfStarter::new(wf_name);
1026-
starter.sdk_config.register_activities(StdActivities);
1027-
let mut worker = starter.worker().await;
1028-
worker.register_workflow::<ShutdownTimerActivityLoopWf>();
1029-
1030-
let core = worker.core_worker();
1031-
core.validate().await.unwrap();
1032-
assert!(
1033-
core.get_namespace_capabilities().graceful_poll_shutdown(),
1034-
"Server must support graceful poll shutdown for this test"
1035-
);
1036-
1037-
let task_queue = starter.get_task_queue().to_owned();
1038-
let mut wf_ids = Vec::with_capacity(num_workflows);
1039-
for i in 0..num_workflows {
1040-
let wf_id = format!("{task_queue}-{i}");
1041-
worker
1042-
.submit_workflow(
1043-
ShutdownTimerActivityLoopWf::run,
1044-
(),
1045-
WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
1046-
)
1047-
.await
1048-
.unwrap();
1049-
wf_ids.push(wf_id);
1050-
}
1051-
// Don't wait for workflow completion — these loop forever
1052-
worker.fetch_results = false;
1053-
1054-
let shutdown_handle = worker.inner_mut().shutdown_handle();
1055-
let run_fut = async { worker.run_until_done().await.unwrap() };
1056-
1057-
let shutdown_fut = async {
1058-
// Let workflows run a few iterations
1059-
tokio::time::sleep(Duration::from_secs(2)).await;
1060-
shutdown_handle();
1061-
};
1062-
1063-
let shutdown_start = std::time::Instant::now();
1064-
tokio::join!(run_fut, shutdown_fut);
1065-
let shutdown_elapsed = shutdown_start.elapsed();
1066-
1067-
assert!(
1068-
shutdown_elapsed < Duration::from_secs(5),
1069-
"Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
1070-
);
1071-
1072-
let client = starter.get_client().await;
1073-
for wf_id in &wf_ids {
1074-
client
1075-
.get_workflow_handle::<UntypedWorkflow>(wf_id)
1076-
.terminate(WorkflowTerminateOptions::default())
1077-
.await
1078-
.unwrap();
1079-
1080-
let history = client
1081-
.get_workflow_handle::<UntypedWorkflow>(wf_id)
1082-
.fetch_history(WorkflowFetchHistoryOptions::default())
1083-
.await
1084-
.unwrap();
1085-
let bad_events: Vec<_> = history
1086-
.events()
1087-
.iter()
1088-
.filter(|e| {
1089-
e.event_type() == EventType::WorkflowTaskFailed
1090-
|| e.event_type() == EventType::WorkflowTaskTimedOut
1091-
})
1092-
.collect();
1093-
assert!(
1094-
bad_events.is_empty(),
1095-
"Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
1096-
);
1097-
}
994+
shared_tests::shutdown_during_active_timer_activity_workflows().await
1098995
}

crates/sdk-core/tests/shared_tests/mod.rs

Lines changed: 116 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
//! Shared tests that are meant to be run against both local dev server and cloud
22
3-
use crate::common::CoreWfStarter;
4-
use std::sync::{
5-
Arc,
6-
atomic::{AtomicBool, Ordering::Relaxed},
3+
use crate::common::{CoreWfStarter, activity_functions::StdActivities};
4+
use std::{
5+
sync::{
6+
Arc,
7+
atomic::{AtomicBool, Ordering::Relaxed},
8+
},
9+
time::Duration,
10+
};
11+
use temporalio_client::{
12+
WorkflowFetchHistoryOptions, WorkflowStartOptions, WorkflowTerminateOptions,
713
};
814
use temporalio_common::{
15+
UntypedWorkflow,
916
protos::temporal::api::{
1017
enums::v1::{EventType, WorkflowTaskFailedCause::GrpcMessageTooLarge},
1118
history::v1::history_event::Attributes::{
@@ -15,7 +22,7 @@ use temporalio_common::{
1522
worker::WorkerTaskTypes,
1623
};
1724
use temporalio_macros::{workflow, workflow_methods};
18-
use temporalio_sdk::{WorkflowContext, WorkflowResult};
25+
use temporalio_sdk::{ActivityOptions, WorkflowContext, WorkflowResult, WorkflowTermination};
1926

2027
pub(crate) mod priority;
2128

@@ -92,3 +99,107 @@ pub(crate) fn is_oversize_grpc_event(
9299
false
93100
}
94101
}
102+
103+
#[workflow]
104+
#[derive(Default)]
105+
struct ShutdownTimerActivityLoopWf;
106+
107+
#[workflow_methods]
108+
impl ShutdownTimerActivityLoopWf {
109+
#[run]
110+
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
111+
loop {
112+
ctx.timer(Duration::from_millis(10)).await;
113+
ctx.start_activity(
114+
StdActivities::no_op,
115+
(),
116+
ActivityOptions::start_to_close_timeout(Duration::from_secs(10)),
117+
)
118+
.await
119+
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
120+
}
121+
}
122+
}
123+
124+
/// Starts 10 workflows that each run a tight timer+activity loop, then shuts down the worker
125+
/// and verifies:
126+
/// 1. Shutdown completes rapidly (< 5s)
127+
/// 2. No workflow task failures or timeouts appear in any workflow's history
128+
pub(crate) async fn shutdown_during_active_timer_activity_workflows() {
129+
let wf_name = "shutdown_during_active_timer_activity_workflows";
130+
let num_workflows = 10;
131+
132+
let mut starter = CoreWfStarter::new(wf_name);
133+
starter.sdk_config.register_activities(StdActivities);
134+
let mut worker = starter.worker().await;
135+
worker.register_workflow::<ShutdownTimerActivityLoopWf>();
136+
137+
let core = worker.core_worker();
138+
core.validate().await.unwrap();
139+
assert!(
140+
core.get_namespace_capabilities().graceful_poll_shutdown(),
141+
"Server must support graceful poll shutdown for this test"
142+
);
143+
144+
let task_queue = starter.get_task_queue().to_owned();
145+
let mut wf_ids = Vec::with_capacity(num_workflows);
146+
for i in 0..num_workflows {
147+
let wf_id = format!("{task_queue}-{i}");
148+
worker
149+
.submit_workflow(
150+
ShutdownTimerActivityLoopWf::run,
151+
(),
152+
WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
153+
)
154+
.await
155+
.unwrap();
156+
wf_ids.push(wf_id);
157+
}
158+
// Don't wait for workflow completion — these loop forever
159+
worker.fetch_results = false;
160+
161+
let shutdown_handle = worker.inner_mut().shutdown_handle();
162+
let run_fut = async { worker.run_until_done().await.unwrap() };
163+
164+
let shutdown_fut = async {
165+
// Let workflows run a few iterations
166+
tokio::time::sleep(Duration::from_secs(2)).await;
167+
shutdown_handle();
168+
};
169+
170+
let shutdown_start = std::time::Instant::now();
171+
tokio::join!(run_fut, shutdown_fut);
172+
let shutdown_elapsed = shutdown_start.elapsed();
173+
174+
assert!(
175+
shutdown_elapsed < Duration::from_secs(5),
176+
"Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
177+
);
178+
179+
let client = starter.get_client().await;
180+
for wf_id in &wf_ids {
181+
client
182+
.get_workflow_handle::<UntypedWorkflow>(wf_id)
183+
.terminate(WorkflowTerminateOptions::default())
184+
.await
185+
.unwrap();
186+
187+
let history = client
188+
.get_workflow_handle::<UntypedWorkflow>(wf_id)
189+
.fetch_history(WorkflowFetchHistoryOptions::default())
190+
.await
191+
.unwrap();
192+
let bad_events: Vec<_> = history
193+
.events()
194+
.iter()
195+
.filter(|e| {
196+
e.event_type() == EventType::WorkflowTaskFailed
197+
|| e.event_type() == EventType::WorkflowTaskTimedOut
198+
})
199+
.collect();
200+
assert!(
201+
bad_events.is_empty(),
202+
"Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
203+
);
204+
}
205+
}

0 commit comments

Comments
 (0)