Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,9 @@ impl WorkerClient for WorkerClientBag {
identity: self.identity.clone(),
namespace: self.namespace.clone(),
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
// Will never be set, deprecated.
deployment: None,
deployment_options: None,
deployment_options: self.deployment_options(),
},
)
.await?
Expand Down Expand Up @@ -449,9 +449,9 @@ impl WorkerClient for WorkerClientBag {
identity: self.identity.clone(),
namespace: self.namespace.clone(),
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
// Will never be set, deprecated.
deployment: None,
deployment_options: None,
deployment_options: self.deployment_options(),
},
)
.await?
Expand All @@ -475,9 +475,9 @@ impl WorkerClient for WorkerClientBag {
// TODO: Implement - https://github.com/temporalio/sdk-core/issues/293
last_heartbeat_details: None,
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
// Will never be set, deprecated.
deployment: None,
deployment_options: None,
deployment_options: self.deployment_options(),
},
)
.await?
Expand All @@ -500,9 +500,9 @@ impl WorkerClient for WorkerClientBag {
namespace: self.namespace.clone(),
messages: vec![],
worker_version: self.worker_version_stamp(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
// Will never be set, deprecated.
deployment: None,
deployment_options: None,
deployment_options: self.deployment_options(),
};
Ok(self
.cloned_client()
Expand Down
19 changes: 16 additions & 3 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod workflows;
pub use temporal_sdk_core::replay::HistoryForReplay;

use crate::stream::{Stream, TryStreamExt};
use anyhow::{Context, bail};
use anyhow::{Context, Error, bail};
use assert_matches::assert_matches;
use futures_util::{StreamExt, future, stream, stream::FuturesUnordered};
use parking_lot::Mutex;
Expand All @@ -27,8 +27,9 @@ use std::{
time::{Duration, Instant},
};
use temporal_client::{
Client, ClientTlsConfig, NamespacedClient, RetryClient, TlsConfig, WfClientExt,
WorkflowClientTrait, WorkflowExecutionInfo, WorkflowHandle, WorkflowOptions,
Client, ClientTlsConfig, GetWorkflowResultOpts, NamespacedClient, RetryClient, TlsConfig,
WfClientExt, WorkflowClientTrait, WorkflowExecutionInfo, WorkflowExecutionResult,
WorkflowHandle, WorkflowOptions,
};
use temporal_sdk::{
IntoActivityFunc, Worker, WorkflowFunction,
Expand Down Expand Up @@ -339,6 +340,18 @@ impl CoreWfStarter {
.unwrap()
}

pub async fn wait_for_default_wf_finish(
&self,
) -> Result<WorkflowExecutionResult<Vec<Payload>>, Error> {
self.initted_worker
.get()
.unwrap()
.client
.get_untyped_workflow_handle(self.get_wf_id().to_string(), "")
.get_workflow_result(GetWorkflowResultOpts { follow_runs: false })
.await
}

async fn get_or_init(&mut self) -> &InitializedWorker {
self.initted_worker
.get_or_init(|| async {
Expand Down
107 changes: 103 additions & 4 deletions tests/integ_tests/worker_versioning_tests.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::integ_tests::activity_functions::echo;
use std::time::Duration;
use temporal_client::{NamespacedClient, WorkflowService};
use temporal_client::{NamespacedClient, WorkflowOptions, WorkflowService};
use temporal_sdk::{ActivityOptions, WfContext};
use temporal_sdk_core_api::worker::{
WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy,
};
use temporal_sdk_core_protos::{
coresdk::{
workflow_commands::CompleteWorkflowExecution, workflow_completion,
AsJsonPayloadExt, workflow_commands::CompleteWorkflowExecution, workflow_completion,
workflow_completion::WorkflowActivationCompletion,
},
temporal::api::{
Expand Down Expand Up @@ -36,8 +38,7 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
use_worker_versioning: true,
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
},
))
.no_remote_activities(true);
));
let core = starter.get_worker().await;
let client = starter.get_client().await;

Expand Down Expand Up @@ -129,3 +130,101 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
format!("{}.1.0", deploy_name)
);
}

#[tokio::test]
async fn activity_has_deployment_stamp() {
let wf_name = "activity_has_deployment_stamp";
let mut starter = CoreWfStarter::new(wf_name);
let deploy_name = format!("deployment-{}", starter.get_task_queue());
starter
.worker_config
.versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
WorkerDeploymentOptions {
version: WorkerDeploymentVersion {
deployment_name: deploy_name.clone(),
build_id: "1.0".to_string(),
},
use_worker_versioning: true,
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
},
));
let mut worker = starter.worker().await;
let client = starter.get_client().await;
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
ctx.activity(ActivityOptions {
activity_type: "echo_activity".to_string(),
start_to_close_timeout: Some(Duration::from_secs(5)),
input: "hi!".as_json_payload().expect("serializes fine"),
..Default::default()
})
.await;
Ok(().into())
});
worker.register_activity("echo_activity", echo);
let submitter = worker.get_submitter_handle();
let shutdown_handle = worker.inner_mut().shutdown_handle();

let client_task = async {
let desc_resp = eventually(
async || {
client
.get_client()
.clone()
.describe_worker_deployment(DescribeWorkerDeploymentRequest {
namespace: client.namespace().to_string(),
deployment_name: deploy_name.clone(),
})
.await
},
Duration::from_secs(50),
)
.await
.unwrap()
.into_inner();

client
.get_client()
.clone()
.set_worker_deployment_current_version(SetWorkerDeploymentCurrentVersionRequest {
namespace: client.namespace().to_owned(),
deployment_name: deploy_name.clone(),
version: format!("{}.1.0", deploy_name),
conflict_token: desc_resp.conflict_token,
..Default::default()
})
.await
.unwrap();

submitter
.submit_wf(
starter.get_wf_id(),
wf_name.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
starter.wait_for_default_wf_finish().await.unwrap();
shutdown_handle();
};
join!(
async {
worker.inner_mut().run().await.unwrap();
},
client_task
);
let hist = starter.get_history().await;
let _activity_completed = hist
.events
.into_iter()
.find_map(|e| {
if let Attributes::ActivityTaskCompletedEventAttributes(a) = e.attributes.unwrap() {
Some(a)
} else {
None
}
})
.unwrap();
// TODO: Can't actually verify this at the moment as the deployment options are not transferred
// to the event.
}
Loading