Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions crates/sdk-core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,17 @@ impl ManagedRun {
debug!("Marking WFT completed");
let retme = self.wft.take();

// Only record latency metrics if we genuinely reported to server
if let Some(ot) = &retme
&& let Some(ct) = report_status.completion_time()
{
self.metrics.wf_task_latency(ct.sub(ot.start_time));
}

if let WFTReportStatus::Reported {
reset_last_started_to,
completion_time,
..
} = report_status
{
if let Some(ot) = &retme {
self.metrics
.wf_task_latency(completion_time.sub(ot.start_time));
}
if let Some(id) = reset_last_started_to {
self.wfm.machines.reset_last_started_id(id);
}
Expand Down
21 changes: 18 additions & 3 deletions crates/sdk-core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,9 @@ impl Workflows {
self.handle_activation_failed(run_id, completion_time, outcome)
.await
}
ActivationCompleteOutcome::WFTFailedDontReport => WFTReportStatus::DropWft,
ActivationCompleteOutcome::WFTFailedDontReport => {
WFTReportStatus::DropWft { completion_time }
}
ActivationCompleteOutcome::DoNothing => WFTReportStatus::NotReported,
}
}
Expand Down Expand Up @@ -1197,8 +1199,21 @@ enum WFTReportStatus {
/// work to be done. EX: Running LAs.
NotReported,
/// We didn't report, but we want to clear the outstanding workflow task anyway. See
/// [ActivationCompleteOutcome::WFTFailedDontReport]
DropWft,
/// [ActivationCompleteOutcome::WFTFailedDontReport].
DropWft { completion_time: Instant },
}
impl WFTReportStatus {
fn completion_time(&self) -> Option<Instant> {
match self {
WFTReportStatus::Reported {
completion_time, ..
}
| WFTReportStatus::DropWft {
completion_time, ..
} => Some(*completion_time),
WFTReportStatus::NotReported => None,
}
}
}
#[derive(Debug, Default)]
struct BufferedTasks {
Expand Down
74 changes: 74 additions & 0 deletions crates/sdk-core/tests/integ_tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1680,3 +1680,77 @@ async fn terminal_metric_not_recorded_on_rejected_completion() {
),
}
}

/// Fails the same workflow task twice and checks that the workflow task execution latency
/// histogram ends up with a count of 2, i.e. latency is recorded for the dropped task as well as
/// the reported one.
#[tokio::test]
async fn wf_task_latency_recorded_on_dropped_wft() {
    use temporalio_common::protos::temporal::api::enums::v1::EventType;

    // Metrics are exposed over a Prometheus endpoint at `addr`; `_aborter` is held until the end
    // of the test.
    let (telemopts, addr, _aborter) = prom_metrics(None);
    let runtime_opts = get_integ_runtime_options(telemopts);
    let rt = CoreRuntime::new_assume_tokio(runtime_opts).unwrap();
    let meter = rt.telemetry().get_temporal_metric_meter().unwrap();

    // Minimal history: workflow started, then a single workflow task scheduled and started.
    let mut history = TestHistoryBuilder::default();
    history.add_by_type(EventType::WorkflowExecutionStarted);
    history.add_workflow_task_scheduled_and_started();

    // The same full history is served for both polls.
    let mut poll_cfg = MockPollCfg::from_resp_batches(
        "fake_wf_id",
        history,
        [ResponseType::AllHistory, ResponseType::AllHistory],
        mock_worker_client(),
    );
    // Exactly one failure is expected to be sent through the mock client, and no completions.
    poll_cfg.num_expected_fails = 1;
    poll_cfg.num_expected_completions = Some(0.into());

    let mut mock = build_mock_pollers(poll_cfg);
    mock.worker_cfg(|cfg| {
        cfg.max_cached_workflows = 1;
    });
    mock.set_temporal_meter(meter);
    let core = mock_worker(mock);

    // Attempt 1: the activation failure is the one expected to be reported.
    let first_activation = core.poll_workflow_activation().await.unwrap();
    let first_failure =
        WorkflowActivationCompletion::fail(first_activation.run_id, "test failure".into(), None);
    core.complete_workflow_activation(first_failure)
        .await
        .unwrap();
    core.handle_eviction().await;

    // Attempt 2: failing again is expected to take the WFTFailedDontReport/DropWft path, since
    // only one reported failure is allowed above.
    let second_activation = core.poll_workflow_activation().await.unwrap();
    let second_failure =
        WorkflowActivationCompletion::fail(second_activation.run_id, "test failure".into(), None);
    core.complete_workflow_activation(second_failure)
        .await
        .unwrap();

    core.drain_pollers_and_shutdown().await;

    // Check the scraped metrics until the latency histogram count reads 2, or the timeout passed
    // to `eventually` expires.
    eventually(
        || {
            let endpoint = format!("http://{addr}/metrics");
            async move {
                let body = get_text(endpoint).await;
                let Some(line) = body
                    .lines()
                    .find(|l| l.starts_with("temporal_workflow_task_execution_latency_count{"))
                else {
                    bail!("wf_task_execution_latency metric not found")
                };
                if line.ends_with(" 2") {
                    Ok(())
                } else {
                    bail!(
                        "Expected latency count of 2 (both reported and dropped WFT), got: {line}"
                    )
                }
            }
        },
        Duration::from_secs(5),
    )
    .await
    .unwrap();
}
Loading