Skip to content

Commit 15ea20f

Browse files
authored
Attach deployment options in a few missing spots (#898)
1 parent 7d4c48a commit 15ea20f

3 files changed

Lines changed: 126 additions & 13 deletions

File tree

core/src/worker/client.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,9 @@ impl WorkerClient for WorkerClientBag {
391391
identity: self.identity.clone(),
392392
namespace: self.namespace.clone(),
393393
worker_version: self.worker_version_stamp(),
394-
// TODO: https://github.com/temporalio/sdk-core/issues/866
394+
// Will never be set, deprecated.
395395
deployment: None,
396-
deployment_options: None,
396+
deployment_options: self.deployment_options(),
397397
},
398398
)
399399
.await?
@@ -449,9 +449,9 @@ impl WorkerClient for WorkerClientBag {
449449
identity: self.identity.clone(),
450450
namespace: self.namespace.clone(),
451451
worker_version: self.worker_version_stamp(),
452-
// TODO: https://github.com/temporalio/sdk-core/issues/866
452+
// Will never be set, deprecated.
453453
deployment: None,
454-
deployment_options: None,
454+
deployment_options: self.deployment_options(),
455455
},
456456
)
457457
.await?
@@ -475,9 +475,9 @@ impl WorkerClient for WorkerClientBag {
475475
// TODO: Implement - https://github.com/temporalio/sdk-core/issues/293
476476
last_heartbeat_details: None,
477477
worker_version: self.worker_version_stamp(),
478-
// TODO: https://github.com/temporalio/sdk-core/issues/866
478+
// Will never be set, deprecated.
479479
deployment: None,
480-
deployment_options: None,
480+
deployment_options: self.deployment_options(),
481481
},
482482
)
483483
.await?
@@ -500,9 +500,9 @@ impl WorkerClient for WorkerClientBag {
500500
namespace: self.namespace.clone(),
501501
messages: vec![],
502502
worker_version: self.worker_version_stamp(),
503-
// TODO: https://github.com/temporalio/sdk-core/issues/866
503+
// Will never be set, deprecated.
504504
deployment: None,
505-
deployment_options: None,
505+
deployment_options: self.deployment_options(),
506506
};
507507
Ok(self
508508
.cloned_client()

test-utils/src/lib.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub mod workflows;
1111
pub use temporal_sdk_core::replay::HistoryForReplay;
1212

1313
use crate::stream::{Stream, TryStreamExt};
14-
use anyhow::{Context, bail};
14+
use anyhow::{Context, Error, bail};
1515
use assert_matches::assert_matches;
1616
use futures_util::{StreamExt, future, stream, stream::FuturesUnordered};
1717
use parking_lot::Mutex;
@@ -27,8 +27,9 @@ use std::{
2727
time::{Duration, Instant},
2828
};
2929
use temporal_client::{
30-
Client, ClientTlsConfig, NamespacedClient, RetryClient, TlsConfig, WfClientExt,
31-
WorkflowClientTrait, WorkflowExecutionInfo, WorkflowHandle, WorkflowOptions,
30+
Client, ClientTlsConfig, GetWorkflowResultOpts, NamespacedClient, RetryClient, TlsConfig,
31+
WfClientExt, WorkflowClientTrait, WorkflowExecutionInfo, WorkflowExecutionResult,
32+
WorkflowHandle, WorkflowOptions,
3233
};
3334
use temporal_sdk::{
3435
IntoActivityFunc, Worker, WorkflowFunction,
@@ -339,6 +340,18 @@ impl CoreWfStarter {
339340
.unwrap()
340341
}
341342

343+
pub async fn wait_for_default_wf_finish(
344+
&self,
345+
) -> Result<WorkflowExecutionResult<Vec<Payload>>, Error> {
346+
self.initted_worker
347+
.get()
348+
.unwrap()
349+
.client
350+
.get_untyped_workflow_handle(self.get_wf_id().to_string(), "")
351+
.get_workflow_result(GetWorkflowResultOpts { follow_runs: false })
352+
.await
353+
}
354+
342355
async fn get_or_init(&mut self) -> &InitializedWorker {
343356
self.initted_worker
344357
.get_or_init(|| async {

tests/integ_tests/worker_versioning_tests.rs

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use crate::integ_tests::activity_functions::echo;
12
use std::time::Duration;
2-
use temporal_client::{NamespacedClient, WorkflowService};
3+
use temporal_client::{NamespacedClient, WorkflowOptions, WorkflowService};
4+
use temporal_sdk::{ActivityOptions, WfContext};
35
use temporal_sdk_core_api::worker::{
46
WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy,
57
};
68
use temporal_sdk_core_protos::{
79
coresdk::{
8-
workflow_commands::CompleteWorkflowExecution, workflow_completion,
10+
AsJsonPayloadExt, workflow_commands::CompleteWorkflowExecution, workflow_completion,
911
workflow_completion::WorkflowActivationCompletion,
1012
},
1113
temporal::api::{
@@ -129,3 +131,101 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
129131
format!("{}.1.0", deploy_name)
130132
);
131133
}
134+
135+
#[tokio::test]
136+
async fn activity_has_deployment_stamp() {
137+
let wf_name = "activity_has_deployment_stamp";
138+
let mut starter = CoreWfStarter::new(wf_name);
139+
let deploy_name = format!("deployment-{}", starter.get_task_queue());
140+
starter
141+
.worker_config
142+
.versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
143+
WorkerDeploymentOptions {
144+
version: WorkerDeploymentVersion {
145+
deployment_name: deploy_name.clone(),
146+
build_id: "1.0".to_string(),
147+
},
148+
use_worker_versioning: true,
149+
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
150+
},
151+
));
152+
let mut worker = starter.worker().await;
153+
let client = starter.get_client().await;
154+
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
155+
ctx.activity(ActivityOptions {
156+
activity_type: "echo_activity".to_string(),
157+
start_to_close_timeout: Some(Duration::from_secs(5)),
158+
input: "hi!".as_json_payload().expect("serializes fine"),
159+
..Default::default()
160+
})
161+
.await;
162+
Ok(().into())
163+
});
164+
worker.register_activity("echo_activity", echo);
165+
let submitter = worker.get_submitter_handle();
166+
let shutdown_handle = worker.inner_mut().shutdown_handle();
167+
168+
let client_task = async {
169+
let desc_resp = eventually(
170+
async || {
171+
client
172+
.get_client()
173+
.clone()
174+
.describe_worker_deployment(DescribeWorkerDeploymentRequest {
175+
namespace: client.namespace().to_string(),
176+
deployment_name: deploy_name.clone(),
177+
})
178+
.await
179+
},
180+
Duration::from_secs(50),
181+
)
182+
.await
183+
.unwrap()
184+
.into_inner();
185+
186+
client
187+
.get_client()
188+
.clone()
189+
.set_worker_deployment_current_version(SetWorkerDeploymentCurrentVersionRequest {
190+
namespace: client.namespace().to_owned(),
191+
deployment_name: deploy_name.clone(),
192+
version: format!("{}.1.0", deploy_name),
193+
conflict_token: desc_resp.conflict_token,
194+
..Default::default()
195+
})
196+
.await
197+
.unwrap();
198+
199+
submitter
200+
.submit_wf(
201+
starter.get_wf_id(),
202+
wf_name.to_owned(),
203+
vec![],
204+
WorkflowOptions::default(),
205+
)
206+
.await
207+
.unwrap();
208+
starter.wait_for_default_wf_finish().await.unwrap();
209+
shutdown_handle();
210+
};
211+
join!(
212+
async {
213+
worker.inner_mut().run().await.unwrap();
214+
},
215+
client_task
216+
);
217+
let hist = starter.get_history().await;
218+
let _activity_completed = hist
219+
.events
220+
.into_iter()
221+
.find_map(|e| {
222+
if let Attributes::ActivityTaskCompletedEventAttributes(a) = e.attributes.unwrap() {
223+
Some(a)
224+
} else {
225+
None
226+
}
227+
})
228+
.unwrap();
229+
// TODO: Can't actually verify this at the moment as the deployment options are not transferred
230+
// to the event.
231+
}

0 commit comments

Comments
 (0)