Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use crate::{errors::WorkflowErrorType, telemetry::metrics::TemporalMeter};
use std::{
any::Any,
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};
use temporal_sdk_core_protos::{
coresdk,
coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo},
temporal::api::enums::v1::VersioningBehavior,
};
Expand Down Expand Up @@ -175,8 +177,21 @@ impl WorkerConfig {
.unwrap_or(false)
}

pub fn build_id(&self) -> &str {
self.versioning_strategy.build_id()
pub fn computed_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
let wdv = match self.versioning_strategy {
WorkerVersioningStrategy::None { ref build_id } => WorkerDeploymentVersion {
deployment_name: "".to_owned(),
build_id: build_id.clone(),
},
WorkerVersioningStrategy::WorkerDeploymentBased(ref opts) => opts.version.clone(),
WorkerVersioningStrategy::LegacyBuildIdBased { ref build_id } => {
WorkerDeploymentVersion {
deployment_name: "".to_owned(),
build_id: build_id.clone(),
}
}
};
if wdv.is_empty() { None } else { Some(wdv) }
}
}

Expand Down Expand Up @@ -316,9 +331,6 @@ pub trait SlotReservationContext: Send + Sync {
/// Returns the identity of the worker
fn worker_identity(&self) -> &str;

/// Returns the build id of the worker
fn worker_build_id(&self) -> &str;

/// Returns the number of currently outstanding slot permits, whether used or un-used.
fn num_issued_slots(&self) -> usize;

Expand Down Expand Up @@ -569,3 +581,41 @@ pub struct WorkerDeploymentVersion {
/// Build ID for the worker.
pub build_id: String,
}

impl WorkerDeploymentVersion {
pub fn is_empty(&self) -> bool {
self.deployment_name.is_empty() && self.build_id.is_empty()
}
}

impl FromStr for WorkerDeploymentVersion {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.split_once('.') {
Some((name, build_id)) => Ok(WorkerDeploymentVersion {
deployment_name: name.to_owned(),
build_id: build_id.to_owned(),
}),
_ => Err(()),
}
}
}

impl From<WorkerDeploymentVersion> for coresdk::common::WorkerDeploymentVersion {
fn from(v: WorkerDeploymentVersion) -> coresdk::common::WorkerDeploymentVersion {
coresdk::common::WorkerDeploymentVersion {
deployment_name: v.deployment_name,
build_id: v.build_id,
}
}
}

impl From<coresdk::common::WorkerDeploymentVersion> for WorkerDeploymentVersion {
fn from(v: coresdk::common::WorkerDeploymentVersion) -> WorkerDeploymentVersion {
WorkerDeploymentVersion {
deployment_name: v.deployment_name,
build_id: v.build_id,
}
}
}
5 changes: 0 additions & 5 deletions core/src/abstractions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
pub(crate) struct PermitDealerContextData {
pub(crate) task_queue: String,
pub(crate) worker_identity: String,
pub(crate) worker_build_id: String,
}

impl<SK> MeteredPermitDealer<SK>
Expand Down Expand Up @@ -171,10 +170,6 @@ impl<SK: SlotKind> SlotReservationContext for MeteredPermitDealer<SK> {
&self.context_data.worker_identity
}

fn worker_build_id(&self) -> &str {
&self.context_data.worker_build_id
}

fn num_issued_slots(&self) -> usize {
*self.extant_permits.1.borrow()
}
Expand Down
16 changes: 14 additions & 2 deletions core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,12 +888,24 @@ async fn build_id_set_properly_on_query_on_first_task() {
let core = mock_worker(mock);

let task = core.poll_workflow_activation().await.unwrap();
assert_eq!(task.build_id_for_current_task, "1.0");
assert_eq!(
task.deployment_version_for_current_task
.as_ref()
.unwrap()
.build_id,
"1.0"
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
assert_eq!(task.build_id_for_current_task, "1.0");
assert_eq!(
task.deployment_version_for_current_task
.as_ref()
.unwrap()
.build_id,
"1.0"
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();
Expand Down
17 changes: 13 additions & 4 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2986,14 +2986,23 @@ async fn sets_build_id_from_wft_complete() {

worker.register_wf(DEFAULT_WORKFLOW_TYPE, |ctx: WfContext| async move {
// First task, it should be empty, since replaying and nothing in first WFT completed
assert_eq!(ctx.current_build_id(), None);
assert_eq!(ctx.current_deployment_version(), None);
ctx.timer(Duration::from_secs(1)).await;
assert_eq!(ctx.current_build_id(), Some("enchi-cat".to_string()));
assert_eq!(
ctx.current_deployment_version().unwrap().build_id,
"enchi-cat"
);
ctx.timer(Duration::from_secs(1)).await;
// Not replaying at this point, so we should see the worker's build id
assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string()));
assert_eq!(
ctx.current_deployment_version().unwrap().build_id,
"fierce-predator"
);
ctx.timer(Duration::from_secs(1)).await;
assert_eq!(ctx.current_build_id(), Some("fierce-predator".to_string()));
assert_eq!(
ctx.current_deployment_version().unwrap().build_id,
"fierce-predator"
);
Ok(().into())
});
worker
Expand Down
1 change: 0 additions & 1 deletion core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ impl Worker {
let slot_context_data = Arc::new(PermitDealerContextData {
task_queue: config.task_queue.clone(),
worker_identity: config.client_identity_override.clone().unwrap_or_default(),
worker_build_id: config.build_id().to_owned(),
});
let wft_slots = MeteredPermitDealer::new(
tuner.workflow_task_slot_supplier(),
Expand Down
41 changes: 28 additions & 13 deletions core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use std::{
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use temporal_sdk_core_api::worker::WorkerConfig;
use temporal_sdk_core_api::worker::{WorkerConfig, WorkerDeploymentVersion};
use temporal_sdk_core_protos::{
coresdk::{
common::{NamespacedWorkflowExecution, VersioningIntent},
Expand Down Expand Up @@ -123,8 +123,9 @@ pub(crate) struct WorkflowMachines {
history_size_bytes: u64,
/// Set on each WFT started event
continue_as_new_suggested: bool,
/// Set if the current WFT is already complete and that completion event had a build id in it.
current_wft_build_id: Option<String>,
/// Set if the current WFT is already complete and that completion event had legacy build-id
/// or a deployment version in it. Will use an empty deployment name if it's legacy build-id.
current_wft_deployment_info: Option<WorkerDeploymentVersion>,

all_machines: SlotMap<MachineKey, Machines>,
/// If a machine key is in this map, that machine was created internally by core, not as a
Expand Down Expand Up @@ -286,7 +287,7 @@ impl WorkflowMachines {
observed_internal_flags: Rc::new(RefCell::new(observed_internal_flags)),
history_size_bytes: 0,
continue_as_new_suggested: false,
current_wft_build_id: None,
current_wft_deployment_info: None,
all_machines: Default::default(),
machine_is_core_created: Default::default(),
machines_by_event_id: Default::default(),
Expand Down Expand Up @@ -442,11 +443,11 @@ impl WorkflowMachines {
)
});
let is_replaying = self.replaying || all_query;
let build_id_for_current_task = if is_replaying {
self.current_wft_build_id.clone().unwrap_or_default()
let deployment_version_for_current_task = if is_replaying {
self.current_wft_deployment_info.clone()
} else {
self.current_wft_build_id = Some(self.worker_config.build_id().to_owned());
self.worker_config.build_id().to_owned()
self.current_wft_deployment_info = self.worker_config.computed_deployment_version();
self.current_wft_deployment_info.clone()
};
WorkflowActivation {
timestamp: self.current_wf_time.map(Into::into),
Expand All @@ -460,7 +461,8 @@ impl WorkflowMachines {
.collect(),
history_size_bytes: self.history_size_bytes,
continue_as_new_suggested: self.continue_as_new_suggested,
build_id_for_current_task,
deployment_version_for_current_task: deployment_version_for_current_task
.map(Into::into),
}
}

Expand All @@ -479,9 +481,7 @@ impl WorkflowMachines {
// If this worker has a build ID and we're completing the task, we want to say our ID is the
// current build ID, so that if we get a query before any new history, we properly can
// report that our ID was the one used for the completion.
if !self.worker_config.build_id().is_empty() {
self.current_wft_build_id = Some(self.worker_config.build_id().to_owned());
}
self.current_wft_deployment_info = self.worker_config.computed_deployment_version();
(*self.observed_internal_flags)
.borrow_mut()
.gather_for_wft_complete()
Expand Down Expand Up @@ -587,8 +587,23 @@ impl WorkflowMachines {
(*$me.observed_internal_flags)
.borrow_mut()
.add_from_complete($wtc);
let mut combined_ver = WorkerDeploymentVersion {
deployment_name: "".to_string(),
build_id: "".to_string(),
};
if let Some(bid) = $wtc.worker_version.as_ref().map(|wv| &wv.build_id) {
$me.current_wft_build_id = Some(bid.to_string());
combined_ver.build_id = bid.to_string();
}
if !$wtc.worker_deployment_name.is_empty() {
combined_ver.deployment_name = $wtc.worker_deployment_name.clone();
}
if !$wtc.worker_deployment_version.is_empty() {
if let Ok(ver) = $wtc.worker_deployment_version.parse() {
combined_ver = ver;
}
}
if !combined_ver.is_empty() {
$me.current_wft_deployment_info = Some(combined_ver);
}
}};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ enum VersioningIntent {
// Indicates that the command should run on the target task queue's current overall-default
// build ID.
DEFAULT = 2;
}

message WorkerDeploymentVersion {
string deployment_name = 1;
string build_id = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,14 @@ message WorkflowActivation {
uint64 history_size_bytes = 7;
// Set true if the most recent WFT started event had this suggestion
bool continue_as_new_suggested = 8;
// Set to the Build ID of the worker that processed this task, which may be empty. During replay
// this id may not equal the id of the replaying worker. If not replaying and this worker has
// a defined Build ID, it will equal that ID. It will also be empty for evict-only activations.
string build_id_for_current_task = 9;
// Set to the deployment version of the worker that processed this task,
// which may be empty. During replay this version may not equal the version
// of the replaying worker. If not replaying and this worker has a defined
// Deployment Version, it will equal that. It will also be empty for
// evict-only activations. The deployment name may be empty, but not the
// build id, if this worker was using the deprecated Build ID-only
// feature(s).
common.WorkerDeploymentVersion deployment_version_for_current_task = 9;
}

message WorkflowActivationJob {
Expand Down
2 changes: 1 addition & 1 deletion sdk-core-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ pub mod coresdk {
available_internal_flags: vec![],
history_size_bytes: 0,
continue_as_new_suggested: false,
build_id_for_current_task: "".to_string(),
deployment_version_for_current_task: None,
}
}

Expand Down
12 changes: 7 additions & 5 deletions sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{
task::Poll,
time::{Duration, SystemTime},
};
use temporal_sdk_core_api::worker::WorkerDeploymentVersion;
use temporal_sdk_core_protos::{
coresdk::{
activity_result::{ActivityResolution, activity_resolution},
Expand Down Expand Up @@ -132,10 +133,11 @@ impl WfContext {
self.shared.read().history_length
}

/// Return the Build ID as it was when this point in the workflow was first reached. If this
/// code is being executed for the first time, return this Worker's Build ID if it has one.
pub fn current_build_id(&self) -> Option<String> {
self.shared.read().current_build_id.clone()
/// Return the deployment version, if any, as it was when this point in the workflow was first
/// reached. If this code is being executed for the first time, return this Worker's deployment
/// version if it has one.
pub fn current_deployment_version(&self) -> Option<WorkerDeploymentVersion> {
self.shared.read().current_deployment_version.clone()
}

/// Return current values for workflow search attributes
Expand Down Expand Up @@ -499,7 +501,7 @@ pub(crate) struct WfContextSharedData {
pub(crate) is_replaying: bool,
pub(crate) wf_time: Option<SystemTime>,
pub(crate) history_length: u32,
pub(crate) current_build_id: Option<String>,
pub(crate) current_deployment_version: Option<WorkerDeploymentVersion>,
pub(crate) search_attributes: SearchAttributes,
pub(crate) random_seed: u64,
}
Expand Down
8 changes: 3 additions & 5 deletions sdk/src/workflow_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,9 @@ impl Future for WorkflowFuture {
wlock.is_replaying = activation.is_replaying;
wlock.wf_time = activation.timestamp.try_into_or_none();
wlock.history_length = activation.history_length;
wlock.current_build_id = if activation.build_id_for_current_task.is_empty() {
None
} else {
Some(activation.build_id_for_current_task)
};
wlock.current_deployment_version = activation
.deployment_version_for_current_task
.map(Into::into);
}

let mut die_of_eviction_when_done = false;
Expand Down
14 changes: 9 additions & 5 deletions tests/integ_tests/worker_versioning_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
let wf_type = "sets_deployment_info_on_task_responses";
let mut starter = CoreWfStarter::new(wf_type);
let deploy_name = format!("deployment-{}", starter.get_task_queue());
let version = WorkerDeploymentVersion {
deployment_name: deploy_name.clone(),
build_id: "1.0".to_string(),
};
starter
.worker_config
.versioning_strategy(WorkerVersioningStrategy::WorkerDeploymentBased(
WorkerDeploymentOptions {
version: WorkerDeploymentVersion {
deployment_name: deploy_name.clone(),
build_id: "1.0".to_string(),
},
version: version.clone(),
use_worker_versioning: true,
default_versioning_behavior: VersioningBehavior::AutoUpgrade.into(),
},
Expand All @@ -47,7 +48,10 @@ async fn sets_deployment_info_on_task_responses(#[values(true, false)] use_defau
// we can describe it and then set the current version.
let worker_task = async {
let res = core.poll_workflow_activation().await.unwrap();
assert_eq!(res.build_id_for_current_task, "1.0");
assert_eq!(
version,
res.deployment_version_for_current_task.unwrap().into(),
);

let mut success_complete = workflow_completion::Success::from_variants(vec![
CompleteWorkflowExecution { result: None }.into(),
Expand Down
Loading
Loading