diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs index 0be577b6b..6888fb7a4 100644 --- a/core-api/src/worker.rs +++ b/core-api/src/worker.rs @@ -5,8 +5,9 @@ use std::{ sync::Arc, time::Duration, }; -use temporal_sdk_core_protos::coresdk::{ - ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo, +use temporal_sdk_core_protos::{ + coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo}, + temporal::api::enums::v1::VersioningBehavior, }; /// Defines per-worker configuration options @@ -19,9 +20,6 @@ pub struct WorkerConfig { /// What task queue will this worker poll from? This task queue name will be used for both /// workflow and activity polling. pub task_queue: String, - /// A string that should be unique to the set of code this worker uses. IE: All the workflow, - /// activity, interceptor, and data converter code. - pub worker_build_id: String, /// A human-readable string that can identify this worker. Using something like sdk version /// and host name is a good default. If set, overrides the identity set (if any) on the client /// used by this worker. @@ -96,13 +94,6 @@ pub struct WorkerConfig { #[builder(default)] pub max_worker_activities_per_second: Option, - /// # UNDER DEVELOPMENT - /// If set to true this worker will opt-in to the whole-worker versioning feature. - /// `worker_build_id` will be used as the version. - /// todo: link to feature docs - #[builder(default = "false")] - pub use_worker_versioning: bool, - /// If set false (default), shutdown will not finish until all pending evictions have been /// issued and replied to. If set true shutdown will be considered complete when the only /// remaining work is pending evictions. @@ -163,6 +154,9 @@ pub struct WorkerConfig { /// Mutually exclusive with `tuner` #[builder(setter(into, strip_option), default)] pub max_outstanding_nexus_tasks: Option, + + /// A versioning strategy for this worker. + pub versioning_strategy: WorkerVersioningStrategy, } impl WorkerConfig { @@ -180,6 +174,10 @@ impl WorkerConfig { .map(|s| s.contains(error_type)) .unwrap_or(false) } + + pub fn build_id(&self) -> &str { + self.versioning_strategy.build_id() + } } impl WorkerConfigBuilder { @@ -218,18 +216,31 @@ impl WorkerConfigBuilder { return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned()); } - if self.use_worker_versioning.unwrap_or_default() - && self - .worker_build_id - .as_ref() - .map(|s| s.is_empty()) - .unwrap_or_default() - { - return Err( - "`worker_build_id` must be non-empty when `use_worker_versioning` is true" - .to_owned(), - ); + if let Some(wv) = self.versioning_strategy.as_ref() { + match wv { + WorkerVersioningStrategy::None { .. } => {} + WorkerVersioningStrategy::WorkerDeploymentBased(d) => { + if d.use_worker_versioning + && (d.version.build_id.is_empty() || d.version.deployment_name.is_empty()) + { + return Err( + "WorkerDeploymentVersion must have a non-empty build_id and \ + deployment_name when deployment-based versioning is enabled" + .to_owned(), + ); + } + } + WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => { + if build_id.is_empty() { + return Err( + "Legacy build id-based versioning must have a non-empty build_id" + .to_owned(), + ); + } + } + } } + Ok(()) } } @@ -491,3 +502,70 @@ impl PollerBehavior { Ok(()) } } + +#[derive(Clone, Debug)] +pub enum WorkerVersioningStrategy { + /// Don't enable any versioning + None { + /// Build ID may still be passed as a way to identify the worker, or may be left empty. + build_id: String, + }, + /// Maybe use the modern deployment-based versioning, or just pass a deployment version. + WorkerDeploymentBased(WorkerDeploymentOptions), + /// Use the legacy build-id-based whole worker versioning. + LegacyBuildIdBased { + /// A Build ID to use, must be non-empty. + build_id: String, + }, +} + +impl Default for WorkerVersioningStrategy { + fn default() -> Self { + WorkerVersioningStrategy::None { + build_id: String::new(), + } + } +} + +impl WorkerVersioningStrategy { + pub fn build_id(&self) -> &str { + match self { + WorkerVersioningStrategy::None { build_id } => build_id, + WorkerVersioningStrategy::WorkerDeploymentBased(opts) => &opts.version.build_id, + WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => build_id, + } + } + + pub fn uses_build_id_based(&self) -> bool { + matches!(self, WorkerVersioningStrategy::LegacyBuildIdBased { .. }) + } + + pub fn default_versioning_behavior(&self) -> Option { + match self { + WorkerVersioningStrategy::WorkerDeploymentBased(opts) => { + opts.default_versioning_behavior + } + _ => None, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct WorkerDeploymentOptions { + /// The deployment version of this worker. + pub version: WorkerDeploymentVersion, + /// If set, opts in to the Worker Deployment Versioning feature, meaning this worker will only + /// receive tasks for workflows it claims to be compatible with. + pub use_worker_versioning: bool, + /// The default versioning behavior to use for workflows that do not pass one to Core. + /// It is a startup-time error to specify `Some(Unspecified)` here. + pub default_versioning_behavior: Option, +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash)] +pub struct WorkerDeploymentVersion { + /// Name of the deployment + pub deployment_name: String, + /// Build ID for the worker. + pub build_id: String, +} diff --git a/core/src/core_tests/queries.rs b/core/src/core_tests/queries.rs index 2bcfc761e..6b7bafc93 100644 --- a/core/src/core_tests/queries.rs +++ b/core/src/core_tests/queries.rs @@ -10,7 +10,7 @@ use std::{ collections::{HashMap, VecDeque}, time::Duration, }; -use temporal_sdk_core_api::Worker as WorkerTrait; +use temporal_sdk_core_api::{Worker as WorkerTrait, worker::WorkerVersioningStrategy}; use temporal_sdk_core_protos::{ TestHistoryBuilder, coresdk::{ @@ -881,7 +881,9 @@ async fn build_id_set_properly_on_query_on_first_task() { let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| { wc.max_cached_workflows = 10; - wc.worker_build_id = "1.0".to_string(); + wc.versioning_strategy = WorkerVersioningStrategy::None { + build_id: "1.0".to_owned(), + } }); let core = mock_worker(mock); diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 37234bcef..38e39ccf9 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -35,7 +35,7 @@ use temporal_sdk_core_api::{ errors::PollError, worker::{ PollerBehavior, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, - SlotSupplier, SlotSupplierPermit, WorkflowSlotKind, + SlotSupplier, SlotSupplierPermit, WorkerVersioningStrategy, WorkflowSlotKind, }, }; use temporal_sdk_core_protos::{ @@ -2977,7 +2977,9 @@ async fn sets_build_id_from_wft_complete() { let mut worker = mock_sdk_cfg( MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock), |cfg| { - cfg.worker_build_id = "fierce-predator".to_string(); + cfg.versioning_strategy = WorkerVersioningStrategy::None { + build_id: "fierce-predator".to_string(), + }; cfg.max_cached_workflows = 1; }, ); diff --git a/core/src/lib.rs b/core/src/lib.rs index 404b47ab4..dcaf1c147 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -98,8 +98,7 @@ where client, worker_config.namespace.clone(), client_ident, - worker_config.worker_build_id.clone(), - worker_config.use_worker_versioning, + worker_config.versioning_strategy.clone(), )); Ok(Worker::new( diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 2dddb6188..72c167f4a 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -31,7 +31,11 @@ use std::{ time::Duration, }; use temporal_sdk::interceptors::FailOnNondeterminismInterceptor; -use temporal_sdk_core_api::{Worker as WorkerTrait, errors::PollError, worker::PollerBehavior}; +use temporal_sdk_core_api::{ + Worker as WorkerTrait, + errors::PollError, + worker::{PollerBehavior, WorkerVersioningStrategy}, +}; use temporal_sdk_core_protos::{ coresdk::{ workflow_activation::{WorkflowActivation, workflow_activation_job}, @@ -62,7 +66,9 @@ pub(crate) fn test_worker_cfg() -> WorkerConfigBuilder { let mut wcb = WorkerConfigBuilder::default(); wcb.namespace(NAMESPACE) .task_queue(TEST_Q) - .worker_build_id("test_bin_id") + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "test_bin_id".to_string(), + }) .ignore_evicts_on_shutdown(true) // Serial polling since it makes mocking much easier. .workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize)); diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index 4170fec59..32741324d 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -7,6 +7,7 @@ use temporal_client::{ Client, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient, SlotManager, WorkflowService, }; +use temporal_sdk_core_api::worker::WorkerVersioningStrategy; use temporal_sdk_core_protos::{ TaskToken, coresdk::workflow_commands::QueryResult, @@ -16,7 +17,10 @@ use temporal_sdk_core_protos::{ MeteringMetadata, Payloads, WorkerVersionCapabilities, WorkerVersionStamp, WorkflowExecution, }, - enums::v1::{TaskQueueKind, WorkflowTaskFailedCause}, + deployment, + enums::v1::{ + TaskQueueKind, VersioningBehavior, WorkerVersioningMode, WorkflowTaskFailedCause, + }, failure::v1::Failure, nexus, protocol::v1::Message as ProtocolMessage, @@ -35,8 +39,7 @@ pub(crate) struct WorkerClientBag { replaceable_client: RwLock>, namespace: String, identity: String, - worker_build_id: String, - use_versioning: bool, + worker_versioning_strategy: WorkerVersioningStrategy, } impl WorkerClientBag { @@ -44,15 +47,13 @@ impl WorkerClientBag { client: RetryClient, namespace: String, identity: String, - worker_build_id: String, - use_versioning: bool, + worker_versioning_strategy: WorkerVersioningStrategy, ) -> Self { Self { replaceable_client: RwLock::new(client), namespace, identity, - worker_build_id, - use_versioning, + worker_versioning_strategy, } } @@ -68,16 +69,34 @@ impl WorkerClientBag { if self.default_capabilities().build_id_based_versioning { "".to_string() } else { - self.worker_build_id.clone() + self.worker_versioning_strategy.build_id().to_owned() + } + } + + fn deployment_options(&self) -> Option { + match &self.worker_versioning_strategy { + WorkerVersioningStrategy::WorkerDeploymentBased(dopts) => { + Some(deployment::v1::WorkerDeploymentOptions { + deployment_name: dopts.version.deployment_name.clone(), + build_id: dopts.version.build_id.clone(), + worker_versioning_mode: if dopts.use_worker_versioning { + WorkerVersioningMode::Versioned.into() + } else { + WorkerVersioningMode::Unversioned.into() + }, + }) + } + _ => None, } } fn worker_version_capabilities(&self) -> Option { if self.default_capabilities().build_id_based_versioning { Some(WorkerVersionCapabilities { - build_id: self.worker_build_id.clone(), - use_versioning: self.use_versioning, - // TODO: https://github.com/temporalio/sdk-core/issues/866 + build_id: self.worker_versioning_strategy.build_id().to_owned(), + use_versioning: self.worker_versioning_strategy.uses_build_id_based(), + // This will never be used, as it is the v3 version that we never supported in + // Core SDKs. deployment_series_name: "".to_string(), }) } else { @@ -88,8 +107,8 @@ impl WorkerClientBag { fn worker_version_stamp(&self) -> Option { if self.default_capabilities().build_id_based_versioning { Some(WorkerVersionStamp { - build_id: self.worker_build_id.clone(), - use_versioning: self.use_versioning, + build_id: self.worker_versioning_strategy.build_id().to_owned(), + use_versioning: self.worker_versioning_strategy.uses_build_id_based(), }) } else { None @@ -221,8 +240,7 @@ impl WorkerClient for WorkerClientBag { identity: self.identity.clone(), binary_checksum: self.binary_checksum(), worker_version_capabilities: self.worker_version_capabilities(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 - deployment_options: None, + deployment_options: self.deployment_options(), } .into_request(); request.extensions_mut().insert(IsWorkerTaskLongPoll); @@ -258,8 +276,7 @@ impl WorkerClient for WorkerClientBag { max_tasks_per_second: Some(tps), }), worker_version_capabilities: self.worker_version_capabilities(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 - deployment_options: None, + deployment_options: self.deployment_options(), } .into_request(); request.extensions_mut().insert(IsWorkerTaskLongPoll); @@ -291,8 +308,7 @@ impl WorkerClient for WorkerClientBag { }), identity: self.identity.clone(), worker_version_capabilities: self.worker_version_capabilities(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 - deployment_options: None, + deployment_options: self.deployment_options(), } .into_request(); request.extensions_mut().insert(IsWorkerTaskLongPoll); @@ -348,11 +364,10 @@ impl WorkerClient for WorkerClientBag { capabilities: Some(respond_workflow_task_completed_request::Capabilities { discard_speculative_workflow_task_with_events: true, }), - // TODO: https://github.com/temporalio/sdk-core/issues/866 + // Will never be set, deprecated. deployment: None, - versioning_behavior: 0, - // TODO: https://github.com/temporalio/sdk-core/issues/866 - deployment_options: None, + versioning_behavior: request.versioning_behavior.into(), + deployment_options: self.deployment_options(), }; Ok(self .cloned_client() @@ -638,4 +653,6 @@ pub(crate) struct WorkflowTaskCompletion { pub(crate) sdk_metadata: WorkflowTaskCompletedMetadata, /// Metering info pub(crate) metering_metadata: MeteringMetadata, + /// Versioning behavior of the workflow, if any. + pub(crate) versioning_behavior: VersioningBehavior, } diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 1a462bffa..8c65ca338 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -325,7 +325,7 @@ impl Worker { let slot_context_data = Arc::new(PermitDealerContextData { task_queue: config.task_queue.clone(), worker_identity: config.client_identity_override.clone().unwrap_or_default(), - worker_build_id: config.worker_build_id.clone(), + worker_build_id: config.build_id().to_owned(), }); let wft_slots = MeteredPermitDealer::new( tuner.workflow_task_slot_supplier(), @@ -490,6 +490,9 @@ impl Worker { server_capabilities: client.capabilities().unwrap_or_default(), sdk_name: sdk_name_and_ver.0, sdk_version: sdk_name_and_ver.1, + default_versioning_behavior: config + .versioning_strategy + .default_versioning_behavior(), }, sticky_queue_name.map(|sq| StickyExecutionAttributes { worker_task_queue: Some(TaskQueue { diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index a06132cd5..5092c15bf 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -445,8 +445,8 @@ impl WorkflowMachines { let build_id_for_current_task = if is_replaying { self.current_wft_build_id.clone().unwrap_or_default() } else { - self.current_wft_build_id = Some(self.worker_config.worker_build_id.clone()); - self.worker_config.worker_build_id.clone() + self.current_wft_build_id = Some(self.worker_config.build_id().to_owned()); + self.worker_config.build_id().to_owned() }; WorkflowActivation { timestamp: self.current_wf_time.map(Into::into), @@ -479,8 +479,8 @@ impl WorkflowMachines { // If this worker has a build ID and we're completing the task, we want to say our ID is the // current build ID, so that if we get a query before any new history, we properly can // report that our ID was the one used for the completion. - if !self.worker_config.worker_build_id.is_empty() { - self.current_wft_build_id = Some(self.worker_config.worker_build_id.clone()); + if !self.worker_config.build_id().is_empty() { + self.current_wft_build_id = Some(self.worker_config.build_id().to_owned()); } (*self.observed_internal_flags) .borrow_mut() diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 47e817771..b64fd8e95 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -40,7 +40,8 @@ use temporal_sdk_core_protos::{ workflow_completion, }, temporal::api::{ - command::v1::command::Attributes as CmdAttribs, enums::v1::WorkflowTaskFailedCause, + command::v1::command::Attributes as CmdAttribs, + enums::v1::{VersioningBehavior, WorkflowTaskFailedCause}, failure::v1::Failure, }, }; @@ -374,6 +375,7 @@ impl ManagedRun { &mut self, mut commands: Vec, used_flags: Vec, + versioning_behavior: VersioningBehavior, resp_chan: Option>, is_forced_failure: bool, ) -> Result> { @@ -450,6 +452,7 @@ impl ManagedRun { used_flags, resp_chan, is_forced_failure, + versioning_behavior, }; // Verify we can actually apply the next workflow task, which will happen as part of @@ -620,6 +623,7 @@ impl ManagedRun { metadata: None, }], vec![], + VersioningBehavior::Unspecified, // Doesn't matter since we're failing wf resp_chan, true, ) @@ -692,6 +696,7 @@ impl ManagedRun { has_pending_query: completion.has_pending_query, activation_was_eviction: completion.activation_was_eviction, is_forced_failure: completion.is_forced_failure, + versioning_behavior: completion.versioning_behavior, }; self.wfm.machines.add_lang_used_flags(completion.used_flags); @@ -1141,6 +1146,7 @@ impl ManagedRun { messages, query_responses, sdk_metadata: machines_wft_response.metadata_for_complete(), + versioning_behavior: data.versioning_behavior, }, }) } else { @@ -1339,6 +1345,7 @@ struct CompletionDataForWFT { has_pending_query: bool, activation_was_eviction: bool, is_forced_failure: bool, + versioning_behavior: VersioningBehavior, } /// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data @@ -1483,6 +1490,7 @@ struct RunActivationCompletion { /// Used to notify the worker when the completion is done processing and the completion can /// unblock. Must always be `Some` when initialized. resp_chan: Option>, + versioning_behavior: VersioningBehavior, } #[derive(Debug, derive_more::From)] enum ActOrFulfill { diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 26513abda..c40a301b5 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -76,7 +76,7 @@ use temporal_sdk_core_protos::{ temporal::api::{ command::v1::{Command as ProtoCommand, Command, command::Attributes}, common::v1::{Memo, MeteringMetadata, RetryPolicy, SearchAttributes, WorkflowExecution}, - enums::v1::WorkflowTaskFailedCause, + enums::v1::{VersioningBehavior, WorkflowTaskFailedCause}, protocol::v1::Message as ProtocolMessage, query::v1::WorkflowQuery, sdk::v1::{UserMetadata, WorkflowTaskCompletedMetadata}, @@ -125,6 +125,7 @@ pub(crate) struct Workflows { wft_semaphore: MeteredPermitDealer, local_act_mgr: Arc, ever_polled: AtomicBool, + default_versioning_behavior: Option, } pub(crate) struct WorkflowBasics { @@ -134,6 +135,7 @@ pub(crate) struct WorkflowBasics { pub(crate) server_capabilities: get_system_info_response::Capabilities, pub(crate) sdk_name: String, pub(crate) sdk_version: String, + pub(crate) default_versioning_behavior: Option, } pub(crate) struct RunBasics<'a> { @@ -166,6 +168,7 @@ impl Workflows { let (fetch_tx, fetch_rx) = unbounded_channel(); let shutdown_tok = basics.shutdown_token.clone(); let task_queue = basics.worker_config.task_queue.clone(); + let default_versioning_behavior = basics.default_versioning_behavior; let extracted_wft_stream = WFTExtractor::build( client.clone(), basics.worker_config.fetching_concurrency, @@ -251,6 +254,7 @@ impl Workflows { wft_semaphore, local_act_mgr, ever_polled: AtomicBool::new(false), + default_versioning_behavior, } } @@ -367,6 +371,7 @@ impl Workflows { query_responses, force_new_wft, sdk_metadata, + mut versioning_behavior, }, } => { let reserved_act_permits = @@ -374,6 +379,11 @@ impl Workflows { debug!(commands=%commands.display(), query_responses=%query_responses.display(), messages=%messages.display(), force_new_wft, "Sending responses to server"); + if let Some(default_vb) = self.default_versioning_behavior.as_ref() { + if versioning_behavior == VersioningBehavior::Unspecified { + versioning_behavior = *default_vb; + } + } let mut completion = WorkflowTaskCompletion { task_token, commands, @@ -389,6 +399,7 @@ impl Workflows { .get_nonfirst_attempt_count(&run_id) as u32, }, + versioning_behavior, }; let sticky_attrs = self.sticky_attrs.clone(); // Do not return new WFT if we would not cache, because returned new WFTs are @@ -895,6 +906,7 @@ pub(crate) enum ActivationAction { query_responses: Vec, force_new_wft: bool, sdk_metadata: WorkflowTaskCompletedMetadata, + versioning_behavior: VersioningBehavior, }, /// We should respond to a legacy query request RespondLegacyQuery { result: Box }, @@ -1088,6 +1100,7 @@ fn validate_completion( run_id: completion.run_id, commands, used_flags: success.used_internal_flags, + versioning_behavior: success.versioning_behavior.try_into().unwrap_or_default(), }) } Some(workflow_activation_completion::Status::Failed(failure)) => { @@ -1111,6 +1124,7 @@ enum ValidatedCompletion { run_id: String, commands: Vec, used_flags: Vec, + versioning_behavior: VersioningBehavior, }, Fail { run_id: String, diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 4af7b1cd0..7b1326c57 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -40,11 +40,11 @@ impl WFStream { /// Constructs workflow state management and returns a stream which outputs activations. /// /// * `wft_stream` is a stream of validated poll responses and fetched history pages as returned - /// by a poller (or mock), via [WFTExtractor]. + /// by a poller (or mock), via [WFTExtractor]. /// * `local_rx` is a stream of actions that workflow state needs to see. Things like - /// completions, local activities finishing, etc. See [LocalInputs]. + /// completions, local activities finishing, etc. See [LocalInputs]. /// * `local_activity_request_sink` is used to handle outgoing requests to start or cancel - /// local activities, and may return resolutions that need to be handled immediately. + /// local activities, and may return resolutions that need to be handled immediately. /// /// The stream inputs are combined into a stream of [WFActStreamInput]s. The stream processor /// then takes action on those inputs, mutating the [WFStream] state, and then may yield @@ -264,10 +264,12 @@ impl WFStream { ValidatedCompletion::Success { commands, used_flags, + versioning_behavior, .. } => match rh.successful_completion( commands, used_flags, + versioning_behavior, complete.response_tx, false, ) { diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index fd2f4fd46..a857b654a 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -37,15 +37,15 @@ import "temporal/sdk/core/nexus/nexus.proto"; // This is because: // * Patches are expected to apply to the entire activation // * Signal and update handlers should be invoked before workflow routines are iterated. That is to -// say before the users' main workflow function and anything spawned by it is allowed to continue. +// say before the users' main workflow function and anything spawned by it is allowed to continue. // * Local activities resolutions go after other normal jobs because while *not* replaying, they -// will always take longer than anything else that produces an immediate job (which is -// effectively instant). When *replaying* we need to scan ahead for LA markers so that we can -// resolve them in the same activation that they completed in when not replaying. However, doing -// so would, by default, put those resolutions *before* any other immediate jobs that happened -// in that same activation (prime example: cancelling not-wait-for-cancel activities). So, we do -// this to ensure the LA resolution happens after that cancel (or whatever else it may be) as it -// normally would have when executing. +// will always take longer than anything else that produces an immediate job (which is +// effectively instant). When *replaying* we need to scan ahead for LA markers so that we can +// resolve them in the same activation that they completed in when not replaying. However, doing +// so would, by default, put those resolutions *before* any other immediate jobs that happened +// in that same activation (prime example: cancelling not-wait-for-cancel activities). So, we do +// this to ensure the LA resolution happens after that cancel (or whatever else it may be) as it +// normally would have when executing. // * Queries always go last (and, in fact, always come in their own activation) // * Evictions also always come in their own activation // diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto index ffc255e41..78637f640 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_completion/workflow_completion.proto @@ -5,6 +5,7 @@ option ruby_package = "Temporalio::Internal::Bridge::Api::WorkflowCompletion"; import "temporal/api/failure/v1/message.proto"; import "temporal/api/enums/v1/failed_cause.proto"; +import "temporal/api/enums/v1/workflow.proto"; import "temporal/sdk/core/common/common.proto"; import "temporal/sdk/core/workflow_commands/workflow_commands.proto"; @@ -24,6 +25,8 @@ message Success { repeated workflow_commands.WorkflowCommand commands = 1; // Any internal flags which the lang SDK used in the processing of this activation repeated uint32 used_internal_flags = 6; + // The versioning behavior this workflow is currently using + temporal.api.enums.v1.VersioningBehavior versioning_behavior = 7; } // Failure to activate or execute a workflow diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index c3e7b5446..9c9ec8533 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -40,7 +40,7 @@ pub mod coresdk { ENCODING_PAYLOAD_KEY, JSON_ENCODING_VAL, temporal::api::{ common::v1::{Payload, Payloads, WorkflowExecution}, - enums::v1::{TimeoutType, WorkflowTaskFailedCause}, + enums::v1::{TimeoutType, VersioningBehavior, WorkflowTaskFailedCause}, failure::v1::{ ActivityFailureInfo, ApplicationFailureInfo, Failure, TimeoutFailureInfo, failure::FailureInfo, @@ -1017,6 +1017,7 @@ pub mod coresdk { Self { commands: v, used_internal_flags: vec![], + versioning_behavior: VersioningBehavior::Unspecified.into(), } } } diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 5cfb11792..b92b678ba 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -11,7 +11,10 @@ //! use std::{str::FromStr, sync::Arc}; //! use temporal_sdk::{sdk_client_options, ActContext, Worker}; //! use temporal_sdk_core::{init_worker, Url, CoreRuntime}; -//! use temporal_sdk_core_api::{worker::WorkerConfigBuilder, telemetry::TelemetryOptionsBuilder}; +//! use temporal_sdk_core_api::{ +//! worker::{WorkerConfigBuilder, WorkerVersioningStrategy}, +//! telemetry::TelemetryOptionsBuilder +//! }; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { @@ -25,7 +28,7 @@ //! let worker_config = WorkerConfigBuilder::default() //! .namespace("default") //! .task_queue("task_queue") -//! .worker_build_id("rust-sdk") +//! .versioning_strategy(WorkerVersioningStrategy::None { build_id: "rust-sdk".to_owned() }) //! .build()?; //! //! let core_worker = init_worker(&runtime, worker_config, client)?; diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index 42086f76e..4a0f75614 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -32,7 +32,7 @@ use temporal_sdk_core_protos::{ workflow_completion, workflow_completion::{WorkflowActivationCompletion, workflow_activation_completion}, }, - temporal::api::{common::v1::Payload, failure::v1::Failure}, + temporal::api::{common::v1::Payload, enums::v1::VersioningBehavior, failure::v1::Failure}, utilities::TryIntoOrNone, }; use tokio::sync::{ @@ -160,6 +160,7 @@ impl WorkflowFuture { workflow_completion::Success { commands: activation_cmds, used_internal_flags: vec![], + versioning_behavior: VersioningBehavior::Unspecified.into(), }, )), }) diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 07d21bdc1..3dc37e4f3 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -11,15 +11,20 @@ pub mod workflows; pub use temporal_sdk_core::replay::HistoryForReplay; use crate::stream::{Stream, TryStreamExt}; -use anyhow::Context; +use anyhow::{Context, bail}; use assert_matches::assert_matches; use futures_util::{StreamExt, future, stream, stream::FuturesUnordered}; use parking_lot::Mutex; use prost::Message; use rand::Rng; use std::{ - convert::TryFrom, env, future::Future, net::SocketAddr, path::PathBuf, sync::Arc, - time::Duration, + convert::TryFrom, + env, + future::Future, + net::SocketAddr, + path::PathBuf, + sync::Arc, + time::{Duration, Instant}, }; use temporal_client::{ Client, ClientTlsConfig, NamespacedClient, RetryClient, TlsConfig, WfClientExt, @@ -45,6 +50,7 @@ use temporal_sdk_core_api::{ PrometheusExporterOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder, metrics::CoreMeter, }, + worker::WorkerVersioningStrategy, }; use temporal_sdk_core_protos::{ DEFAULT_ACTIVITY_TYPE, @@ -109,7 +115,9 @@ pub fn integ_worker_config(tq: &str) -> WorkerConfigBuilder { .max_outstanding_activities(100_usize) .max_outstanding_local_activities(100_usize) .max_outstanding_workflow_tasks(100_usize) - .worker_build_id("test_build_id"); + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "test_build_id".to_owned(), + }); b } @@ -248,7 +256,7 @@ impl CoreWfStarter { /// Start the workflow defined by the builder and return run id pub async fn start_wf(&mut self) -> String { - self.start_wf_with_id(self.task_queue_name.clone()).await + self.start_wf_with_id(self.get_wf_id().to_owned()).await } /// Starts the workflow using the worker @@ -317,6 +325,20 @@ impl CoreWfStarter { &self.task_queue_name } + /// Fetch the history of the default workflow for this starter. IE: The one that would + /// be started by [CoreWfStarter::start_wf]. + pub async fn get_history(&self) -> History { + self.initted_worker + .get() + .expect("Starter must be initialized") + .client + .get_workflow_execution_history(self.get_wf_id().to_string(), None, vec![]) + .await + .unwrap() + .history + .unwrap() + } + async fn get_or_init(&mut self) -> &InitializedWorker { self.initted_worker .get_or_init(|| async { @@ -917,3 +939,20 @@ impl Drop for AbortOnDrop { self.ah.abort(); } } + +pub async fn eventually(func: F, timeout: Duration) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + let start = Instant::now(); + loop { + if start.elapsed() > timeout { + bail!("Eventually hit timeout"); + } + if let Ok(v) = func().await { + return Ok(v); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } +} diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index 78e1c481f..eb06e9fcf 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -27,7 +27,7 @@ use temporal_sdk_core_api::{ PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, metrics::{CoreMeter, MetricAttributes, MetricParameters}, }, - worker::{PollerBehavior, WorkerConfigBuilder}, + worker::{PollerBehavior, WorkerConfigBuilder, WorkerVersioningStrategy}, }; use temporal_sdk_core_protos::{ coresdk::{ @@ -148,7 +148,9 @@ async fn one_slot_worker_reports_available_slot() { let worker_cfg = WorkerConfigBuilder::default() .namespace(NAMESPACE) .task_queue(tq) - .worker_build_id("test_build_id") + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "test_build_id".to_owned(), + }) .max_cached_workflows(2_usize) .max_outstanding_activities(1_usize) .max_outstanding_local_activities(1_usize) diff --git a/tests/integ_tests/update_tests.rs b/tests/integ_tests/update_tests.rs index 1baffbad8..4eedd0857 100644 --- a/tests/integ_tests/update_tests.rs +++ b/tests/integ_tests/update_tests.rs @@ -903,12 +903,7 @@ async fn task_failure_during_validation() { .await .unwrap(); // Verify we did not spam task failures. There should only be one. - let history = client - .get_workflow_execution_history(wf_id, None, vec![]) - .await - .unwrap() - .history - .unwrap(); + let history = starter.get_history().await; assert_eq!( history .events diff --git a/tests/integ_tests/worker_tests.rs b/tests/integ_tests/worker_tests.rs index 28c6daf7b..d50eb006f 100644 --- a/tests/integ_tests/worker_tests.rs +++ b/tests/integ_tests/worker_tests.rs @@ -6,7 +6,7 @@ use temporal_sdk_core::{CoreRuntime, ResourceBasedTuner, ResourceSlotOptions, in use temporal_sdk_core_api::{ Worker, errors::WorkerValidationError, - worker::{PollerBehavior, WorkerConfigBuilder}, + worker::{PollerBehavior, WorkerConfigBuilder, WorkerVersioningStrategy}, }; use temporal_sdk_core_protos::{ coresdk::workflow_completion::{ @@ -34,7 +34,9 @@ async fn worker_validation_fails_on_nonexistent_namespace() { WorkerConfigBuilder::default() .namespace("i_dont_exist") .task_queue("Wheee!") - .worker_build_id("blah") + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "blah".to_owned(), + }) .build() .unwrap(), retrying_client, diff --git a/tests/integ_tests/worker_versioning_tests.rs b/tests/integ_tests/worker_versioning_tests.rs new file mode 100644 index 000000000..4f63d03d9 --- /dev/null +++ b/tests/integ_tests/worker_versioning_tests.rs @@ -0,0 +1,131 @@ +use std::time::Duration; +use temporal_client::{NamespacedClient, WorkflowService}; +use temporal_sdk_core_api::worker::{ + WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy, +}; +use temporal_sdk_core_protos::{ + coresdk::{ + workflow_commands::CompleteWorkflowExecution, workflow_completion, + workflow_completion::WorkflowActivationCompletion, + }, + temporal::api::{ + enums::v1::VersioningBehavior, + history::v1::history_event::Attributes, + workflowservice::v1::{ + DescribeWorkerDeploymentRequest, SetWorkerDeploymentCurrentVersionRequest, + }, + }, +}; +use temporal_sdk_core_test_utils::{CoreWfStarter, WorkerTestHelpers, eventually}; +use tokio::join; + +#[rstest::rstest] +#[tokio::test] +async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_default: bool) { + let wf_type = "sets_deployment_info_on_task_responses"; + let mut starter = CoreWfStarter::new(wf_type); + let deploy_name = format!("deployment-{}", starter.get_task_queue()); + starter + .worker_config + .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased( + WorkerDeploymentOptions { + version: WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "1.0".to_string(), + }, + use_worker_versioning: true, + default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(), + }, + )) + .no_remote_activities(true); + let core = starter.get_worker().await; + let client = starter.get_client().await; + + // A bit annoying. We have to start up polling here so that the deployment will exist before + // we can describe it and then set the current version. + let worker_task = async { + let res = core.poll_workflow_activation().await.unwrap(); + assert_eq!(res.build_id_for_current_task, "1.0"); + + let mut success_complete = workflow_completion::Success::from_variants(vec![ + CompleteWorkflowExecution { result: None }.into(), + ]); + if !use_default { + success_complete.versioning_behavior = VersioningBehavior::Pinned.into(); + } + core.complete_workflow_activation(WorkflowActivationCompletion { + run_id: res.run_id.clone(), + status: Some(success_complete.into()), + }) + .await + .unwrap(); + }; + + let ops_task = async { + let desc_resp = eventually( + async || { + client + .get_client() + .clone() + .describe_worker_deployment(DescribeWorkerDeploymentRequest { + namespace: client.namespace().to_string(), + deployment_name: deploy_name.clone(), + }) + .await + }, + Duration::from_secs(5), + ) + .await + .unwrap() + .into_inner(); + + client + .get_client() + .clone() + .set_worker_deployment_current_version(SetWorkerDeploymentCurrentVersionRequest { + namespace: client.namespace().to_owned(), + deployment_name: deploy_name.clone(), + version: format!("{}.1.0", deploy_name), + conflict_token: desc_resp.conflict_token, + ..Default::default() + }) + .await + .unwrap(); + + starter.start_wf().await; + }; + + join!(worker_task, ops_task); + core.handle_eviction().await; + core.shutdown().await; + + // Fetch history & verify task complete is properly stamped + let history = starter.get_history().await; + let wft_complete = history + .events + .into_iter() + .find_map(|e| { + if let Attributes::WorkflowTaskCompletedEventAttributes(a) = e.attributes.unwrap() { + Some(a) + } else { + None + } + }) + .unwrap(); + if use_default { + assert_eq!( + wft_complete.versioning_behavior, + VersioningBehavior::AutoUpgrade as i32 + ); + } else { + assert_eq!( + wft_complete.versioning_behavior, + VersioningBehavior::Pinned as i32 + ); + } + assert_eq!(wft_complete.worker_deployment_name, deploy_name); + assert_eq!( + wft_complete.worker_deployment_version, + format!("{}.1.0", deploy_name) + ); +} diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index b36632aae..f8cdb3ab6 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -33,7 +33,7 @@ use temporal_sdk::{ use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay}; use temporal_sdk_core_api::{ errors::{PollError, WorkflowErrorType}, - worker::PollerBehavior, + worker::{PollerBehavior, WorkerVersioningStrategy}, }; use temporal_sdk_core_protos::{ coresdk::{ @@ -566,7 +566,9 @@ async fn build_id_correct_in_wf_info() { let mut starter = CoreWfStarter::new(wf_type); starter .worker_config - .worker_build_id("1.0") + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "1.0".to_owned(), + }) .no_remote_activities(true); let core = starter.get_worker().await; starter.start_wf().await; @@ -629,7 +631,12 @@ async fn build_id_correct_in_wf_info() { .unwrap(); let mut starter = starter.clone_no_worker(); - starter.worker_config.worker_build_id("2.0"); + starter + .worker_config + .versioning_strategy(WorkerVersioningStrategy::None { + build_id: "2.0".to_owned(), + }); + let core = starter.get_worker().await; let query_fut = async { @@ -729,12 +736,7 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to( let stopper = async { // Wait for the timer to show up in history and then stop the worker loop { - let hist = client - .get_workflow_execution_history(wf_id.clone(), None, vec![]) - .await - .unwrap() - .history - .unwrap(); + let hist = starter.get_history().await; let has_timer_event = hist .events .iter() diff --git a/tests/main.rs b/tests/main.rs index 84a8f2bc6..dcbdcd720 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -17,6 +17,7 @@ mod integ_tests { mod update_tests; mod visibility_tests; mod worker_tests; + mod worker_versioning_tests; mod workflow_tests; use std::{env, str::FromStr, time::Duration}; diff --git a/tests/runner.rs b/tests/runner.rs index 4bd9331b6..37e8ea2cc 100644 --- a/tests/runner.rs +++ b/tests/runner.rs @@ -99,6 +99,12 @@ async fn main() -> Result<(), anyhow::Error> { "system.enableEagerWorkflowStart=true".to_string(), "--dynamic-config-value".to_string(), "system.enableNexus=true".to_string(), + "--dynamic-config-value".to_owned(), + "frontend.workerVersioningWorkflowAPIs=true".to_owned(), + "--dynamic-config-value".to_owned(), + "frontend.workerVersioningDataAPIs=true".to_owned(), + "--dynamic-config-value".to_owned(), + "system.enableDeploymentVersions=true".to_owned(), "--http-port".to_string(), "7243".to_string(), "--search-attribute".to_string(), @@ -108,7 +114,7 @@ async fn main() -> Result<(), anyhow::Error> { ]) .ui(true) .build()?; - println!("Using temporal CLI"); + println!("Using temporal CLI: {:?}", config); ( Some( config