Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ pub struct WorkflowOptions {
/// The overall semantics of Priority are:
/// (more will be added here later)
/// 1. First, consider "priority_key": lower number goes first.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Priority {
/// Priority key is a positive integer from 1 to n, where smaller integers
/// correspond to higher priorities (tasks run sooner). In general, tasks in
Expand All @@ -1135,6 +1135,14 @@ impl From<Priority> for common::v1::Priority {
}
}

impl From<common::v1::Priority> for Priority {
fn from(priority: common::v1::Priority) -> Self {
Self {
priority_key: priority.priority_key as u32,
}
}
}

#[async_trait::async_trait]
impl<T> WorkflowClientTrait for T
where
Expand Down
1 change: 1 addition & 0 deletions core/src/worker/activities/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ impl LocalActivityManager {
.and_then(|t| t.try_into().ok()),
heartbeat_timeout: None,
retry_policy: Some(sa.retry_policy),
priority: Some(Default::default()),
is_local: true,
})),
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ message Start {
// (or not) during activity scheduling as the service can override the provided one in case its
// values are not specified or exceed configured system limits.
temporal.api.common.v1.RetryPolicy retry_policy = 16;
// Priority of this activity. Local activities will always have this field set to the default.
temporal.api.common.v1.Priority priority = 18;

// Set to true if this is a local activity. Note that heartbeating does not apply to local
// activities.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ message InitializeWorkflow {
//
// See field in WorkflowExecutionStarted for more detail.
temporal.api.common.v1.WorkflowExecution root_workflow = 24;
// Priority of this workflow execution
temporal.api.common.v1.Priority priority = 25;
}

// Notify a workflow that a timer has fired
Expand Down
2 changes: 2 additions & 0 deletions sdk-core-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ pub mod coresdk {
search_attributes: attrs.search_attributes,
start_time: Some(start_time),
root_workflow: attrs.root_workflow_execution,
priority: attrs.priority,
}
}
}
Expand Down Expand Up @@ -1265,6 +1266,7 @@ pub mod coresdk {
start_to_close_timeout: r.start_to_close_timeout,
heartbeat_timeout: r.heartbeat_timeout,
retry_policy: r.retry_policy,
priority: r.priority,
is_local: false,
},
)),
Expand Down
5 changes: 5 additions & 0 deletions sdk/src/activity_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
sync::Arc,
time::{Duration as StdDuration, SystemTime},
};
use temporal_client::Priority;
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::{
coresdk::{ActivityHeartbeat, activity_task},
Expand Down Expand Up @@ -48,6 +49,8 @@ pub struct ActivityInfo {
pub current_attempt_scheduled_time: Option<SystemTime>,
pub retry_policy: Option<RetryPolicy>,
pub is_local: bool,
/// Priority of this activity. If unset uses [Priority::default]
pub priority: Priority,
}

impl ActContext {
Expand Down Expand Up @@ -79,6 +82,7 @@ impl ActContext {
heartbeat_timeout,
retry_policy,
is_local,
priority,
} = task;
let deadline = calculate_deadline(
scheduled_time.as_ref(),
Expand Down Expand Up @@ -113,6 +117,7 @@ impl ActContext {
.try_into_or_none(),
retry_policy,
is_local,
priority: priority.map(Into::into).unwrap_or_default(),
},
},
first_arg,
Expand Down
3 changes: 3 additions & 0 deletions sdk/src/workflow_context/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub struct ActivityOptions {
pub summary: Option<String>,
/// Priority for the activity
pub priority: Option<Priority>,
/// If true, disable eager execution for this activity
pub do_not_eagerly_execute: bool,
}

impl IntoWorkflowCommand for ActivityOptions {
Expand Down Expand Up @@ -98,6 +100,7 @@ impl IntoWorkflowCommand for ActivityOptions {
arguments: vec![self.input],
retry_policy: self.retry_policy,
priority: self.priority.map(Into::into),
do_not_eagerly_execute: self.do_not_eagerly_execute,
..Default::default()
}
.into(),
Expand Down
16 changes: 12 additions & 4 deletions tests/integ_tests/workflow_tests/priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::time::Duration;
use temporal_client::{Priority, WorkflowClientTrait, WorkflowOptions};
use temporal_sdk::{ActContext, ActivityOptions, ChildWorkflowOptions, WfContext};
use temporal_sdk_core_protos::{
coresdk::AsJsonPayloadExt, temporal::api::history::v1::history_event::Attributes,
coresdk::AsJsonPayloadExt,
temporal::api::{common, history::v1::history_event::Attributes},
};
use temporal_sdk_core_test_utils::CoreWfStarter;

Expand Down Expand Up @@ -34,16 +35,23 @@ async fn priority_values_sent_to_server() {
input: "hello".as_json_payload().unwrap(),
start_to_close_timeout: Some(Duration::from_secs(5)),
priority: Some(Priority { priority_key: 5 }),
// Currently no priority info attached to eagerly run activities
do_not_eagerly_execute: true,
..Default::default()
});
started.result().await;
activity.await;
activity.await.unwrap_ok_payload();
Ok(().into())
});
worker.register_wf(child_type.to_owned(), |_ctx: WfContext| async move {
worker.register_wf(child_type.to_owned(), |ctx: WfContext| async move {
assert_eq!(
ctx.workflow_initial_info().priority,
Some(common::v1::Priority { priority_key: 4 })
);
Ok(().into())
});
worker.register_activity("echo", |_ctx: ActContext, echo_me: String| async move {
worker.register_activity("echo", |ctx: ActContext, echo_me: String| async move {
assert_eq!(ctx.get_info().priority, Priority { priority_key: 5 });
Ok(echo_me)
});

Expand Down
Loading