Skip to content

Commit f8bc1ad

Browse files
committed
Add integ test / log line for Kannan & remove temp fix
1 parent 53914bc commit f8bc1ad

5 files changed

Lines changed: 114 additions & 15 deletions

File tree

crates/sdk-core/src/core_tests/workers.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1330,8 +1330,7 @@ async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
13301330
/// without dbg_panic!.
13311331
#[tokio::test]
13321332
async fn all_permits_tracker_timeout_accommodates_graceful_poll_delay() {
1333-
use crate::abstractions::tests::fixed_size_permit_dealer;
1334-
use crate::worker::WorkflowSlotKind;
1333+
use crate::{abstractions::tests::fixed_size_permit_dealer, worker::WorkflowSlotKind};
13351334

13361335
let dealer = fixed_size_permit_dealer::<WorkflowSlotKind>(5);
13371336
let rcv = dealer.get_extant_count_rcv();

crates/sdk-core/src/pollers/poll_buffer.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -370,18 +370,7 @@ where
370370
let shutdown_clone = shutdown.clone();
371371

372372
let r = if graceful_shutdown.load(Ordering::Relaxed) {
373-
// TEMP_FIX: Give the server a reasonable window to
374-
// complete the poll after ShutdownWorker. Fall back
375-
// to cancelling the poll if it takes too long, to
376-
// avoid a 60s hang due to a server-side race
377-
// (temporalio/temporal#9545).
378-
let graceful_interruptor = shutdown_clone
379-
.cancelled()
380-
.then(|_| tokio::time::sleep(Duration::from_secs(5)));
381-
tokio::select! {
382-
r = pf(timeout_override) => r,
383-
_ = graceful_interruptor => return,
384-
}
373+
pf(timeout_override).await
385374
} else {
386375
let poll_interruptor = shutdown.cancelled().then(|_| async move {
387376
if let Some(w) = poll_shutdown_interrupt_wait {

crates/sdk-core/src/worker/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,6 +1417,7 @@ impl Worker {
14171417
.as_ref()
14181418
.map(|hm| hm.heartbeat_callback.clone()());
14191419
let handle = tokio::spawn(async move {
1420+
let start_time = std::time::Instant::now();
14201421
match client
14211422
.shutdown_worker(sticky_name, task_queue, task_queue_types, heartbeat)
14221423
.await
@@ -1434,6 +1435,10 @@ impl Worker {
14341435
}
14351436
_ => {}
14361437
}
1438+
warn!(
1439+
"shutdown_worker rpc completed, took {:?}",
1440+
start_time.elapsed()
1441+
)
14371442
});
14381443
*guard = Some(handle);
14391444
}

crates/sdk-core/tests/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,6 +1078,8 @@ pub(crate) fn integ_dev_server_config(
10781078
"frontend.WorkerHeartbeatsEnabled=true".to_owned(),
10791079
"--dynamic-config-value".to_owned(),
10801080
"frontend.ListWorkersEnabled=true".to_owned(),
1081+
"--dynamic-config-value".to_owned(),
1082+
"frontend.enableCancelWorkerPollsOnShutdown=true".to_owned(),
10811083
"--search-attribute".to_string(),
10821084
format!("{SEARCH_ATTR_TXT}=Text"),
10831085
"--search-attribute".to_string(),

crates/sdk-core/tests/integ_tests/worker_tests.rs

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ use std::{
1818
},
1919
time::Duration,
2020
};
21-
use temporalio_client::{Connection, WorkflowStartOptions};
21+
use temporalio_client::{
22+
Connection, UntypedWorkflow, WorkflowFetchHistoryOptions, WorkflowStartOptions,
23+
WorkflowTerminateOptions,
24+
};
2225
use temporalio_common::{
2326
data_converters::{DataConverter, RawValue},
2427
protos::{
@@ -1140,3 +1143,104 @@ fn test_default_build_id() {
11401143
assert!(!o.deployment_options.version.build_id.is_empty());
11411144
assert_ne!(o.deployment_options.version.build_id, "undetermined");
11421145
}
1146+
1147+
#[workflow]
1148+
#[derive(Default)]
1149+
struct ShutdownTimerActivityLoopWf;
1150+
1151+
#[workflow_methods]
1152+
impl ShutdownTimerActivityLoopWf {
1153+
#[run]
1154+
async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
1155+
loop {
1156+
ctx.timer(Duration::from_millis(100)).await;
1157+
ctx.start_activity(
1158+
StdActivities::no_op,
1159+
(),
1160+
ActivityOptions {
1161+
start_to_close_timeout: Some(Duration::from_secs(10)),
1162+
..Default::default()
1163+
},
1164+
)
1165+
.await
1166+
.map_err(|e| WorkflowTermination::from(anyhow::Error::from(e)))?;
1167+
}
1168+
}
1169+
}
1170+
1171+
/// Starts 10 workflows that each run a tight timer+activity loop, then shuts down the worker
1172+
/// and verifies:
1173+
/// 1. Shutdown completes rapidly (< 5s)
1174+
/// 2. No workflow task failures or timeouts appear in any workflow's history
1175+
#[tokio::test]
1176+
async fn shutdown_during_active_timer_activity_workflows() {
1177+
let wf_name = "shutdown_during_active_timer_activity_workflows";
1178+
let num_workflows = 10;
1179+
1180+
let mut starter = CoreWfStarter::new(wf_name);
1181+
starter.sdk_config.register_activities(StdActivities);
1182+
let mut worker = starter.worker().await;
1183+
worker.register_workflow::<ShutdownTimerActivityLoopWf>();
1184+
1185+
let task_queue = starter.get_task_queue().to_owned();
1186+
let mut wf_ids = Vec::with_capacity(num_workflows);
1187+
for i in 0..num_workflows {
1188+
let wf_id = format!("{task_queue}-{i}");
1189+
worker
1190+
.submit_workflow(
1191+
ShutdownTimerActivityLoopWf::run,
1192+
(),
1193+
WorkflowStartOptions::new(task_queue.clone(), wf_id.clone()).build(),
1194+
)
1195+
.await
1196+
.unwrap();
1197+
wf_ids.push(wf_id);
1198+
}
1199+
// Don't wait for workflow completion — these loop forever
1200+
worker.fetch_results = false;
1201+
1202+
let shutdown_handle = worker.inner_mut().shutdown_handle();
1203+
let run_fut = async { worker.run_until_done().await.unwrap() };
1204+
1205+
let shutdown_fut = async {
1206+
// Let workflows run a few iterations
1207+
tokio::time::sleep(Duration::from_secs(2)).await;
1208+
shutdown_handle();
1209+
};
1210+
1211+
let shutdown_start = std::time::Instant::now();
1212+
tokio::join!(run_fut, shutdown_fut);
1213+
let shutdown_elapsed = shutdown_start.elapsed();
1214+
1215+
assert!(
1216+
shutdown_elapsed < Duration::from_secs(5),
1217+
"Worker shutdown took {shutdown_elapsed:?}, expected < 5s"
1218+
);
1219+
1220+
let client = starter.get_client().await;
1221+
for wf_id in &wf_ids {
1222+
client
1223+
.get_workflow_handle::<UntypedWorkflow>(wf_id)
1224+
.terminate(WorkflowTerminateOptions::default())
1225+
.await
1226+
.unwrap();
1227+
1228+
let history = client
1229+
.get_workflow_handle::<UntypedWorkflow>(wf_id)
1230+
.fetch_history(WorkflowFetchHistoryOptions::default())
1231+
.await
1232+
.unwrap();
1233+
let bad_events: Vec<_> = history
1234+
.events()
1235+
.iter()
1236+
.filter(|e| {
1237+
e.event_type() == EventType::WorkflowTaskFailed
1238+
|| e.event_type() == EventType::WorkflowTaskTimedOut
1239+
})
1240+
.collect();
1241+
assert!(
1242+
bad_events.is_empty(),
1243+
"Workflow {wf_id} had unexpected WFT failures/timeouts: {bad_events:?}"
1244+
);
1245+
}
1246+
}

0 commit comments

Comments
 (0)