Skip to content

Commit 240ad27

Browse files
committed
Attach deployment version to activations instead of just build id
1 parent 15ea20f commit 240ad27

13 files changed

Lines changed: 253 additions & 65 deletions

File tree

core-api/src/worker.rs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ use crate::{errors::WorkflowErrorType, telemetry::metrics::TemporalMeter};
22
use std::{
33
any::Any,
44
collections::{HashMap, HashSet},
5+
str::FromStr,
56
sync::Arc,
67
time::Duration,
78
};
89
use temporal_sdk_core_protos::{
10+
coresdk,
911
coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo},
1012
temporal::api::enums::v1::VersioningBehavior,
1113
};
@@ -175,8 +177,21 @@ impl WorkerConfig {
175177
.unwrap_or(false)
176178
}
177179

178-
pub fn build_id(&self) -> &str {
179-
self.versioning_strategy.build_id()
180+
pub fn computed_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
181+
let wdv = match self.versioning_strategy {
182+
WorkerVersioningStrategy::None { ref build_id } => WorkerDeploymentVersion {
183+
deployment_name: "".to_owned(),
184+
build_id: build_id.clone(),
185+
},
186+
WorkerVersioningStrategy::WorkerDeploymentBased(ref opts) => opts.version.clone(),
187+
WorkerVersioningStrategy::LegacyBuildIdBased { ref build_id } => {
188+
WorkerDeploymentVersion {
189+
deployment_name: "".to_owned(),
190+
build_id: build_id.clone(),
191+
}
192+
}
193+
};
194+
if wdv.is_empty() { None } else { Some(wdv) }
180195
}
181196
}
182197

@@ -316,9 +331,6 @@ pub trait SlotReservationContext: Send + Sync {
316331
/// Returns the identity of the worker
317332
fn worker_identity(&self) -> &str;
318333

319-
/// Returns the build id of the worker
320-
fn worker_build_id(&self) -> &str;
321-
322334
/// Returns the number of currently outstanding slot permits, whether used or un-used.
323335
fn num_issued_slots(&self) -> usize;
324336

@@ -569,3 +581,41 @@ pub struct WorkerDeploymentVersion {
569581
/// Build ID for the worker.
570582
pub build_id: String,
571583
}
584+
585+
impl WorkerDeploymentVersion {
586+
pub fn is_empty(&self) -> bool {
587+
self.deployment_name.is_empty() && self.build_id.is_empty()
588+
}
589+
}
590+
591+
impl FromStr for WorkerDeploymentVersion {
592+
type Err = ();
593+
594+
fn from_str(s: &str) -> Result<Self, Self::Err> {
595+
match s.split_once('.') {
596+
Some((name, build_id)) => Ok(WorkerDeploymentVersion {
597+
deployment_name: name.to_owned(),
598+
build_id: build_id.to_owned(),
599+
}),
600+
_ => Err(()),
601+
}
602+
}
603+
}
604+
605+
impl From<WorkerDeploymentVersion> for coresdk::common::WorkerDeploymentVersion {
606+
fn from(v: WorkerDeploymentVersion) -> coresdk::common::WorkerDeploymentVersion {
607+
coresdk::common::WorkerDeploymentVersion {
608+
deployment_name: v.deployment_name,
609+
build_id: v.build_id,
610+
}
611+
}
612+
}
613+
614+
impl From<coresdk::common::WorkerDeploymentVersion> for WorkerDeploymentVersion {
615+
fn from(v: coresdk::common::WorkerDeploymentVersion) -> WorkerDeploymentVersion {
616+
WorkerDeploymentVersion {
617+
deployment_name: v.deployment_name,
618+
build_id: v.build_id,
619+
}
620+
}
621+
}

core/src/abstractions.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
4747
pub(crate) struct PermitDealerContextData {
4848
pub(crate) task_queue: String,
4949
pub(crate) worker_identity: String,
50-
pub(crate) worker_build_id: String,
5150
}
5251

5352
impl<SK> MeteredPermitDealer<SK>
@@ -171,10 +170,6 @@ impl<SK: SlotKind> SlotReservationContext for MeteredPermitDealer<SK> {
171170
&self.context_data.worker_identity
172171
}
173172

174-
fn worker_build_id(&self) -> &str {
175-
&self.context_data.worker_build_id
176-
}
177-
178173
fn num_issued_slots(&self) -> usize {
179174
*self.extant_permits.1.borrow()
180175
}

core/src/core_tests/queries.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -888,12 +888,24 @@ async fn build_id_set_properly_on_query_on_first_task() {
888888
let core = mock_worker(mock);
889889

890890
let task = core.poll_workflow_activation().await.unwrap();
891-
assert_eq!(task.build_id_for_current_task, "1.0");
891+
assert_eq!(
892+
task.deployment_version_for_current_task
893+
.as_ref()
894+
.unwrap()
895+
.build_id,
896+
"1.0"
897+
);
892898
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
893899
.await
894900
.unwrap();
895901
let task = core.poll_workflow_activation().await.unwrap();
896-
assert_eq!(task.build_id_for_current_task, "1.0");
902+
assert_eq!(
903+
task.deployment_version_for_current_task
904+
.as_ref()
905+
.unwrap()
906+
.build_id,
907+
"1.0"
908+
);
897909
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
898910
.await
899911
.unwrap();

core/src/core_tests/workflow_tasks.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2986,14 +2986,23 @@ async fn sets_build_id_from_wft_complete() {
29862986

29872987
worker.register_wf(DEFAULT_WORKFLOW_TYPE, |ctx: WfContext| async move {
29882988
// First task, it should be empty, since replaying and nothing in first WFT completed
2989-
assert_eq!(ctx.current_build_id(), None);
2989+
assert_eq!(ctx.current_deployment_version(), None);
29902990
ctx.timer(Duration::from_secs(1)).await;
2991-
assert_eq!(ctx.current_build_id(), Some("enchi-cat".to_string()));
2991+
assert_eq!(
2992+
ctx.current_deployment_version().unwrap().build_id,
2993+
"enchi-cat"
2994+
);
29922995
ctx.timer(Duration::from_secs(1)).await;
29932996
// Not replaying at this point, so we should see the worker's build id
2994-
assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string()));
2997+
assert_eq!(
2998+
ctx.current_deployment_version().unwrap().build_id,
2999+
"fierce-predator"
3000+
);
29953001
ctx.timer(Duration::from_secs(1)).await;
2996-
assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string()));
3002+
assert_eq!(
3003+
ctx.current_deployment_version().unwrap().build_id,
3004+
"fierce-predator"
3005+
);
29973006
Ok(().into())
29983007
});
29993008
worker

core/src/worker/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,6 @@ impl Worker {
325325
let slot_context_data = Arc::new(PermitDealerContextData {
326326
task_queue: config.task_queue.clone(),
327327
worker_identity: config.client_identity_override.clone().unwrap_or_default(),
328-
worker_build_id: config.build_id().to_owned(),
329328
});
330329
let wft_slots = MeteredPermitDealer::new(
331330
tuner.workflow_task_slot_supplier(),

core/src/worker/workflow/machines/workflow_machines.rs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use std::{
5151
sync::Arc,
5252
time::{Duration, Instant, SystemTime},
5353
};
54-
use temporal_sdk_core_api::worker::WorkerConfig;
54+
use temporal_sdk_core_api::worker::{WorkerConfig, WorkerDeploymentVersion};
5555
use temporal_sdk_core_protos::{
5656
coresdk::{
5757
common::{NamespacedWorkflowExecution, VersioningIntent},
@@ -123,8 +123,9 @@ pub(crate) struct WorkflowMachines {
123123
history_size_bytes: u64,
124124
/// Set on each WFT started event
125125
continue_as_new_suggested: bool,
126-
/// Set if the current WFT is already complete and that completion event had a build id in it.
127-
current_wft_build_id: Option<String>,
126+
/// Set if the current WFT is already complete and that completion event had legacy build-id
127+
/// or a deployment version in it. Will use an empty deployment name if it's legacy build-id.
128+
current_wft_deployment_info: Option<WorkerDeploymentVersion>,
128129

129130
all_machines: SlotMap<MachineKey, Machines>,
130131
/// If a machine key is in this map, that machine was created internally by core, not as a
@@ -286,7 +287,7 @@ impl WorkflowMachines {
286287
observed_internal_flags: Rc::new(RefCell::new(observed_internal_flags)),
287288
history_size_bytes: 0,
288289
continue_as_new_suggested: false,
289-
current_wft_build_id: None,
290+
current_wft_deployment_info: None,
290291
all_machines: Default::default(),
291292
machine_is_core_created: Default::default(),
292293
machines_by_event_id: Default::default(),
@@ -442,11 +443,11 @@ impl WorkflowMachines {
442443
)
443444
});
444445
let is_replaying = self.replaying || all_query;
445-
let build_id_for_current_task = if is_replaying {
446-
self.current_wft_build_id.clone().unwrap_or_default()
446+
let deployment_version_for_current_task = if is_replaying {
447+
self.current_wft_deployment_info.clone()
447448
} else {
448-
self.current_wft_build_id = Some(self.worker_config.build_id().to_owned());
449-
self.worker_config.build_id().to_owned()
449+
self.current_wft_deployment_info = self.worker_config.computed_deployment_version();
450+
self.current_wft_deployment_info.clone()
450451
};
451452
WorkflowActivation {
452453
timestamp: self.current_wf_time.map(Into::into),
@@ -460,7 +461,8 @@ impl WorkflowMachines {
460461
.collect(),
461462
history_size_bytes: self.history_size_bytes,
462463
continue_as_new_suggested: self.continue_as_new_suggested,
463-
build_id_for_current_task,
464+
deployment_version_for_current_task: deployment_version_for_current_task
465+
.map(Into::into),
464466
}
465467
}
466468

@@ -479,9 +481,7 @@ impl WorkflowMachines {
479481
// If this worker has a build ID and we're completing the task, we want to say our ID is the
480482
// current build ID, so that if we get a query before any new history, we properly can
481483
// report that our ID was the one used for the completion.
482-
if !self.worker_config.build_id().is_empty() {
483-
self.current_wft_build_id = Some(self.worker_config.build_id().to_owned());
484-
}
484+
self.current_wft_deployment_info = self.worker_config.computed_deployment_version();
485485
(*self.observed_internal_flags)
486486
.borrow_mut()
487487
.gather_for_wft_complete()
@@ -587,8 +587,23 @@ impl WorkflowMachines {
587587
(*$me.observed_internal_flags)
588588
.borrow_mut()
589589
.add_from_complete($wtc);
590+
let mut combined_ver = WorkerDeploymentVersion {
591+
deployment_name: "".to_string(),
592+
build_id: "".to_string(),
593+
};
590594
if let Some(bid) = $wtc.worker_version.as_ref().map(|wv| &wv.build_id) {
591-
$me.current_wft_build_id = Some(bid.to_string());
595+
combined_ver.build_id = bid.to_string();
596+
}
597+
if !$wtc.worker_deployment_name.is_empty() {
598+
combined_ver.deployment_name = $wtc.worker_deployment_name.clone();
599+
}
600+
if !$wtc.worker_deployment_version.is_empty() {
601+
if let Ok(ver) = $wtc.worker_deployment_version.parse() {
602+
combined_ver = ver;
603+
}
604+
}
605+
if !combined_ver.is_empty() {
606+
$me.current_wft_deployment_info = Some(combined_ver);
592607
}
593608
}};
594609
}

sdk-core-protos/protos/local/temporal/sdk/core/common/common.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,9 @@ enum VersioningIntent {
2929
// Indicates that the command should run on the target task queue's current overall-default
3030
// build ID.
3131
DEFAULT = 2;
32+
}
33+
34+
message WorkerDeploymentVersion {
35+
string deployment_name = 1;
36+
string build_id = 2;
3237
}

sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,14 @@ message WorkflowActivation {
8484
uint64 history_size_bytes = 7;
8585
// Set true if the most recent WFT started event had this suggestion
8686
bool continue_as_new_suggested = 8;
87-
// Set to the Build ID of the worker that processed this task, which may be empty. During replay
88-
// this id may not equal the id of the replaying worker. If not replaying and this worker has
89-
// a defined Build ID, it will equal that ID. It will also be empty for evict-only activations.
90-
string build_id_for_current_task = 9;
87+
// Set to the deployment version of the worker that processed this task,
88+
// which may be empty. During replay this version may not equal the version
89+
// of the replaying worker. If not replaying and this worker has a defined
90+
// Deployment Version, it will equal that. It will also be empty for
91+
// evict-only activations. The deployment name may be empty, but not the
92+
// build id, if this worker was using the deprecated Build ID-only
93+
// feature(s).
94+
common.WorkerDeploymentVersion deployment_version_for_current_task = 9;
9195
}
9296

9397
message WorkflowActivationJob {

sdk-core-protos/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ pub mod coresdk {
489489
available_internal_flags: vec![],
490490
history_size_bytes: 0,
491491
continue_as_new_suggested: false,
492-
build_id_for_current_task: "".to_string(),
492+
deployment_version_for_current_task: None,
493493
}
494494
}
495495

sdk/src/workflow_context.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use std::{
2727
task::Poll,
2828
time::{Duration, SystemTime},
2929
};
30+
use temporal_sdk_core_api::worker::WorkerDeploymentVersion;
3031
use temporal_sdk_core_protos::{
3132
coresdk::{
3233
activity_result::{ActivityResolution, activity_resolution},
@@ -132,10 +133,11 @@ impl WfContext {
132133
self.shared.read().history_length
133134
}
134135

135-
/// Return the Build ID as it was when this point in the workflow was first reached. If this
136-
/// code is being executed for the first time, return this Worker's Build ID if it has one.
137-
pub fn current_build_id(&self) -> Option<String> {
138-
self.shared.read().current_build_id.clone()
136+
/// Return the deployment version, if any, as it was when this point in the workflow was first
137+
/// reached. If this code is being executed for the first time, return this Worker's deployment
138+
/// version if it has one.
139+
pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
140+
self.shared.read().current_deployment_version.clone()
139141
}
140142

141143
/// Return current values for workflow search attributes
@@ -499,7 +501,7 @@ pub(crate) struct WfContextSharedData {
499501
pub(crate) is_replaying: bool,
500502
pub(crate) wf_time: Option<SystemTime>,
501503
pub(crate) history_length: u32,
502-
pub(crate) current_build_id: Option<String>,
504+
pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
503505
pub(crate) search_attributes: SearchAttributes,
504506
pub(crate) random_seed: u64,
505507
}

0 commit comments

Comments
 (0)