diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index 32741324d..184833560 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -391,9 +391,9 @@ impl WorkerClient for WorkerClientBag { identity: self.identity.clone(), namespace: self.namespace.clone(), worker_version: self.worker_version_stamp(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 + // Will never be set, deprecated. deployment: None, - deployment_options: None, + deployment_options: self.deployment_options(), }, ) .await? @@ -449,9 +449,9 @@ impl WorkerClient for WorkerClientBag { identity: self.identity.clone(), namespace: self.namespace.clone(), worker_version: self.worker_version_stamp(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 + // Will never be set, deprecated. deployment: None, - deployment_options: None, + deployment_options: self.deployment_options(), }, ) .await? @@ -475,9 +475,9 @@ impl WorkerClient for WorkerClientBag { // TODO: Implement - https://github.com/temporalio/sdk-core/issues/293 last_heartbeat_details: None, worker_version: self.worker_version_stamp(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 + // Will never be set, deprecated. deployment: None, - deployment_options: None, + deployment_options: self.deployment_options(), }, ) .await? @@ -500,9 +500,9 @@ impl WorkerClient for WorkerClientBag { namespace: self.namespace.clone(), messages: vec![], worker_version: self.worker_version_stamp(), - // TODO: https://github.com/temporalio/sdk-core/issues/866 + // Will never be set, deprecated. deployment: None, - deployment_options: None, + deployment_options: self.deployment_options(), }; Ok(self .cloned_client() diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 3dc37e4f3..805d39648 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -11,7 +11,7 @@ pub mod workflows; pub use temporal_sdk_core::replay::HistoryForReplay; use crate::stream::{Stream, TryStreamExt}; -use anyhow::{Context, bail}; +use anyhow::{Context, Error, bail}; use assert_matches::assert_matches; use futures_util::{StreamExt, future, stream, stream::FuturesUnordered}; use parking_lot::Mutex; @@ -27,8 +27,9 @@ use std::{ time::{Duration, Instant}, }; use temporal_client::{ - Client, ClientTlsConfig, NamespacedClient, RetryClient, TlsConfig, WfClientExt, - WorkflowClientTrait, WorkflowExecutionInfo, WorkflowHandle, WorkflowOptions, + Client, ClientTlsConfig, GetWorkflowResultOpts, NamespacedClient, RetryClient, TlsConfig, + WfClientExt, WorkflowClientTrait, WorkflowExecutionInfo, WorkflowExecutionResult, + WorkflowHandle, WorkflowOptions, }; use temporal_sdk::{ IntoActivityFunc, Worker, WorkflowFunction, @@ -339,6 +340,18 @@ impl CoreWfStarter { .unwrap() } + pub async fn wait_for_default_wf_finish( + &self, + ) -> Result>, Error> { + self.initted_worker + .get() + .unwrap() + .client + .get_untyped_workflow_handle(self.get_wf_id().to_string(), "") + .get_workflow_result(GetWorkflowResultOpts { follow_runs: false }) + .await + } + async fn get_or_init(&mut self) -> &InitializedWorker { self.initted_worker .get_or_init(|| async { diff --git a/tests/integ_tests/worker_versioning_tests.rs b/tests/integ_tests/worker_versioning_tests.rs index 4f63d03d9..97051982d 100644 --- a/tests/integ_tests/worker_versioning_tests.rs +++ b/tests/integ_tests/worker_versioning_tests.rs @@ -1,11 +1,13 @@ +use crate::integ_tests::activity_functions::echo; use std::time::Duration; -use temporal_client::{NamespacedClient, WorkflowService}; +use temporal_client::{NamespacedClient, WorkflowOptions, WorkflowService}; +use temporal_sdk::{ActivityOptions, WfContext}; use temporal_sdk_core_api::worker::{ WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy, }; use temporal_sdk_core_protos::{ coresdk::{ - workflow_commands::CompleteWorkflowExecution, workflow_completion, + AsJsonPayloadExt, workflow_commands::CompleteWorkflowExecution, workflow_completion, workflow_completion::WorkflowActivationCompletion, }, temporal::api::{ @@ -129,3 +131,101 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau format!("{}.1.0", deploy_name) ); } + +#[tokio::test] +async fn activity_has_deployment_stamp() { + let wf_name = "activity_has_deployment_stamp"; + let mut starter = CoreWfStarter::new(wf_name); + let deploy_name = format!("deployment-{}", starter.get_task_queue()); + starter + .worker_config + .versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased( + WorkerDeploymentOptions { + version: WorkerDeploymentVersion { + deployment_name: deploy_name.clone(), + build_id: "1.0".to_string(), + }, + use_worker_versioning: true, + default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(), + }, + )); + let mut worker = starter.worker().await; + let client = starter.get_client().await; + worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + ctx.activity(ActivityOptions { + activity_type: "echo_activity".to_string(), + start_to_close_timeout: Some(Duration::from_secs(5)), + input: "hi!".as_json_payload().expect("serializes fine"), + ..Default::default() + }) + .await; + Ok(().into()) + }); + worker.register_activity("echo_activity", echo); + let submitter = worker.get_submitter_handle(); + let shutdown_handle = worker.inner_mut().shutdown_handle(); + + let client_task = async { + let desc_resp = eventually( + async || { + client + .get_client() + .clone() + .describe_worker_deployment(DescribeWorkerDeploymentRequest { + namespace: client.namespace().to_string(), + deployment_name: deploy_name.clone(), + }) + .await + }, + Duration::from_secs(50), + ) + .await + .unwrap() + .into_inner(); + + client + .get_client() + .clone() + .set_worker_deployment_current_version(SetWorkerDeploymentCurrentVersionRequest { + namespace: client.namespace().to_owned(), + deployment_name: deploy_name.clone(), + version: format!("{}.1.0", deploy_name), + conflict_token: desc_resp.conflict_token, + ..Default::default() + }) + .await + .unwrap(); + + submitter + .submit_wf( + starter.get_wf_id(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + starter.wait_for_default_wf_finish().await.unwrap(); + shutdown_handle(); + }; + join!( + async { + worker.inner_mut().run().await.unwrap(); + }, + client_task + ); + let hist = starter.get_history().await; + let _activity_completed = hist + .events + .into_iter() + .find_map(|e| { + if let Attributes::ActivityTaskCompletedEventAttributes(a) = e.attributes.unwrap() { + Some(a) + } else { + None + } + }) + .unwrap(); + // TODO: Can't actually verify this at the moment as the deployment options are not transferred + // to the event. +}