Skip to content

Commit 52e0923

Browse files
committed
Attach deployment version to activations instead of just build id
1 parent 15ea20f commit 52e0923

11 files changed

Lines changed: 226 additions & 59 deletions

File tree

core-api/src/worker.rs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::{errors::WorkflowErrorType, telemetry::metrics::TemporalMeter};
22
use std::{
33
any::Any,
44
collections::{HashMap, HashSet},
5+
str::FromStr,
56
sync::Arc,
67
time::Duration,
78
};
89
use temporal_sdk_core_protos::{
10+
coresdk,
911
coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo},
1012
temporal::api::enums::v1::VersioningBehavior,
1113
};
@@ -175,8 +177,21 @@ impl WorkerConfig {
175177
.unwrap_or(false)
176178
}
177179

178-
pub fn build_id(&self) -> &str {
179-
self.versioning_strategy.build_id()
180+
pub fn computed_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
181+
let wdv = match self.versioning_strategy {
182+
WorkerVersioningStrategy::None { ref build_id } => WorkerDeploymentVersion {
183+
deployment_name: "".to_owned(),
184+
build_id: build_id.clone(),
185+
},
186+
WorkerVersioningStrategy::WorkerDeploymentBased(ref opts) => opts.version.clone(),
187+
WorkerVersioningStrategy::LegacyBuildIdBased { ref build_id } => {
188+
WorkerDeploymentVersion {
189+
deployment_name: "".to_owned(),
190+
build_id: build_id.clone(),
191+
}
192+
}
193+
};
194+
if wdv.is_empty() { None } else { Some(wdv) }
180195
}
181196
}
182197

@@ -316,9 +331,6 @@ pub trait SlotReservationContext: Send + Sync {
316331
/// Returns the identity of the worker
317332
fn worker_identity(&self) -> &str;
318333

319-
/// Returns the build id of the worker
320-
fn worker_build_id(&self) -> &str;
321-
322334
/// Returns the number of currently outstanding slot permits, whether used or un-used.
323335
fn num_issued_slots(&self) -> usize;
324336

@@ -569,3 +581,41 @@ pub struct WorkerDeploymentVersion {
569581
/// Build ID for the worker.
570582
pub build_id: String,
571583
}
584+
585+
impl WorkerDeploymentVersion {
586+
pub fn is_empty(&self) -> bool {
587+
self.deployment_name.is_empty() && self.build_id.is_empty()
588+
}
589+
}
590+
591+
impl FromStr for WorkerDeploymentVersion {
592+
type Err = ();
593+
594+
fn from_str(s: &str) -> Result<Self, Self::Err> {
595+
match s.split_once('.') {
596+
Some((name, build_id)) => Ok(WorkerDeploymentVersion {
597+
deployment_name: name.to_owned(),
598+
build_id: build_id.to_owned(),
599+
}),
600+
_ => Err(()),
601+
}
602+
}
603+
}
604+
605+
impl From<WorkerDeploymentVersion> for coresdk::common::WorkerDeploymentVersion {
606+
fn from(v: WorkerDeploymentVersion) -> coresdk::common::WorkerDeploymentVersion {
607+
coresdk::common::WorkerDeploymentVersion {
608+
deployment_name: v.deployment_name,
609+
build_id: v.build_id,
610+
}
611+
}
612+
}
613+
614+
impl From<coresdk::common::WorkerDeploymentVersion> for WorkerDeploymentVersion {
615+
fn from(v: coresdk::common::WorkerDeploymentVersion) -> WorkerDeploymentVersion {
616+
WorkerDeploymentVersion {
617+
deployment_name: v.deployment_name,
618+
build_id: v.build_id,
619+
}
620+
}
621+
}

core/src/abstractions.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
4747
pub(crate) struct PermitDealerContextData {
4848
pub(crate) task_queue: String,
4949
pub(crate) worker_identity: String,
50-
pub(crate) worker_build_id: String,
5150
}
5251

5352
impl<SK> MeteredPermitDealer<SK>
@@ -171,10 +170,6 @@ impl<SK: SlotKind> SlotReservationContext for MeteredPermitDealer<SK> {
171170
&self.context_data.worker_identity
172171
}
173172

174-
fn worker_build_id(&self) -> &str {
175-
&self.context_data.worker_build_id
176-
}
177-
178173
fn num_issued_slots(&self) -> usize {
179174
*self.extant_permits.1.borrow()
180175
}

core/src/worker/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,6 @@ impl Worker {
325325
let slot_context_data = Arc::new(PermitDealerContextData {
326326
task_queue: config.task_queue.clone(),
327327
worker_identity: config.client_identity_override.clone().unwrap_or_default(),
328-
worker_build_id: config.build_id().to_owned(),
329328
});
330329
let wft_slots = MeteredPermitDealer::new(
331330
tuner.workflow_task_slot_supplier(),

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use std::{
5151
sync::Arc,
5252
time::{Duration, Instant, SystemTime},
5353
};
54-
use temporal_sdk_core_api::worker::WorkerConfig;
54+
use temporal_sdk_core_api::worker::{WorkerConfig, WorkerDeploymentVersion};
5555
use temporal_sdk_core_protos::{
5656
coresdk::{
5757
common::{NamespacedWorkflowExecution, VersioningIntent},
@@ -123,8 +123,9 @@ pub(crate) struct WorkflowMachines {
123123
history_size_bytes: u64,
124124
/// Set on each WFT started event
125125
continue_as_new_suggested: bool,
126-
/// Set if the current WFT is already complete and that completion event had a build id in it.
127-
current_wft_build_id: Option<String>,
126+
/// Set if the current WFT is already complete and that completion event had legacy build-id
127+
/// or a deployment version in it. Will use an empty deployment name if it's legacy build-id.
128+
current_wft_deployment_info: Option<WorkerDeploymentVersion>,
128129

129130
all_machines: SlotMap<MachineKey, Machines>,
130131
/// If a machine key is in this map, that machine was created internally by core, not as a
@@ -286,7 +287,7 @@ impl WorkflowMachines {
286287
observed_internal_flags: Rc::new(RefCell::new(observed_internal_flags)),
287288
history_size_bytes: 0,
288289
continue_as_new_suggested: false,
289-
current_wft_build_id: None,
290+
current_wft_deployment_info: None,
290291
all_machines: Default::default(),
291292
machine_is_core_created: Default::default(),
292293
machines_by_event_id: Default::default(),
@@ -442,11 +443,11 @@ impl WorkflowMachines {
442443
)
443444
});
444445
let is_replaying = self.replaying || all_query;
445-
let build_id_for_current_task = if is_replaying {
446-
self.current_wft_build_id.clone().unwrap_or_default()
446+
let deployment_version_for_current_task = if is_replaying {
447+
self.current_wft_deployment_info.clone()
447448
} else {
448-
self.current_wft_build_id = Some(self.worker_config.build_id().to_owned());
449-
self.worker_config.build_id().to_owned()
449+
self.current_wft_deployment_info = self.worker_config.computed_deployment_version();
450+
self.current_wft_deployment_info.clone()
450451
};
451452
WorkflowActivation {
452453
timestamp: self.current_wf_time.map(Into::into),
@@ -460,7 +461,8 @@ impl WorkflowMachines {
460461
.collect(),
461462
history_size_bytes: self.history_size_bytes,
462463
continue_as_new_suggested: self.continue_as_new_suggested,
463-
build_id_for_current_task,
464+
deployment_version_for_current_task: deployment_version_for_current_task
465+
.map(Into::into),
464466
}
465467
}
466468

@@ -479,9 +481,7 @@ impl WorkflowMachines {
479481
// If this worker has a build ID and we're completing the task, we want to say our ID is the
480482
// current build ID, so that if we get a query before any new history, we properly can
481483
// report that our ID was the one used for the completion.
482-
if !self.worker_config.build_id().is_empty() {
483-
self.current_wft_build_id = Some(self.worker_config.build_id().to_owned());
484-
}
484+
self.current_wft_deployment_info = self.worker_config.computed_deployment_version();
485485
(*self.observed_internal_flags)
486486
.borrow_mut()
487487
.gather_for_wft_complete()
@@ -587,8 +587,23 @@ impl WorkflowMachines {
587587
(*$me.observed_internal_flags)
588588
.borrow_mut()
589589
.add_from_complete($wtc);
590+
let mut combined_ver = WorkerDeploymentVersion {
591+
deployment_name: "".to_string(),
592+
build_id: "".to_string(),
593+
};
590594
if let Some(bid) = $wtc.worker_version.as_ref().map(|wv| &wv.build_id) {
591-
$me.current_wft_build_id = Some(bid.to_string());
595+
combined_ver.build_id = bid.to_string();
596+
}
597+
if !$wtc.worker_deployment_name.is_empty() {
598+
combined_ver.deployment_name = $wtc.worker_deployment_name.clone();
599+
}
600+
if !$wtc.worker_deployment_version.is_empty() {
601+
if let Ok(ver) = $wtc.worker_deployment_version.parse() {
602+
combined_ver = ver;
603+
}
604+
}
605+
if !combined_ver.is_empty() {
606+
$me.current_wft_deployment_info = Some(combined_ver);
592607
}
593608
}};
594609
}

sdk-core-protos/protos/local/temporal/sdk/core/common/common.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,9 @@ enum VersioningIntent {
2929
// Indicates that the command should run on the target task queue's current overall-default
3030
// build ID.
3131
DEFAULT = 2;
32+
}
33+
34+
message WorkerDeploymentVersion {
35+
string deployment_name = 1;
36+
string build_id = 2;
3237
}

sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,14 @@ message WorkflowActivation {
8484
uint64 history_size_bytes = 7;
8585
// Set true if the most recent WFT started event had this suggestion
8686
bool continue_as_new_suggested = 8;
87-
// Set to the Build ID of the worker that processed this task, which may be empty. During replay
88-
// this id may not equal the id of the replaying worker. If not replaying and this worker has
89-
// a defined Build ID, it will equal that ID. It will also be empty for evict-only activations.
90-
string build_id_for_current_task = 9;
87+
// Set to the deployment version of the worker that processed this task,
88+
// which may be empty. During replay this version may not equal the version
89+
// of the replaying worker. If not replaying and this worker has a defined
90+
// Deployment Version, it will equal that. It will also be empty for
91+
// evict-only activations. The deployment name may be empty, but not the
92+
// build id, if this worker was using the deprecated Build ID-only
93+
// feature(s).
94+
common.WorkerDeploymentVersion deployment_version_for_current_task = 9;
9195
}
9296

9397
message WorkflowActivationJob {

sdk-core-protos/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ pub mod coresdk {
489489
available_internal_flags: vec![],
490490
history_size_bytes: 0,
491491
continue_as_new_suggested: false,
492-
build_id_for_current_task: "".to_string(),
492+
deployment_version_for_current_task: None,
493493
}
494494
}
495495

sdk/src/workflow_context.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::{
2727
task::Poll,
2828
time::{Duration, SystemTime},
2929
};
30+
use temporal_sdk_core_api::worker::WorkerDeploymentVersion;
3031
use temporal_sdk_core_protos::{
3132
coresdk::{
3233
activity_result::{ActivityResolution, activity_resolution},
@@ -132,10 +133,11 @@ impl WfContext {
132133
self.shared.read().history_length
133134
}
134135

135-
/// Return the Build ID as it was when this point in the workflow was first reached. If this
136-
/// code is being executed for the first time, return this Worker's Build ID if it has one.
137-
pub fn current_build_id(&self) -> Option<String> {
138-
self.shared.read().current_build_id.clone()
136+
/// Return the deployment version, if any, as it was when this point in the workflow was first
137+
/// reached. If this code is being executed for the first time, return this Worker's deployment
138+
/// version if it has one.
139+
pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
140+
self.shared.read().current_deployment_version.clone()
139141
}
140142

141143
/// Return current values for workflow search attributes
@@ -499,7 +501,7 @@ pub(crate) struct WfContextSharedData {
499501
pub(crate) is_replaying: bool,
500502
pub(crate) wf_time: Option<SystemTime>,
501503
pub(crate) history_length: u32,
502-
pub(crate) current_build_id: Option<String>,
504+
pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
503505
pub(crate) search_attributes: SearchAttributes,
504506
pub(crate) random_seed: u64,
505507
}

sdk/src/workflow_future.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,11 +365,9 @@ impl Future for WorkflowFuture {
365365
wlock.is_replaying = activation.is_replaying;
366366
wlock.wf_time = activation.timestamp.try_into_or_none();
367367
wlock.history_length = activation.history_length;
368-
wlock.current_build_id = if activation.build_id_for_current_task.is_empty() {
369-
None
370-
} else {
371-
Some(activation.build_id_for_current_task)
372-
};
368+
wlock.current_deployment_version = activation
369+
.deployment_version_for_current_task
370+
.map(Into::into);
373371
}
374372

375373
let mut die_of_eviction_when_done = false;

tests/integ_tests/worker_versioning_tests.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
2727
let wf_type = "sets_deployment_info_on_task_responses";
2828
let mut starter = CoreWfStarter::new(wf_type);
2929
let deploy_name = format!("deployment-{}", starter.get_task_queue());
30+
let version = WorkerDeploymentVersion {
31+
deployment_name: deploy_name.clone(),
32+
build_id: "1.0".to_string(),
33+
};
3034
starter
3135
.worker_config
3236
.versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
3337
WorkerDeploymentOptions {
34-
version: WorkerDeploymentVersion {
35-
deployment_name: deploy_name.clone(),
36-
build_id: "1.0".to_string(),
37-
},
38+
version: version.clone(),
3839
use_worker_versioning: true,
3940
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
4041
},
@@ -47,7 +48,10 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
4748
// we can describe it and then set the current version.
4849
let worker_task = async {
4950
let res = core.poll_workflow_activation().await.unwrap();
50-
assert_eq!(res.build_id_for_current_task, "1.0");
51+
assert_eq!(
52+
version,
53+
res.deployment_version_for_current_task.unwrap().into(),
54+
);
5155

5256
let mut success_complete = workflow_completion::Success::from_variants(vec![
5357
CompleteWorkflowExecution { result: None }.into(),

0 commit comments

Comments
 (0)