diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs index 6888fb7a4..3a4cdff24 100644 --- a/core-api/src/worker.rs +++ b/core-api/src/worker.rs @@ -2,10 +2,12 @@ use crate::{errors::WorkflowErrorType, telemetry::metrics::TemporalMeter}; use std::{ any::Any, collections::{HashMap, HashSet}, + str::FromStr, sync::Arc, time::Duration, }; use temporal_sdk_core_protos::{ + coresdk, coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo}, temporal::api::enums::v1::VersioningBehavior, }; @@ -175,8 +177,21 @@ impl WorkerConfig { .unwrap_or(false) } - pub fn build_id(&self) -> &str { - self.versioning_strategy.build_id() + pub fn computed_deployment_version(&self) -> Option { + let wdv = match self.versioning_strategy { + WorkerVersioningStrategy::None { ref build_id } => WorkerDeploymentVersion { + deployment_name: "".to_owned(), + build_id: build_id.clone(), + }, + WorkerVersioningStrategy::WorkerDeploymentBased(ref opts) => opts.version.clone(), + WorkerVersioningStrategy::LegacyBuildIdBased { ref build_id } => { + WorkerDeploymentVersion { + deployment_name: "".to_owned(), + build_id: build_id.clone(), + } + } + }; + if wdv.is_empty() { None } else { Some(wdv) } } } @@ -316,9 +331,6 @@ pub trait SlotReservationContext: Send + Sync { /// Returns the identity of the worker fn worker_identity(&self) -> &str; - /// Returns the build id of the worker - fn worker_build_id(&self) -> &str; - /// Returns the number of currently outstanding slot permits, whether used or un-used. fn num_issued_slots(&self) -> usize; @@ -569,3 +581,41 @@ pub struct WorkerDeploymentVersion { /// Build ID for the worker. pub build_id: String, } + +impl WorkerDeploymentVersion { + pub fn is_empty(&self) -> bool { + self.deployment_name.is_empty() && self.build_id.is_empty() + } +} + +impl FromStr for WorkerDeploymentVersion { + type Err = (); + + fn from_str(s: &str) -> Result { + match s.split_once('.') { + Some((name, build_id)) => Ok(WorkerDeploymentVersion { + deployment_name: name.to_owned(), + build_id: build_id.to_owned(), + }), + _ => Err(()), + } + } +} + +impl From for coresdk::common::WorkerDeploymentVersion { + fn from(v: WorkerDeploymentVersion) -> coresdk::common::WorkerDeploymentVersion { + coresdk::common::WorkerDeploymentVersion { + deployment_name: v.deployment_name, + build_id: v.build_id, + } + } +} + +impl From for WorkerDeploymentVersion { + fn from(v: coresdk::common::WorkerDeploymentVersion) -> WorkerDeploymentVersion { + WorkerDeploymentVersion { + deployment_name: v.deployment_name, + build_id: v.build_id, + } + } +} diff --git a/core/src/abstractions.rs b/core/src/abstractions.rs index 176024b06..3d842fef4 100644 --- a/core/src/abstractions.rs +++ b/core/src/abstractions.rs @@ -47,7 +47,6 @@ pub(crate) struct MeteredPermitDealer { pub(crate) struct PermitDealerContextData { pub(crate) task_queue: String, pub(crate) worker_identity: String, - pub(crate) worker_build_id: String, } impl MeteredPermitDealer @@ -171,10 +170,6 @@ impl SlotReservationContext for MeteredPermitDealer { &self.context_data.worker_identity } - fn worker_build_id(&self) -> &str { - &self.context_data.worker_build_id - } - fn num_issued_slots(&self) -> usize { *self.extant_permits.1.borrow() } diff --git a/core/src/core_tests/queries.rs b/core/src/core_tests/queries.rs index 6b7bafc93..4c9a9528f 100644 --- a/core/src/core_tests/queries.rs +++ b/core/src/core_tests/queries.rs @@ -888,12 +888,24 @@ async fn build_id_set_properly_on_query_on_first_task() { let core = mock_worker(mock); let task = core.poll_workflow_activation().await.unwrap(); - assert_eq!(task.build_id_for_current_task, "1.0"); + assert_eq!( + task.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "1.0" + ); core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id)) .await .unwrap(); let task = core.poll_workflow_activation().await.unwrap(); - assert_eq!(task.build_id_for_current_task, "1.0"); + assert_eq!( + task.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "1.0" + ); core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id)) .await .unwrap(); diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index 38e39ccf9..616369c1f 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -2986,14 +2986,23 @@ async fn sets_build_id_from_wft_complete() { worker.register_wf(DEFAULT_WORKFLOW_TYPE, |ctx: WfContext| async move { // First task, it should be empty, since replaying and nothing in first WFT completed - assert_eq!(ctx.current_build_id(), None); + assert_eq!(ctx.current_deployment_version(), None); ctx.timer(Duration::from_secs(1)).await; - assert_eq!(ctx.current_build_id(), Some("enchi-cat".to_string())); + assert_eq!( + ctx.current_deployment_version().unwrap().build_id, + "enchi-cat" + ); ctx.timer(Duration::from_secs(1)).await; // Not replaying at this point, so we should see the worker's build id - assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string())); + assert_eq!( + ctx.current_deployment_version().unwrap().build_id, + "fierce-predator" + ); ctx.timer(Duration::from_secs(1)).await; - assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string())); + assert_eq!( + ctx.current_deployment_version().unwrap().build_id, + "fierce-predator" + ); Ok(().into()) }); worker diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 8c65ca338..a8f06f247 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -325,7 +325,6 @@ impl Worker { let slot_context_data = Arc::new(PermitDealerContextData { task_queue: config.task_queue.clone(), worker_identity: config.client_identity_override.clone().unwrap_or_default(), - worker_build_id: config.build_id().to_owned(), }); let wft_slots = MeteredPermitDealer::new( tuner.workflow_task_slot_supplier(), diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 5092c15bf..9a4a8cf74 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -51,7 +51,7 @@ use std::{ sync::Arc, time::{Duration, Instant, SystemTime}, }; -use temporal_sdk_core_api::worker::WorkerConfig; +use temporal_sdk_core_api::worker::{WorkerConfig, WorkerDeploymentVersion}; use temporal_sdk_core_protos::{ coresdk::{ common::{NamespacedWorkflowExecution, VersioningIntent}, @@ -123,8 +123,9 @@ pub(crate) struct WorkflowMachines { history_size_bytes: u64, /// Set on each WFT started event continue_as_new_suggested: bool, - /// Set if the current WFT is already complete and that completion event had a build id in it. - current_wft_build_id: Option, + /// Set if the current WFT is already complete and that completion event had legacy build-id + /// or a deployment version in it. Will use an empty deployment name if it's legacy build-id. + current_wft_deployment_info: Option, all_machines: SlotMap, /// If a machine key is in this map, that machine was created internally by core, not as a @@ -286,7 +287,7 @@ impl WorkflowMachines { observed_internal_flags: Rc::new(RefCell::new(observed_internal_flags)), history_size_bytes: 0, continue_as_new_suggested: false, - current_wft_build_id: None, + current_wft_deployment_info: None, all_machines: Default::default(), machine_is_core_created: Default::default(), machines_by_event_id: Default::default(), @@ -442,11 +443,11 @@ impl WorkflowMachines { ) }); let is_replaying = self.replaying || all_query; - let build_id_for_current_task = if is_replaying { - self.current_wft_build_id.clone().unwrap_or_default() + let deployment_version_for_current_task = if is_replaying { + self.current_wft_deployment_info.clone() } else { - self.current_wft_build_id = Some(self.worker_config.build_id().to_owned()); - self.worker_config.build_id().to_owned() + self.current_wft_deployment_info = self.worker_config.computed_deployment_version(); + self.current_wft_deployment_info.clone() }; WorkflowActivation { timestamp: self.current_wf_time.map(Into::into), @@ -460,7 +461,8 @@ impl WorkflowMachines { .collect(), history_size_bytes: self.history_size_bytes, continue_as_new_suggested: self.continue_as_new_suggested, - build_id_for_current_task, + deployment_version_for_current_task: deployment_version_for_current_task + .map(Into::into), } } @@ -479,9 +481,7 @@ impl WorkflowMachines { // If this worker has a build ID and we're completing the task, we want to say our ID is the // current build ID, so that if we get a query before any new history, we properly can // report that our ID was the one used for the completion. - if !self.worker_config.build_id().is_empty() { - self.current_wft_build_id = Some(self.worker_config.build_id().to_owned()); - } + self.current_wft_deployment_info = self.worker_config.computed_deployment_version(); (*self.observed_internal_flags) .borrow_mut() .gather_for_wft_complete() @@ -587,8 +587,23 @@ impl WorkflowMachines { (*$me.observed_internal_flags) .borrow_mut() .add_from_complete($wtc); + let mut combined_ver = WorkerDeploymentVersion { + deployment_name: "".to_string(), + build_id: "".to_string(), + }; if let Some(bid) = $wtc.worker_version.as_ref().map(|wv| &wv.build_id) { - $me.current_wft_build_id = Some(bid.to_string()); + combined_ver.build_id = bid.to_string(); + } + if !$wtc.worker_deployment_name.is_empty() { + combined_ver.deployment_name = $wtc.worker_deployment_name.clone(); + } + if !$wtc.worker_deployment_version.is_empty() { + if let Ok(ver) = $wtc.worker_deployment_version.parse() { + combined_ver = ver; + } + } + if !combined_ver.is_empty() { + $me.current_wft_deployment_info = Some(combined_ver); } }}; } diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/common/common.proto b/sdk-core-protos/protos/local/temporal/sdk/core/common/common.proto index e296182eb..302050cb0 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/common/common.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/common/common.proto @@ -29,4 +29,9 @@ enum VersioningIntent { // Indicates that the command should run on the target task queue's current overall-default // build ID. DEFAULT = 2; +} + +message WorkerDeploymentVersion { + string deployment_name = 1; + string build_id = 2; } \ No newline at end of file diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto index a857b654a..de8f64f0f 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto @@ -84,10 +84,14 @@ message WorkflowActivation { uint64 history_size_bytes = 7; // Set true if the most recent WFT started event had this suggestion bool continue_as_new_suggested = 8; - // Set to the Build ID of the worker that processed this task, which may be empty. During replay - // this id may not equal the id of the replaying worker. If not replaying and this worker has - // a defined Build ID, it will equal that ID. It will also be empty for evict-only activations. - string build_id_for_current_task = 9; + // Set to the deployment version of the worker that processed this task, + // which may be empty. During replay this version may not equal the version + // of the replaying worker. If not replaying and this worker has a defined + // Deployment Version, it will equal that. It will also be empty for + // evict-only activations. The deployment name may be empty, but not the + // build id, if this worker was using the deprecated Build ID-only + // feature(s). + common.WorkerDeploymentVersion deployment_version_for_current_task = 9; } message WorkflowActivationJob { diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index 9c9ec8533..e37484f5c 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -489,7 +489,7 @@ pub mod coresdk { available_internal_flags: vec![], history_size_bytes: 0, continue_as_new_suggested: false, - build_id_for_current_task: "".to_string(), + deployment_version_for_current_task: None, } } diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 741a1af59..d7b36f30c 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -27,6 +27,7 @@ use std::{ task::Poll, time::{Duration, SystemTime}, }; +use temporal_sdk_core_api::worker::WorkerDeploymentVersion; use temporal_sdk_core_protos::{ coresdk::{ activity_result::{ActivityResolution, activity_resolution}, @@ -132,10 +133,11 @@ impl WfContext { self.shared.read().history_length } - /// Return the Build ID as it was when this point in the workflow was first reached. If this - /// code is being executed for the first time, return this Worker's Build ID if it has one. - pub fn current_build_id(&self) -> Option { - self.shared.read().current_build_id.clone() + /// Return the deployment version, if any, as it was when this point in the workflow was first + /// reached. If this code is being executed for the first time, return this Worker's deployment + /// version if it has one. + pub fn current_deployment_version(&self) -> Option { + self.shared.read().current_deployment_version.clone() } /// Return current values for workflow search attributes @@ -499,7 +501,7 @@ pub(crate) struct WfContextSharedData { pub(crate) is_replaying: bool, pub(crate) wf_time: Option, pub(crate) history_length: u32, - pub(crate) current_build_id: Option, + pub(crate) current_deployment_version: Option, pub(crate) search_attributes: SearchAttributes, pub(crate) random_seed: u64, } diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index 4a0f75614..f62ae8f49 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -365,11 +365,9 @@ impl Future for WorkflowFuture { wlock.is_replaying = activation.is_replaying; wlock.wf_time = activation.timestamp.try_into_or_none(); wlock.history_length = activation.history_length; - wlock.current_build_id = if activation.build_id_for_current_task.is_empty() { - None - } else { - Some(activation.build_id_for_current_task) - }; + wlock.current_deployment_version = activation + .deployment_version_for_current_task + .map(Into::into); } let mut die_of_eviction_when_done = false; diff --git a/tests/integ_tests/worker_versioning_tests.rs b/tests/integ_tests/worker_versioning_tests.rs index 97051982d..9e549142b 100644 --- a/tests/integ_tests/worker_versioning_tests.rs +++ b/tests/integ_tests/worker_versioning_tests.rs @@ -27,14 +27,15 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau let wf_type = "sets_deployment_info_on_task_responses"; let mut starter = CoreWfStarter::new(wf_type); let deploy_name = format!("deployment-{}", starter.get_task_queue()); + let version = WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "1.0".to_string(), + }; starter .worker_config .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased( WorkerDeploymentOptions { - version: WorkerDeploymentVersion { - deployment_name: deploy_name.clone(), - build_id: "1.0".to_string(), - }, + version: version.clone(), use_worker_versioning: true, default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(), }, @@ -47,7 +48,10 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau // we can describe it and then set the current version. let worker_task = async { let res = core.poll_workflow_activation().await.unwrap(); - assert_eq!(res.build_id_for_current_task, "1.0"); + assert_eq!( + version, + res.deployment_version_for_current_task.unwrap().into(), + ); let mut success_complete = workflow_completion::Success::from_variants(vec![ CompleteWorkflowExecution { result: None }.into(), diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index f8cdb3ab6..005f77414 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -33,7 +33,9 @@ use temporal_sdk::{ use temporal_sdk_core::{CoreRuntime, replay::HistoryForReplay}; use temporal_sdk_core_api::{ errors::{PollError, WorkflowErrorType}, - worker::{PollerBehavior, WorkerVersioningStrategy}, + worker::{ + PollerBehavior, WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy, + }, }; use temporal_sdk_core_protos::{ coresdk::{ @@ -561,14 +563,27 @@ async fn slow_completes_with_small_cache() { } #[tokio::test] -async fn build_id_correct_in_wf_info() { - let wf_type = "build_id_correct_in_wf_info"; +#[rstest::rstest] +async fn deployment_version_correct_in_wf_info(#[values(true, false)] use_only_build_id: bool) { + let wf_type = "deployment_version_correct_in_wf_info"; let mut starter = CoreWfStarter::new(wf_type); - starter - .worker_config - .versioning_strategy(WorkerVersioningStrategy::None { + let version_strat = if use_only_build_id { + WorkerVersioningStrategy::None { build_id: "1.0".to_owned(), + } + } else { + WorkerVersioningStrategy::WorkerDeploymentBased(WorkerDeploymentOptions { + version: WorkerDeploymentVersion { + deployment_name: "deployment-1".to_string(), + build_id: "1.0".to_string(), + }, + use_worker_versioning: false, + default_versioning_behavior: None, }) + }; + starter + .worker_config + .versioning_strategy(version_strat) .no_remote_activities(true); let core = starter.get_worker().await; starter.start_wf().await; @@ -576,7 +591,21 @@ async fn build_id_correct_in_wf_info() { let workflow_id = starter.get_task_queue().to_string(); let res = core.poll_workflow_activation().await.unwrap(); - assert_eq!(res.build_id_for_current_task, "1.0"); + assert_eq!( + res.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "1.0" + ); + if !use_only_build_id { + assert_eq!( + res.deployment_version_for_current_task + .unwrap() + .deployment_name, + "deployment-1" + ); + } core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( res.run_id.clone(), vec![], @@ -606,7 +635,21 @@ async fn build_id_correct_in_wf_info() { variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), }] => q ); - assert_eq!(task.build_id_for_current_task, "1.0"); + assert_eq!( + task.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "1.0" + ); + if !use_only_build_id { + assert_eq!( + task.deployment_version_for_current_task + .unwrap() + .deployment_name, + "deployment-1" + ); + } core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, QueryResult { @@ -631,11 +674,21 @@ async fn build_id_correct_in_wf_info() { .unwrap(); let mut starter = starter.clone_no_worker(); - starter - .worker_config - .versioning_strategy(WorkerVersioningStrategy::None { + let version_strat = if use_only_build_id { + WorkerVersioningStrategy::None { build_id: "2.0".to_owned(), - }); + } + } else { + WorkerVersioningStrategy::WorkerDeploymentBased(WorkerDeploymentOptions { + version: WorkerDeploymentVersion { + deployment_name: "deployment-1".to_string(), + build_id: "2.0".to_string(), + }, + use_worker_versioning: false, + default_versioning_behavior: None, + }) + }; + starter.worker_config.versioning_strategy(version_strat); let core = starter.get_worker().await; @@ -654,7 +707,21 @@ async fn build_id_correct_in_wf_info() { }; let complete_fut = async { let res = core.poll_workflow_activation().await.unwrap(); - assert_eq!(res.build_id_for_current_task, "1.0"); + assert_eq!( + res.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "1.0" + ); + if !use_only_build_id { + assert_eq!( + res.deployment_version_for_current_task + .unwrap() + .deployment_name, + "deployment-1" + ); + } core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( res.run_id.clone(), vec![], @@ -668,7 +735,21 @@ async fn build_id_correct_in_wf_info() { variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), }] => q ); - assert_eq!(task.build_id_for_current_task, "1.0"); + assert_eq!( + task.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "1.0" + ); + if !use_only_build_id { + assert_eq!( + task.deployment_version_for_current_task + .unwrap() + .deployment_name, + "deployment-1" + ); + } core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, QueryResult { @@ -699,7 +780,21 @@ async fn build_id_correct_in_wf_info() { .unwrap(); let res = core.poll_workflow_activation().await.unwrap(); - assert_eq!(res.build_id_for_current_task, "2.0"); + assert_eq!( + res.deployment_version_for_current_task + .as_ref() + .unwrap() + .build_id, + "2.0" + ); + if !use_only_build_id { + assert_eq!( + res.deployment_version_for_current_task + .unwrap() + .deployment_name, + "deployment-1" + ); + } core.complete_execution(&res.run_id).await; }