Commit 038ed5b

Clean up temp fix / other minor cleanup
1 parent f8bc1ad commit 038ed5b

5 files changed: 12 additions & 245 deletions


crates/sdk-core/src/core_tests/workers.rs

Lines changed: 0 additions & 39 deletions
@@ -1322,42 +1322,3 @@ async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
 
     worker.finalize_shutdown().await;
 }
-
-/// The all_permits_tracker timeout must accommodate the 5s TEMP_FIX graceful poll timeout.
-/// When a poll holds a permit during the graceful shutdown window, all_permits_tracker must
-/// not fire before the permit is released. This test acquires a permit, holds it for 3s
-/// (simulating a poll blocked during graceful shutdown), and verifies shutdown() completes
-/// without dbg_panic!.
-#[tokio::test]
-async fn all_permits_tracker_timeout_accommodates_graceful_poll_delay() {
-    use crate::{abstractions::tests::fixed_size_permit_dealer, worker::WorkflowSlotKind};
-
-    let dealer = fixed_size_permit_dealer::<WorkflowSlotKind>(5);
-    let rcv = dealer.get_extant_count_rcv();
-
-    // Acquire a permit (simulating a LongPollBuffer poll task holding one)
-    let permit = dealer.acquire_owned().await;
-
-    // Start the same wait logic as shutdown()'s all_permits_tracker
-    // This must match the timeout in Worker::shutdown()'s all_permits_tracker select!
-    let wait_result = tokio::time::timeout(Duration::from_secs(6), async {
-        let mut rcv = rcv;
-        let _ = rcv.wait_for(|x| *x == 0).await;
-    });
-
-    // Release the permit after 3s (simulating the graceful poll timeout releasing it)
-    let release_task = tokio::spawn(async move {
-        tokio::time::sleep(Duration::from_secs(3)).await;
-        drop(permit);
-    });
-
-    let result = wait_result.await;
-    release_task.await.unwrap();
-
-    // With the 6s timeout, this should succeed (permit released at 3s < 6s).
-    // With the old 1s timeout, this would fail (permit released at 3s > 1s).
-    assert!(
-        result.is_ok(),
-        "all_permits_tracker should complete within 6s when permit is held for 3s"
-    );
-}
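The deleted test leaned on sdk-core test internals (`fixed_size_permit_dealer`, `get_extant_count_rcv`), but the wait-with-timeout pattern it exercised is generic. Below is a standalone sketch of the same shape using a plain `tokio::sync::watch` channel in place of the permit dealer's extant-count receiver; the channel and timings here are illustrative, not sdk-core API.

```rust
use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One outstanding "permit", mirrored into a watch channel as a count.
    let (count_tx, mut count_rx) = watch::channel(1usize);

    // Release the lone permit after 3s, as the test's release_task did.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(3)).await;
        count_tx.send(0).expect("receiver still alive");
    });

    // The all_permits_tracker-style wait: block until the count hits zero,
    // but give up after 6s rather than hanging shutdown forever.
    let result = tokio::time::timeout(Duration::from_secs(6), async {
        let _ = count_rx.wait_for(|count| *count == 0).await;
    })
    .await;

    assert!(result.is_ok(), "permit released at 3s, well inside the 6s cap");
}
```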

crates/sdk-core/src/pollers/poll_buffer.rs

Lines changed: 3 additions & 4 deletions
@@ -367,8 +367,6 @@ where
         };
         let graceful_shutdown = graceful_shutdown.clone();
         let poll_task = tokio::spawn(async move {
-            let shutdown_clone = shutdown.clone();
-
             let r = if graceful_shutdown.load(Ordering::Relaxed) {
                 pf(timeout_override).await
             } else {
@@ -389,10 +387,11 @@ where
             }
             let (should_forward, backoff_duration) = report_handle.poll_result(&r);
             if let Some(duration) = backoff_duration {
-                // Apply backoff BEFORE dropping active_guard to prevent next poll from starting
+                // Apply backoff BEFORE dropping active_guard to prevent next poll from
+                // starting
                 tokio::select! {
                     _ = tokio::time::sleep(duration) => return,
-                    _ = shutdown_clone.cancelled() => (),
+                    _ = shutdown.cancelled() => (),
                 };
             }
             drop(active_guard);
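The surviving poll-loop shape is worth spelling out: the backoff sleep runs while `active_guard` is still held, raced against the worker's shutdown token so a shutting-down worker never sits out the full backoff. A minimal sketch of that cancellable-backoff pattern, with a semaphore permit standing in for the real active-poll guard (the names here are illustrative, not the sdk-core types):

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

async fn poll_iteration(
    poll_gate: Arc<Semaphore>,
    shutdown: CancellationToken,
    backoff: Option<Duration>,
) {
    // Holding this permit is what prevents the next poll from starting.
    let active_guard = poll_gate.acquire_owned().await.unwrap();

    // ... issue the poll and report its result here ...

    if let Some(duration) = backoff {
        // Apply backoff BEFORE dropping active_guard so no new poll can start
        // during the backoff window, but wake immediately on shutdown.
        tokio::select! {
            _ = tokio::time::sleep(duration) => return, // permit released on return
            _ = shutdown.cancelled() => (),
        }
    }
    drop(active_guard); // the next poll may now acquire the gate
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(Semaphore::new(1));
    poll_iteration(gate, CancellationToken::new(), Some(Duration::from_millis(50))).await;
}
```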

crates/sdk-core/src/worker/mod.rs

Lines changed: 1 addition & 7 deletions
@@ -978,8 +978,7 @@ impl Worker {
         // Wait for all permits to be released, but don't totally hang real-world shutdown.
         tokio::select! {
             _ = async { self.all_permits_tracker.lock().await.all_done().await } => {},
-            // TEMP: Shutdown can take 5s longer due to TEMP_FIX in poll_buffer.rs
-            _ = tokio::time::sleep(Duration::from_secs(6)) => {
+            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                 dbg_panic!("Waiting for all slot permits to release took too long!");
             }
         }
@@ -1417,7 +1416,6 @@ impl Worker {
             .as_ref()
             .map(|hm| hm.heartbeat_callback.clone()());
         let handle = tokio::spawn(async move {
-            let start_time = std::time::Instant::now();
             match client
                 .shutdown_worker(sticky_name, task_queue, task_queue_types, heartbeat)
                 .await
@@ -1435,10 +1433,6 @@
                 }
                 _ => {}
             }
-            warn!(
-                "shutdown_worker rpc completed, took {:?}",
-                start_time.elapsed()
-            )
         });
         *guard = Some(handle);
     }
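The second and third hunks trim timing instrumentation from a task that fires the ShutdownWorker RPC in the background and parks its JoinHandle in a shared slot (`*guard = Some(handle)`) for later awaiting. A rough standalone sketch of that spawn-and-stash pattern; the slot type, the once-guard via `is_none`, and the stubbed RPC body are assumptions for illustration, not the worker's actual code:

```rust
use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinHandle};

// Hypothetical slot holding the handle of the in-flight ShutdownWorker RPC task.
type RpcTaskSlot = Arc<Mutex<Option<JoinHandle<()>>>>;

async fn spawn_shutdown_rpc_once(slot: RpcTaskSlot) {
    let mut guard = slot.lock().await;
    if guard.is_none() {
        let handle = tokio::spawn(async move {
            // ... client.shutdown_worker(...).await would go here, with
            // non-retryable errors logged and otherwise ignored ...
        });
        *guard = Some(handle);
    }
}

#[tokio::main]
async fn main() {
    let slot: RpcTaskSlot = Arc::new(Mutex::new(None));
    spawn_shutdown_rpc_once(slot.clone()).await;

    // During shutdown finalization: await the RPC task if one was started.
    if let Some(handle) = slot.lock().await.take() {
        let _ = handle.await;
    }
}
```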

crates/sdk-core/src/worker/workflow/wft_poller.rs

Lines changed: 7 additions & 37 deletions
@@ -216,10 +216,6 @@ fn new_wft_poller(
         return match poller.poll().await {
             Some(Ok((wft, permit))) => {
                 if wft == PollWorkflowTaskQueueResponse::default() {
-                    if shutdown_token.is_cancelled() {
-                        poller.shutdown_box().await;
-                        return None;
-                    }
                     // We get the default proto in the event that the long poll times out.
                     debug!("Poll wft timeout");
                     metrics.wf_tq_poll_empty();
@@ -297,50 +293,24 @@ mod tests {
         assert_matches!(stream.next().await, None);
     }
 
-    /// empty responses after shutdown are retried indefinitely in
-    /// new_wft_poller, causing the WFT stream to spin. This is what caused the workflow_load
-    /// heavy test to hang on CI when the server has enableCancelWorkerPollsOnShutdown.
+    /// When the underlying poller returns None (indicating shutdown), the wrapping WFT stream
+    /// should also return None to terminate.
     #[tokio::test]
-    async fn empty_response_after_shutdown_terminates_wft_stream() {
-        use std::sync::atomic::{AtomicUsize, Ordering};
-
-        let poll_count = Arc::new(AtomicUsize::new(0));
-        let poll_count_clone = poll_count.clone();
-
+    async fn poller_returning_none_terminates_wft_stream() {
         let mut mock_poller = mock_poller();
-        mock_poller.expect_poll().returning(move || {
-            poll_count_clone.fetch_add(1, Ordering::SeqCst);
-            Some(Ok(PollWorkflowTaskQueueResponse::default()))
-        });
-        mock_poller.expect_shutdown().returning(|| ());
+        mock_poller.expect_poll().times(1).returning(|| None);
+        mock_poller.expect_shutdown().times(1).returning(|| ());
 
         let sem = Arc::new(fixed_size_permit_dealer::<WorkflowSlotKind>(10));
-        let shutdown_token = CancellationToken::new();
 
         let stream = new_wft_poller(
             Box::new(MockPermittedPollBuffer::new(sem, mock_poller)),
             MetricsContext::no_op(),
-            shutdown_token.clone(),
+            CancellationToken::new(),
         );
         pin_mut!(stream);
 
-        shutdown_token.cancel();
-
-        let result = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()).await;
-        assert!(
-            result.is_ok(),
-            "WFT stream should terminate promptly after shutdown, not spin on retries"
-        );
-        assert!(
-            result.unwrap().is_none(),
-            "WFT stream should return None on empty response after shutdown"
-        );
-
-        let total = poll_count.load(Ordering::SeqCst);
-        assert!(
-            total < 5,
-            "Expected WFT stream to terminate quickly, but poller was called {total} times"
-        );
+        assert_matches!(stream.next().await, None);
     }
 
     #[tokio::test]
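The rewritten test pins the contract directly: once the wrapped poller yields None, the stream ends rather than retrying. A toy analogue of that contract, independent of `new_wft_poller`'s internals; the in-memory poll source is invented for illustration:

```rust
use futures_util::{StreamExt, stream};
use std::task::Poll;

#[tokio::main]
async fn main() {
    // Stand-in poll source: two "tasks", then None to signal shutdown.
    let mut source = vec![Some("wft-1"), Some("wft-2"), None].into_iter();

    // Like the WFT stream, this terminates on the first None from the source
    // instead of spinning on it.
    let wft_stream = stream::poll_fn(move |_cx| Poll::Ready(source.next().flatten()));

    let items: Vec<_> = wft_stream.collect().await;
    assert_eq!(items, vec!["wft-1", "wft-2"]);
}
```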

crates/sdk-core/tests/integ_tests/worker_tests.rs

Lines changed: 1 addition & 158 deletions
@@ -980,163 +980,6 @@ async fn shutdown_worker_not_retried() {
     assert_eq!(shutdown_call_count.load(Ordering::Relaxed), 1);
 }
 
-/// Reproduces the server-side race in temporalio/temporal#9545 where a poll completes naturally
-/// (e.g. long-poll timeout) right as shutdown begins, causing the poller to re-poll AFTER the
-/// ShutdownWorker RPC has already been processed. The server never flushes this second poll,
-/// so without the 5s TEMP_FIX timeout the worker would hang for 60s.
-///
-/// Sequence:
-/// 1. Worker starts polling with graceful_poll_shutdown enabled
-/// 2. initiate_shutdown() fires → shutdown token cancelled, ShutdownWorker RPC spawned
-/// 3. First poll returns empty (natural timeout) — but this races with ShutdownWorker
-/// 4. Poller re-polls (graceful mode: no select! against shutdown token)
-/// 5. ShutdownWorker RPC completes — server flushes nothing (first poll already returned)
-/// 6. Second poll hangs forever — server doesn't know about it
-/// 7. TEMP_FIX: after 5s the graceful_interruptor cancels the hanging poll
-///
-/// This test verifies shutdown completes within 10s (not 60s), proving the temp fix works.
-/// Remove this test when temporalio/temporal#9545 is fully deployed.
-#[tokio::test]
-async fn graceful_shutdown_race_temp_fix_prevents_60s_hang() {
-    use prost::Message;
-    use std::sync::atomic::AtomicUsize;
-    use temporalio_common::protos::temporal::api::{
-        namespace::v1::{NamespaceInfo, namespace_info::Capabilities},
-        workflowservice::v1::DescribeNamespaceResponse,
-    };
-    use tokio::sync::Notify;
-
-    fn grpc_ok_empty() -> tonic::codegen::http::Response<tonic::body::Body> {
-        tonic::codegen::http::Response::builder()
-            .header("content-type", "application/grpc")
-            .header("grpc-status", "0")
-            .body(tonic::body::Body::empty())
-            .unwrap()
-    }
-    fn grpc_ok_proto(msg: &impl Message) -> tonic::codegen::http::Response<tonic::body::Body> {
-        let encoded = msg.encode_to_vec();
-        let mut buf = Vec::with_capacity(5 + encoded.len());
-        buf.push(0);
-        buf.extend_from_slice(&(encoded.len() as u32).to_be_bytes());
-        buf.extend_from_slice(&encoded);
-        tonic::codegen::http::Response::builder()
-            .header("content-type", "application/grpc")
-            .header("grpc-status", "0")
-            .body(tonic::body::Body::new(http_body_util::Full::new(
-                bytes::Bytes::from(buf),
-            )))
-            .unwrap()
-    }
-
-    // Track poll count to distinguish first poll (returns empty) from re-polls (hang forever)
-    let poll_count = Arc::new(AtomicUsize::new(0));
-    let poll_count_clone = poll_count.clone();
-    // Signal from initiate_shutdown (via shutdown_worker RPC) to release the first poll
-    let shutdown_signal = Arc::new(Notify::new());
-    let shutdown_signal_for_rpc = shutdown_signal.clone();
-    let shutdown_signal_for_poll = shutdown_signal.clone();
-
-    let fs = fake_server(move |req| {
-        let uri = req.uri().to_string();
-        let poll_count = poll_count_clone.clone();
-        let shutdown_signal_for_poll = shutdown_signal_for_poll.clone();
-        let shutdown_signal_for_rpc = shutdown_signal_for_rpc.clone();
-
-        if uri.contains("DescribeNamespace") {
-            let resp = DescribeNamespaceResponse {
-                namespace_info: Some(NamespaceInfo {
-                    capabilities: Some(Capabilities {
-                        worker_poll_complete_on_shutdown: true,
-                        ..Capabilities::default()
-                    }),
-                    ..NamespaceInfo::default()
-                }),
-                ..DescribeNamespaceResponse::default()
-            };
-            async move { grpc_ok_proto(&resp) }.boxed()
-        } else if uri.contains("Poll") {
-            async move {
-                let n = poll_count.fetch_add(1, Ordering::SeqCst);
-                if n == 0 {
-                    // First poll: wait for shutdown to start, then return empty.
-                    // This simulates the poll timing out naturally right as shutdown begins.
-                    shutdown_signal_for_poll.notified().await;
-                    grpc_ok_empty()
-                } else {
-                    // Re-poll after shutdown: hang forever. This is the race —
-                    // the server already processed ShutdownWorker and won't flush this poll.
-                    futures_util::future::pending().await
-                }
-            }
-            .boxed()
-        } else if uri.contains("ShutdownWorker") {
-            // ShutdownWorker arrives — signal the first poll to return (simulating the race
-            // where poll returns right as/after ShutdownWorker is processed).
-            async move {
-                shutdown_signal_for_rpc.notify_waiters();
-                grpc_ok_empty()
-            }
-            .boxed()
-        } else {
-            async { grpc_ok_empty() }.boxed()
-        }
-    })
-    .await;
-
-    let mut opts = get_integ_server_options();
-    opts.target = format!("http://localhost:{}", fs.addr.port())
-        .parse::<url::Url>()
-        .unwrap();
-    opts.set_skip_get_system_info(true);
-    let connection = Connection::connect(opts).await.unwrap();
-    let client_opts = temporalio_client::ClientOptions::new("ns").build();
-    let client = temporalio_client::Client::new(connection, client_opts).unwrap();
-
-    let wf_type = "graceful_shutdown_race";
-    let mut starter = CoreWfStarter::new_with_overrides(wf_type, None, Some(client));
-    let worker = starter.get_worker().await;
-
-    // Enable graceful poll shutdown via validate()
-    worker.validate().await.unwrap();
-
-    // Start polling BEFORE initiating shutdown so the poll is in-flight.
-    // poll_workflow_activation triggers LongPollBuffer to start, which spawns a poll task
-    // that hits the fake server and blocks on the first poll (waiting for shutdown_signal).
-    let poll_handle = tokio::spawn({
-        let w = worker.clone();
-        async move {
-            // This will block until shutdown causes PollError::ShutDown
-            let _ = w.poll_workflow_activation().await;
-        }
-    });
-    let act_handle = tokio::spawn({
-        let w = worker.clone();
-        async move {
-            let _ = w.poll_activity_task().await;
-        }
-    });
-
-    // Give polls time to reach the fake server before initiating shutdown
-    tokio::time::sleep(Duration::from_millis(500)).await;
-
-    // Shutdown should complete within 10s (5s TEMP_FIX + margin).
-    // Without the temp fix, the re-poll hangs for 60s.
-    let result = tokio::time::timeout(Duration::from_secs(10), async {
-        worker.shutdown().await;
-    })
-    .await;
-
-    assert!(
-        result.is_ok(),
-        "Shutdown should complete within 10s. If it hangs, the TEMP_FIX graceful poll \
-         timeout is not working and the server race (temporal#9545) caused a 60s hang."
-    );
-
-    let _ = poll_handle.await;
-    let _ = act_handle.await;
-    fs.shutdown().await;
-}
-
 #[test]
 fn test_default_build_id() {
     let o = WorkerOptions::new("task_queue").build();
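The deleted `grpc_ok_proto` helper hand-rolls gRPC's length-prefixed message framing: one compression-flag byte (0 for uncompressed), a 4-byte big-endian payload length, then the encoded protobuf. A dependency-free round-trip sketch of that framing; the encoder mirrors the helper, while the decoder is added here purely for illustration:

```rust
// Encode a payload in gRPC's Length-Prefixed-Message format, as
// grpc_ok_proto did: [flag: u8][len: u32 be][payload].
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(5 + payload.len());
    buf.push(0); // 0 = uncompressed
    buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

// Decode the same framing back into (compression flag, payload).
fn decode_frame(frame: &[u8]) -> Option<(u8, &[u8])> {
    let header = frame.get(..5)?;
    let len = u32::from_be_bytes(header[1..5].try_into().ok()?) as usize;
    Some((header[0], frame[5..].get(..len)?))
}

fn main() {
    let frame = encode_frame(b"hello");
    let (flag, payload) = decode_frame(&frame).unwrap();
    assert_eq!((flag, payload), (0, b"hello".as_slice()));
}
```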
@@ -1153,7 +996,7 @@ impl ShutdownTimerActivityLoopWf {
     #[run]
     async fn run(ctx: &mut WorkflowContext<Self>) -> WorkflowResult<()> {
         loop {
-            ctx.timer(Duration::from_millis(100)).await;
+            ctx.timer(Duration::from_millis(10)).await;
             ctx.start_activity(
                 StdActivities::no_op,
                 (),
