diff --git a/client/src/lib.rs b/client/src/lib.rs index c4d969904..695ac4a81 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1112,7 +1112,7 @@ pub struct WorkflowOptions { /// The overall semantics of Priority are: /// (more will be added here later) /// 1. First, consider "priority_key": lower number goes first. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, PartialEq, Eq)] pub struct Priority { /// Priority key is a positive integer from 1 to n, where smaller integers /// correspond to higher priorities (tasks run sooner). In general, tasks in @@ -1135,6 +1135,14 @@ impl From for common::v1::Priority { } } +impl From for Priority { + fn from(priority: common::v1::Priority) -> Self { + Self { + priority_key: priority.priority_key as u32, + } + } +} + #[async_trait::async_trait] impl WorkflowClientTrait for T where diff --git a/core/src/worker/activities/local_activities.rs b/core/src/worker/activities/local_activities.rs index a2a38c89e..68a8fde28 100644 --- a/core/src/worker/activities/local_activities.rs +++ b/core/src/worker/activities/local_activities.rs @@ -525,6 +525,7 @@ impl LocalActivityManager { .and_then(|t| t.try_into().ok()), heartbeat_timeout: None, retry_policy: Some(sa.retry_policy), + priority: Some(Default::default()), is_local: true, })), })) diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto b/sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto index cac2a8e94..88b955ae2 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto @@ -57,6 +57,8 @@ message Start { // (or not) during activity scheduling as the service can override the provided one in case its // values are not specified or exceed configured system limits. temporal.api.common.v1.RetryPolicy retry_policy = 16; + // Priority of this activity. Local activities will always have this field set to the default. + temporal.api.common.v1.Priority priority = 18; // Set to true if this is a local activity. Note that heartbeating does not apply to local // activities. diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index bfa2799ef..fd2f4fd46 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -195,6 +195,8 @@ message InitializeWorkflow { // // See field in WorkflowExecutionStarted for more detail. temporal.api.common.v1.WorkflowExecution root_workflow = 24; + // Priority of this workflow execution + temporal.api.common.v1.Priority priority = 25; } // Notify a workflow that a timer has fired diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index ba6d51270..c3e7b5446 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -726,6 +726,7 @@ pub mod coresdk { search_attributes: attrs.search_attributes, start_time: Some(start_time), root_workflow: attrs.root_workflow_execution, + priority: attrs.priority, } } } @@ -1265,6 +1266,7 @@ pub mod coresdk { start_to_close_timeout: r.start_to_close_timeout, heartbeat_timeout: r.heartbeat_timeout, retry_policy: r.retry_policy, + priority: r.priority, is_local: false, }, )), diff --git a/sdk/src/activity_context.rs b/sdk/src/activity_context.rs index 018c9a69c..d71501a29 100644 --- a/sdk/src/activity_context.rs +++ b/sdk/src/activity_context.rs @@ -6,6 +6,7 @@ use std::{ sync::Arc, time::{Duration as StdDuration, SystemTime}, }; +use temporal_client::Priority; use temporal_sdk_core_api::Worker; use temporal_sdk_core_protos::{ coresdk::{ActivityHeartbeat, activity_task}, @@ -48,6 +49,8 @@ pub struct ActivityInfo { pub current_attempt_scheduled_time: Option, pub retry_policy: Option, pub is_local: bool, + /// Priority of this activity. If unset uses [Priority::default] + pub priority: Priority, } impl ActContext { @@ -79,6 +82,7 @@ impl ActContext { heartbeat_timeout, retry_policy, is_local, + priority, } = task; let deadline = calculate_deadline( scheduled_time.as_ref(), @@ -113,6 +117,7 @@ impl ActContext { .try_into_or_none(), retry_policy, is_local, + priority: priority.map(Into::into).unwrap_or_default(), }, }, first_arg, diff --git a/sdk/src/workflow_context/options.rs b/sdk/src/workflow_context/options.rs index 1b932011f..24cca72b9 100644 --- a/sdk/src/workflow_context/options.rs +++ b/sdk/src/workflow_context/options.rs @@ -70,6 +70,8 @@ pub struct ActivityOptions { pub summary: Option, /// Priority for the activity pub priority: Option, + /// If true, disable eager execution for this activity + pub do_not_eagerly_execute: bool, } impl IntoWorkflowCommand for ActivityOptions { @@ -98,6 +100,7 @@ impl IntoWorkflowCommand for ActivityOptions { arguments: vec![self.input], retry_policy: self.retry_policy, priority: self.priority.map(Into::into), + do_not_eagerly_execute: self.do_not_eagerly_execute, ..Default::default() } .into(), diff --git a/tests/integ_tests/workflow_tests/priority.rs b/tests/integ_tests/workflow_tests/priority.rs index 9db3eb227..a46ce24bc 100644 --- a/tests/integ_tests/workflow_tests/priority.rs +++ b/tests/integ_tests/workflow_tests/priority.rs @@ -2,7 +2,8 @@ use std::time::Duration; use temporal_client::{Priority, WorkflowClientTrait, WorkflowOptions}; use temporal_sdk::{ActContext, ActivityOptions, ChildWorkflowOptions, WfContext}; use temporal_sdk_core_protos::{ - coresdk::AsJsonPayloadExt, temporal::api::history::v1::history_event::Attributes, + coresdk::AsJsonPayloadExt, + temporal::api::{common, history::v1::history_event::Attributes}, }; use temporal_sdk_core_test_utils::CoreWfStarter; @@ -34,16 +35,23 @@ async fn priority_values_sent_to_server() { input: "hello".as_json_payload().unwrap(), start_to_close_timeout: Some(Duration::from_secs(5)), priority: Some(Priority { priority_key: 5 }), + // Currently no priority info attached to eagerly run activities + do_not_eagerly_execute: true, ..Default::default() }); started.result().await; - activity.await; + activity.await.unwrap_ok_payload(); Ok(().into()) }); - worker.register_wf(child_type.to_owned(), |_ctx: WfContext| async move { + worker.register_wf(child_type.to_owned(), |ctx: WfContext| async move { + assert_eq!( + ctx.workflow_initial_info().priority, + Some(common::v1::Priority { priority_key: 4 }) + ); Ok(().into()) }); - worker.register_activity("echo", |_ctx: ActContext, echo_me: String| async move { + worker.register_activity("echo", |ctx: ActContext, echo_me: String| async move { + assert_eq!(ctx.get_info().priority, Priority { priority_key: 5 }); Ok(echo_me) });