Skip to content

Commit 2c88067

Browse files
committed
Rework guard to detect when dropped without notifying. Add to log collection to tests to ensure that the message fires.
1 parent 1586cf7 commit 2c88067

2 files changed

Lines changed: 36 additions & 11 deletions

File tree

crates/sdk-core/src/core_tests/workers.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ use temporalio_common::{
6262
},
6363
},
6464
},
65+
telemetry::{CoreTelemetry, Logger, TelemetryOptions, telemetry_init},
6566
worker::WorkerTaskTypes,
6667
};
6768
use tokio::sync::{Barrier, Notify, watch};
@@ -1334,6 +1335,16 @@ async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
13341335
/// is able to trigger PollShutdown.
13351336
#[tokio::test]
13361337
async fn nexus_shutdown_does_not_hang_when_pending_completion_is_cancelled() {
1338+
let telemetry = telemetry_init(
1339+
TelemetryOptions::builder()
1340+
.logging(Logger::Forward {
1341+
filter: "OFF,temporalio_sdk_core::worker::nexus=ERROR".to_string(),
1342+
})
1343+
.build(),
1344+
)
1345+
.unwrap();
1346+
let _guard = tracing::subscriber::set_default(telemetry.trace_subscriber().unwrap().clone());
1347+
13371348
let mut client = mock_manual_worker_client();
13381349
let completion_rpc_started = Arc::new(Barrier::new(2));
13391350
let completion_rpc_started_clone = completion_rpc_started.clone();
@@ -1398,6 +1409,18 @@ async fn nexus_shutdown_does_not_hang_when_pending_completion_is_cancelled() {
13981409
}
13991410
drop(completion);
14001411

1412+
let logs = telemetry.fetch_buffered_logs();
1413+
assert!(
1414+
logs.iter().any(|log| {
1415+
log.level == tracing::Level::ERROR
1416+
&& log.target == "temporalio_sdk_core::worker::nexus"
1417+
&& log
1418+
.message
1419+
.starts_with("TaskCompletedGuard triggered notify on drop")
1420+
}),
1421+
"expected TaskCompletedGuard error log, got {logs:#?}"
1422+
);
1423+
14011424
// Polling again should now return PollError::ShutDown because the outstanding task map is empty
14021425
// and waiters should have been notified.
14031426
let poll_result = tokio::time::timeout(Duration::from_secs(1), poll_future.as_mut())

crates/sdk-core/src/worker/nexus.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ impl NexusManager {
120120
) -> Result<(), CompleteNexusError> {
121121
let removed = self.outstanding_task_map.lock().remove(&tt);
122122
if let Some(task_info) = removed {
123-
let task_completed_notify =
124-
TaskCompletedNotify::new(self.task_completed_notify.clone());
123+
let task_completed_notify = TaskCompletedGuard::new(self.task_completed_notify.clone());
125124
self.metrics
126125
.nexus_task_execution_latency(task_info.start_time.elapsed());
127126
task_info.timeout_task.inspect(|jh| jh.abort());
@@ -331,31 +330,34 @@ impl NexusManager {
331330
/// TaskCompleteNotify is used to ensure that waiters are notified when a task
332331
/// is removed from the outstanding task map even in the event that the running
333332
/// future is dropped.
334-
struct TaskCompletedNotify {
333+
struct TaskCompletedGuard {
335334
inner: Option<Arc<Notify>>,
336335
}
337336

338-
impl TaskCompletedNotify {
337+
impl TaskCompletedGuard {
339338
fn new(notify: Arc<Notify>) -> Self {
340339
Self {
341340
inner: Some(notify),
342341
}
343342
}
344343

345-
fn notify_inner(&mut self) {
344+
fn notify_waiters(mut self) {
346345
if let Some(notify) = self.inner.take() {
347346
notify.notify_waiters();
348347
}
349348
}
350-
351-
fn notify_waiters(mut self) {
352-
self.notify_inner();
353-
}
354349
}
355350

356-
impl Drop for TaskCompletedNotify {
351+
impl Drop for TaskCompletedGuard {
357352
fn drop(&mut self) {
358-
self.notify_inner();
353+
if let Some(notify) = self.inner.take() {
354+
error!(
355+
"TaskCompletedGuard triggered notify on drop. This indicates that the caller has \
356+
dropped the future for `complete_task`. \
357+
This should not happen. Please file a bug report."
358+
);
359+
notify.notify_waiters();
360+
}
359361
}
360362
}
361363

0 commit comments

Comments
 (0)