// Patch summary (leading text before the first `diff --git` header; not part of any hunk).
//
// * managed_run.rs: the workflow-task latency metric is now recorded whenever an
//   outstanding WFT was taken AND the report status carries a completion time,
//   rather than only inside the `WFTReportStatus::Reported` branch.
// * workflow/mod.rs: `WFTReportStatus::DropWft` now carries `completion_time: Instant`
//   (filled in for `ActivationCompleteOutcome::WFTFailedDontReport`), and a new
//   `WFTReportStatus::completion_time()` accessor returns `Some` for `Reported` and
//   `DropWft`, `None` for `NotReported`. Its return type is `Option<Instant>`.
// * metrics_tests.rs: adds `wf_task_latency_recorded_on_dropped_wft`, which expects the
//   latency histogram count to be 2 after one failed activation and one more that the
//   test expects to take the `WFTFailedDontReport`/`DropWft` path.
//
// NOTE(review): the hunks below appear to have had their newlines collapsed; the
// original line structure presumably needs restoring before this can be applied.
diff --git a/crates/sdk-core/src/worker/workflow/managed_run.rs b/crates/sdk-core/src/worker/workflow/managed_run.rs index d42dcd693..fb7939168 100644 --- a/crates/sdk-core/src/worker/workflow/managed_run.rs +++ b/crates/sdk-core/src/worker/workflow/managed_run.rs @@ -296,16 +296,17 @@ impl ManagedRun { debug!("Marking WFT completed"); let retme = self.wft.take(); - // Only record latency metrics if we genuinely reported to server + if let Some(ot) = &retme + && let Some(ct) = report_status.completion_time() + { + self.metrics.wf_task_latency(ct.sub(ot.start_time)); + } + if let WFTReportStatus::Reported { reset_last_started_to, - completion_time, + .. } = report_status { - if let Some(ot) = &retme { - self.metrics - .wf_task_latency(completion_time.sub(ot.start_time)); - } if let Some(id) = reset_last_started_to { self.wfm.machines.reset_last_started_id(id); } diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index 4c3ab10dc..3d8f36153 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -528,7 +528,9 @@ impl Workflows { self.handle_activation_failed(run_id, completion_time, outcome) .await } - ActivationCompleteOutcome::WFTFailedDontReport => WFTReportStatus::DropWft, + ActivationCompleteOutcome::WFTFailedDontReport => { + WFTReportStatus::DropWft { completion_time } + } ActivationCompleteOutcome::DoNothing => WFTReportStatus::NotReported, } } @@ -1197,8 +1199,21 @@ enum WFTReportStatus { /// work to be done. EX: Running LAs. NotReported, /// We didn't report, but we want to clear the outstanding workflow task anyway. See - /// [ActivationCompleteOutcome::WFTFailedDontReport] - DropWft, + /// [ActivationCompleteOutcome::WFTFailedDontReport]. + DropWft { completion_time: Instant }, +} +impl WFTReportStatus { + fn completion_time(&self) -> Option<Instant> { + match self { + WFTReportStatus::Reported { + completion_time, .. 
+ } + | WFTReportStatus::DropWft { + completion_time, .. + } => Some(*completion_time), + WFTReportStatus::NotReported => None, + } + } } #[derive(Debug, Default)] struct BufferedTasks { diff --git a/crates/sdk-core/tests/integ_tests/metrics_tests.rs b/crates/sdk-core/tests/integ_tests/metrics_tests.rs index b02e601dc..139bff508 100644 --- a/crates/sdk-core/tests/integ_tests/metrics_tests.rs +++ b/crates/sdk-core/tests/integ_tests/metrics_tests.rs @@ -1680,3 +1680,77 @@ async fn terminal_metric_not_recorded_on_rejected_completion() { ), } } + +#[tokio::test] +async fn wf_task_latency_recorded_on_dropped_wft() { + let (telemopts, addr, _aborter) = prom_metrics(None); + let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); + let meter = rt.telemetry().get_temporal_metric_meter().unwrap(); + + let mut t = TestHistoryBuilder::default(); + t.add_by_type( + temporalio_common::protos::temporal::api::enums::v1::EventType::WorkflowExecutionStarted, + ); + t.add_workflow_task_scheduled_and_started(); + + let mut mh = MockPollCfg::from_resp_batches( + "fake_wf_id", + t, + [ResponseType::AllHistory, ResponseType::AllHistory], + mock_worker_client(), + ); + mh.num_expected_fails = 1; + mh.num_expected_completions = Some(0.into()); + + let mut mock = build_mock_pollers(mh); + mock.worker_cfg(|wc| wc.max_cached_workflows = 1); + mock.set_temporal_meter(meter); + let core = mock_worker(mock); + + // First poll (attempt=1): fail the activation — this is reported to the server + let act = core.poll_workflow_activation().await.unwrap(); + core.complete_workflow_activation(WorkflowActivationCompletion::fail( + act.run_id, + "test failure".into(), + None, + )) + .await + .unwrap(); + core.handle_eviction().await; + + // Second poll (attempt=2): fail again — this goes through WFTFailedDontReport/DropWft + let act = core.poll_workflow_activation().await.unwrap(); + core.complete_workflow_activation(WorkflowActivationCompletion::fail( + act.run_id, + "test 
failure".into(), + None, + )) + .await + .unwrap(); + + core.drain_pollers_and_shutdown().await; + + // Both WFT processing attempts should have recorded latency, even the dropped one + eventually( + || { + let endpoint = format!("http://{addr}/metrics"); + async move { + let body = get_text(endpoint).await; + let line = body + .lines() + .find(|l| l.starts_with("temporal_workflow_task_execution_latency_count{")) + .ok_or_else(|| anyhow!("wf_task_execution_latency metric not found"))? + .to_string(); + if !line.ends_with(" 2") { + bail!( + "Expected latency count of 2 (both reported and dropped WFT), got: {line}" + ); + } + Ok(()) + } + }, + Duration::from_secs(5), + ) + .await + .unwrap(); +}