Skip to content

Commit 95db75d

Browse files
authored
Attach priority to core tasks (#893)
1 parent 97093c5 commit 95db75d

8 files changed

Lines changed: 36 additions & 5 deletions

File tree

client/src/lib.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1112,7 +1112,7 @@ pub struct WorkflowOptions {
11121112
/// The overall semantics of Priority are:
11131113
/// (more will be added here later)
11141114
/// 1. First, consider "priority_key": lower number goes first.
1115-
#[derive(Debug, Clone, Default)]
1115+
#[derive(Debug, Clone, Default, PartialEq, Eq)]
11161116
pub struct Priority {
11171117
/// Priority key is a positive integer from 1 to n, where smaller integers
11181118
/// correspond to higher priorities (tasks run sooner). In general, tasks in
@@ -1135,6 +1135,14 @@ impl From<Priority> for common::v1::Priority {
11351135
}
11361136
}
11371137

1138+
impl From<common::v1::Priority> for Priority {
1139+
fn from(priority: common::v1::Priority) -> Self {
1140+
Self {
1141+
priority_key: priority.priority_key as u32,
1142+
}
1143+
}
1144+
}
1145+
11381146
#[async_trait::async_trait]
11391147
impl<T> WorkflowClientTrait for T
11401148
where

core/src/worker/activities/local_activities.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ impl LocalActivityManager {
525525
.and_then(|t| t.try_into().ok()),
526526
heartbeat_timeout: None,
527527
retry_policy: Some(sa.retry_policy),
528+
priority: Some(Default::default()),
528529
is_local: true,
529530
})),
530531
}))

sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ message Start {
5757
// (or not) during activity scheduling as the service can override the provided one in case its
5858
// values are not specified or exceed configured system limits.
5959
temporal.api.common.v1.RetryPolicy retry_policy = 16;
60+
// Priority of this activity. Local activities will always have this field set to the default.
61+
temporal.api.common.v1.Priority priority = 18;
6062

6163
// Set to true if this is a local activity. Note that heartbeating does not apply to local
6264
// activities.

sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ message InitializeWorkflow {
195195
//
196196
// See field in WorkflowExecutionStarted for more detail.
197197
temporal.api.common.v1.WorkflowExecution root_workflow = 24;
198+
// Priority of this workflow execution
199+
temporal.api.common.v1.Priority priority = 25;
198200
}
199201

200202
// Notify a workflow that a timer has fired

sdk-core-protos/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -726,6 +726,7 @@ pub mod coresdk {
726726
search_attributes: attrs.search_attributes,
727727
start_time: Some(start_time),
728728
root_workflow: attrs.root_workflow_execution,
729+
priority: attrs.priority,
729730
}
730731
}
731732
}
@@ -1265,6 +1266,7 @@ pub mod coresdk {
12651266
start_to_close_timeout: r.start_to_close_timeout,
12661267
heartbeat_timeout: r.heartbeat_timeout,
12671268
retry_policy: r.retry_policy,
1269+
priority: r.priority,
12681270
is_local: false,
12691271
},
12701272
)),

sdk/src/activity_context.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
sync::Arc,
77
time::{Duration as StdDuration, SystemTime},
88
};
9+
use temporal_client::Priority;
910
use temporal_sdk_core_api::Worker;
1011
use temporal_sdk_core_protos::{
1112
coresdk::{ActivityHeartbeat, activity_task},
@@ -48,6 +49,8 @@ pub struct ActivityInfo {
4849
pub current_attempt_scheduled_time: Option<SystemTime>,
4950
pub retry_policy: Option<RetryPolicy>,
5051
pub is_local: bool,
52+
/// Priority of this activity. If unset uses [Priority::default]
53+
pub priority: Priority,
5154
}
5255

5356
impl ActContext {
@@ -79,6 +82,7 @@ impl ActContext {
7982
heartbeat_timeout,
8083
retry_policy,
8184
is_local,
85+
priority,
8286
} = task;
8387
let deadline = calculate_deadline(
8488
scheduled_time.as_ref(),
@@ -113,6 +117,7 @@ impl ActContext {
113117
.try_into_or_none(),
114118
retry_policy,
115119
is_local,
120+
priority: priority.map(Into::into).unwrap_or_default(),
116121
},
117122
},
118123
first_arg,

sdk/src/workflow_context/options.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ pub struct ActivityOptions {
7070
pub summary: Option<String>,
7171
/// Priority for the activity
7272
pub priority: Option<Priority>,
73+
/// If true, disable eager execution for this activity
74+
pub do_not_eagerly_execute: bool,
7375
}
7476

7577
impl IntoWorkflowCommand for ActivityOptions {
@@ -98,6 +100,7 @@ impl IntoWorkflowCommand for ActivityOptions {
98100
arguments: vec![self.input],
99101
retry_policy: self.retry_policy,
100102
priority: self.priority.map(Into::into),
103+
do_not_eagerly_execute: self.do_not_eagerly_execute,
101104
..Default::default()
102105
}
103106
.into(),

tests/integ_tests/workflow_tests/priority.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use std::time::Duration;
22
use temporal_client::{Priority, WorkflowClientTrait, WorkflowOptions};
33
use temporal_sdk::{ActContext, ActivityOptions, ChildWorkflowOptions, WfContext};
44
use temporal_sdk_core_protos::{
5-
coresdk::AsJsonPayloadExt, temporal::api::history::v1::history_event::Attributes,
5+
coresdk::AsJsonPayloadExt,
6+
temporal::api::{common, history::v1::history_event::Attributes},
67
};
78
use temporal_sdk_core_test_utils::CoreWfStarter;
89

@@ -34,16 +35,23 @@ async fn priority_values_sent_to_server() {
3435
input: "hello".as_json_payload().unwrap(),
3536
start_to_close_timeout: Some(Duration::from_secs(5)),
3637
priority: Some(Priority { priority_key: 5 }),
38+
// Currently no priority info attached to eagerly run activities
39+
do_not_eagerly_execute: true,
3740
..Default::default()
3841
});
3942
started.result().await;
40-
activity.await;
43+
activity.await.unwrap_ok_payload();
4144
Ok(().into())
4245
});
43-
worker.register_wf(child_type.to_owned(), |_ctx: WfContext| async move {
46+
worker.register_wf(child_type.to_owned(), |ctx: WfContext| async move {
47+
assert_eq!(
48+
ctx.workflow_initial_info().priority,
49+
Some(common::v1::Priority { priority_key: 4 })
50+
);
4451
Ok(().into())
4552
});
46-
worker.register_activity("echo", |_ctx: ActContext, echo_me: String| async move {
53+
worker.register_activity("echo", |ctx: ActContext, echo_me: String| async move {
54+
assert_eq!(ctx.get_info().priority, Priority { priority_key: 5 });
4755
Ok(echo_me)
4856
});
4957

0 commit comments

Comments
 (0)