Skip to content

Commit 24703da

Browse files
authored
Add standalone activity run id to bridge (#1189)
1 parent 5e6b836 commit 24703da

5 files changed

Lines changed: 54 additions & 1 deletion

File tree

crates/common/protos/local/temporal/sdk/core/activity_task/activity_task.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ message Start {
6363
// Set to true if this is a local activity. Note that heartbeating does not apply to local
6464
// activities.
6565
bool is_local = 17;
66+
// Run ID of this activity execution. Only set for standalone activities.
67+
string run_id = 19;
6668
}
6769

6870
// Attempt to cancel a running activity

crates/common/src/protos/mod.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,7 @@ pub mod coresdk {
13381338
retry_policy: r.retry_policy.map(fix_retry_policy),
13391339
priority: r.priority,
13401340
is_local: false,
1341+
run_id: r.activity_run_id,
13411342
},
13421343
)),
13431344
}
@@ -2931,9 +2932,52 @@ pub fn proto_ts_to_system_time(ts: &prost_types::Timestamp) -> Option<std::time:
29312932

29322933
#[cfg(test)]
29332934
mod tests {
2935+
use crate::protos::coresdk::activity_task;
2936+
use crate::protos::coresdk::activity_task::ActivityTask;
29342937
use crate::protos::temporal::api::failure::v1::Failure;
2938+
use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse;
29352939
use anyhow::anyhow;
29362940

2941+
#[test]
2942+
fn start_from_poll_resp_standalone_activity_populates_run_id() {
2943+
let resp = PollActivityTaskQueueResponse {
2944+
task_token: vec![1, 2, 3],
2945+
activity_run_id: "test-run-id-123".to_string(),
2946+
activity_id: "my-activity".to_string(),
2947+
..Default::default()
2948+
};
2949+
let task = ActivityTask::start_from_poll_resp(resp);
2950+
let start = match task.variant {
2951+
Some(activity_task::activity_task::Variant::Start(s)) => s,
2952+
_ => panic!("expected Start variant"),
2953+
};
2954+
assert_eq!(start.run_id, "test-run-id-123");
2955+
assert!(!start.is_local);
2956+
}
2957+
2958+
#[test]
2959+
fn start_from_poll_resp_workflow_activity_has_empty_run_id() {
2960+
use crate::protos::temporal::api::common::v1::WorkflowExecution;
2961+
let resp = PollActivityTaskQueueResponse {
2962+
task_token: vec![4, 5, 6],
2963+
activity_id: "my-workflow-activity".to_string(),
2964+
workflow_execution: Some(WorkflowExecution {
2965+
workflow_id: "wf-123".to_string(),
2966+
run_id: "wf-run-456".to_string(),
2967+
}),
2968+
// activity_run_id intentionally absent — this is a workflow-scheduled activity
2969+
..Default::default()
2970+
};
2971+
let task = ActivityTask::start_from_poll_resp(resp);
2972+
let start = match task.variant {
2973+
Some(activity_task::activity_task::Variant::Start(s)) => s,
2974+
_ => panic!("expected Start variant"),
2975+
};
2976+
assert!(start.run_id.is_empty());
2977+
// workflow_execution is preserved and distinct from run_id
2978+
assert_eq!(start.workflow_execution.unwrap().run_id, "wf-run-456");
2979+
}
2980+
29372981
#[test]
29382982
fn anyhow_to_failure_conversion() {
29392983
let no_causes: Failure = anyhow!("no causes").into();

crates/sdk-core/src/worker/activities/local_activities.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ impl LocalActivityManager {
527527
retry_policy: Some(sa.retry_policy.into()),
528528
priority: Some(Default::default()),
529529
is_local: true,
530+
run_id: String::new(),
530531
})),
531532
}))
532533
}

crates/sdk-core/tests/integ_tests/workflow_tests/activities.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ async fn activity_workflow() {
179179
assert_matches!(
180180
task.variant,
181181
Some(act_task::Variant::Start(start_activity)) => {
182-
assert_eq!(start_activity.activity_type, DEFAULT_ACTIVITY_TYPE.to_string())
182+
assert_eq!(start_activity.activity_type, DEFAULT_ACTIVITY_TYPE.to_string());
183+
// Workflow-scheduled activities have no activity run ID
184+
assert!(start_activity.run_id.is_empty());
183185
}
184186
);
185187
let response_payload = Payload {

crates/sdk/src/activities.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ impl ActivityContext {
108108
retry_policy,
109109
is_local,
110110
priority,
111+
run_id,
111112
} = task;
112113
let deadline = calculate_deadline(
113114
scheduled_time.as_ref(),
@@ -140,6 +141,7 @@ impl ActivityContext {
140141
retry_policy,
141142
is_local,
142143
priority: priority.map(Into::into).unwrap_or_default(),
144+
run_id: (!run_id.is_empty()).then_some(run_id),
143145
},
144146
},
145147
input,
@@ -220,6 +222,8 @@ pub struct ActivityInfo {
220222
pub is_local: bool,
221223
/// Priority of this activity. If unset uses [Priority::default].
222224
pub priority: Priority,
225+
/// Run ID of this activity execution. Only set for standalone activities.
226+
pub run_id: Option<String>,
223227
}
224228

225229
/// Returned as errors from activity functions.

0 commit comments

Comments
 (0)