Skip to content

Commit 715bd60

Browse files
committed
Attach deployment options in a few missing spots
1 parent 7d4c48a commit 715bd60

3 files changed

Lines changed: 127 additions & 15 deletions

File tree

core/src/worker/client.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,9 @@ impl WorkerClient for WorkerClientBag {
391391
identity: self.identity.clone(),
392392
namespace: self.namespace.clone(),
393393
worker_version: self.worker_version_stamp(),
394-
// TODO: https://github.com/temporalio/sdk-core/issues/866
394+
// Will never be set, deprecated.
395395
deployment: None,
396-
deployment_options: None,
396+
deployment_options: self.deployment_options(),
397397
},
398398
)
399399
.await?
@@ -449,9 +449,9 @@ impl WorkerClient for WorkerClientBag {
449449
identity: self.identity.clone(),
450450
namespace: self.namespace.clone(),
451451
worker_version: self.worker_version_stamp(),
452-
// TODO: https://github.com/temporalio/sdk-core/issues/866
452+
// Will never be set, deprecated.
453453
deployment: None,
454-
deployment_options: None,
454+
deployment_options: self.deployment_options(),
455455
},
456456
)
457457
.await?
@@ -475,9 +475,9 @@ impl WorkerClient for WorkerClientBag {
475475
// TODO: Implement - https://github.com/temporalio/sdk-core/issues/293
476476
last_heartbeat_details: None,
477477
worker_version: self.worker_version_stamp(),
478-
// TODO: https://github.com/temporalio/sdk-core/issues/866
478+
// Will never be set, deprecated.
479479
deployment: None,
480-
deployment_options: None,
480+
deployment_options: self.deployment_options(),
481481
},
482482
)
483483
.await?
@@ -500,9 +500,9 @@ impl WorkerClient for WorkerClientBag {
500500
namespace: self.namespace.clone(),
501501
messages: vec![],
502502
worker_version: self.worker_version_stamp(),
503-
// TODO: https://github.com/temporalio/sdk-core/issues/866
503+
// Will never be set, deprecated.
504504
deployment: None,
505-
deployment_options: None,
505+
deployment_options: self.deployment_options(),
506506
};
507507
Ok(self
508508
.cloned_client()

test-utils/src/lib.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub mod workflows;
1111
pub use temporal_sdk_core::replay::HistoryForReplay;
1212

1313
use crate::stream::{Stream, TryStreamExt};
14-
use anyhow::{Context, bail};
14+
use anyhow::{Context, Error, bail};
1515
use assert_matches::assert_matches;
1616
use futures_util::{StreamExt, future, stream, stream::FuturesUnordered};
1717
use parking_lot::Mutex;
@@ -27,8 +27,9 @@ use std::{
2727
time::{Duration, Instant},
2828
};
2929
use temporal_client::{
30-
Client, ClientTlsConfig, NamespacedClient, RetryClient, TlsConfig, WfClientExt,
31-
WorkflowClientTrait, WorkflowExecutionInfo, WorkflowHandle, WorkflowOptions,
30+
Client, ClientTlsConfig, GetWorkflowResultOpts, NamespacedClient, RetryClient, TlsConfig,
31+
WfClientExt, WorkflowClientTrait, WorkflowExecutionInfo, WorkflowExecutionResult,
32+
WorkflowHandle, WorkflowOptions,
3233
};
3334
use temporal_sdk::{
3435
IntoActivityFunc, Worker, WorkflowFunction,
@@ -339,6 +340,18 @@ impl CoreWfStarter {
339340
.unwrap()
340341
}
341342

343+
pub async fn wait_for_default_wf_finish(
344+
&self,
345+
) -> Result<WorkflowExecutionResult<Vec<Payload>>, Error> {
346+
self.initted_worker
347+
.get()
348+
.unwrap()
349+
.client
350+
.get_untyped_workflow_handle(self.get_wf_id().to_string(), "")
351+
.get_workflow_result(GetWorkflowResultOpts { follow_runs: false })
352+
.await
353+
}
354+
342355
async fn get_or_init(&mut self) -> &InitializedWorker {
343356
self.initted_worker
344357
.get_or_init(|| async {

tests/integ_tests/worker_versioning_tests.rs

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use crate::integ_tests::activity_functions::echo;
12
use std::time::Duration;
2-
use temporal_client::{NamespacedClient, WorkflowService};
3+
use temporal_client::{NamespacedClient, WorkflowOptions, WorkflowService};
4+
use temporal_sdk::{ActivityOptions, WfContext};
35
use temporal_sdk_core_api::worker::{
46
WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy,
57
};
68
use temporal_sdk_core_protos::{
79
coresdk::{
8-
workflow_commands::CompleteWorkflowExecution, workflow_completion,
10+
AsJsonPayloadExt, workflow_commands::CompleteWorkflowExecution, workflow_completion,
911
workflow_completion::WorkflowActivationCompletion,
1012
},
1113
temporal::api::{
@@ -36,8 +38,7 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
3638
use_worker_versioning: true,
3739
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
3840
},
39-
))
40-
.no_remote_activities(true);
41+
));
4142
let core = starter.get_worker().await;
4243
let client = starter.get_client().await;
4344

@@ -129,3 +130,101 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
129130
format!("{}.1.0", deploy_name)
130131
);
131132
}
133+
134+
#[tokio::test]
135+
async fn activity_has_deployment_stamp() {
136+
let wf_name = "activity_has_deployment_stamp";
137+
let mut starter = CoreWfStarter::new(wf_name);
138+
let deploy_name = format!("deployment-{}", starter.get_task_queue());
139+
starter
140+
.worker_config
141+
.versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
142+
WorkerDeploymentOptions {
143+
version: WorkerDeploymentVersion {
144+
deployment_name: deploy_name.clone(),
145+
build_id: "1.0".to_string(),
146+
},
147+
use_worker_versioning: true,
148+
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
149+
},
150+
));
151+
let mut worker = starter.worker().await;
152+
let client = starter.get_client().await;
153+
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
154+
ctx.activity(ActivityOptions {
155+
activity_type: "echo_activity".to_string(),
156+
start_to_close_timeout: Some(Duration::from_secs(5)),
157+
input: "hi!".as_json_payload().expect("serializes fine"),
158+
..Default::default()
159+
})
160+
.await;
161+
Ok(().into())
162+
});
163+
worker.register_activity("echo_activity", echo);
164+
let submitter = worker.get_submitter_handle();
165+
let shutdown_handle = worker.inner_mut().shutdown_handle();
166+
167+
let client_task = async {
168+
let desc_resp = eventually(
169+
async || {
170+
client
171+
.get_client()
172+
.clone()
173+
.describe_worker_deployment(DescribeWorkerDeploymentRequest {
174+
namespace: client.namespace().to_string(),
175+
deployment_name: deploy_name.clone(),
176+
})
177+
.await
178+
},
179+
Duration::from_secs(50),
180+
)
181+
.await
182+
.unwrap()
183+
.into_inner();
184+
185+
client
186+
.get_client()
187+
.clone()
188+
.set_worker_deployment_current_version(SetWorkerDeploymentCurrentVersionRequest {
189+
namespace: client.namespace().to_owned(),
190+
deployment_name: deploy_name.clone(),
191+
version: format!("{}.1.0", deploy_name),
192+
conflict_token: desc_resp.conflict_token,
193+
..Default::default()
194+
})
195+
.await
196+
.unwrap();
197+
198+
submitter
199+
.submit_wf(
200+
starter.get_wf_id(),
201+
wf_name.to_owned(),
202+
vec![],
203+
WorkflowOptions::default(),
204+
)
205+
.await
206+
.unwrap();
207+
starter.wait_for_default_wf_finish().await.unwrap();
208+
shutdown_handle();
209+
};
210+
join!(
211+
async {
212+
worker.inner_mut().run().await.unwrap();
213+
},
214+
client_task
215+
);
216+
let hist = starter.get_history().await;
217+
let _activity_completed = hist
218+
.events
219+
.into_iter()
220+
.find_map(|e| {
221+
if let Attributes::ActivityTaskCompletedEventAttributes(a) = e.attributes.unwrap() {
222+
Some(a)
223+
} else {
224+
None
225+
}
226+
})
227+
.unwrap();
228+
// TODO: Can't actually verify this at the moment as the deployment options are not transferred
229+
// to the event.
230+
}

0 commit comments

Comments
 (0)