Skip to content

Commit 7d4c48a

Browse files
authored
Worker Deployment Versioning (#896)
1 parent 93471ac commit 7d4c48a

24 files changed

Lines changed: 418 additions & 101 deletions

File tree

core-api/src/worker.rs

Lines changed: 101 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::{
55
sync::Arc,
66
time::Duration,
77
};
8-
use temporal_sdk_core_protos::coresdk::{
9-
ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo,
8+
use temporal_sdk_core_protos::{
9+
coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo},
10+
temporal::api::enums::v1::VersioningBehavior,
1011
};
1112

1213
/// Defines per-worker configuration options
@@ -19,9 +20,6 @@ pub struct WorkerConfig {
1920
/// What task queue will this worker poll from? This task queue name will be used for both
2021
/// workflow and activity polling.
2122
pub task_queue: String,
22-
/// A string that should be unique to the set of code this worker uses. IE: All the workflow,
23-
/// activity, interceptor, and data converter code.
24-
pub worker_build_id: String,
2523
/// A human-readable string that can identify this worker. Using something like sdk version
2624
/// and host name is a good default. If set, overrides the identity set (if any) on the client
2725
/// used by this worker.
@@ -96,13 +94,6 @@ pub struct WorkerConfig {
9694
#[builder(default)]
9795
pub max_worker_activities_per_second: Option<f64>,
9896

99-
/// # UNDER DEVELOPMENT
100-
/// If set to true this worker will opt-in to the whole-worker versioning feature.
101-
/// `worker_build_id` will be used as the version.
102-
/// todo: link to feature docs
103-
#[builder(default = "false")]
104-
pub use_worker_versioning: bool,
105-
10697
/// If set false (default), shutdown will not finish until all pending evictions have been
10798
/// issued and replied to. If set true shutdown will be considered complete when the only
10899
/// remaining work is pending evictions.
@@ -163,6 +154,9 @@ pub struct WorkerConfig {
163154
/// Mutually exclusive with `tuner`
164155
#[builder(setter(into, strip_option), default)]
165156
pub max_outstanding_nexus_tasks: Option<usize>,
157+
158+
/// A versioning strategy for this worker.
159+
pub versioning_strategy: WorkerVersioningStrategy,
166160
}
167161

168162
impl WorkerConfig {
@@ -180,6 +174,10 @@ impl WorkerConfig {
180174
.map(|s| s.contains(error_type))
181175
.unwrap_or(false)
182176
}
177+
178+
/// Returns the build ID carried by this worker's configured versioning strategy.
///
/// This simply delegates to `versioning_strategy.build_id()`; the returned string may be
/// empty when the strategy does not require a build ID.
pub fn build_id(&self) -> &str {
179+
self.versioning_strategy.build_id()
180+
}
183181
}
184182

185183
impl WorkerConfigBuilder {
@@ -218,18 +216,31 @@ impl WorkerConfigBuilder {
218216
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
219217
}
220218

221-
if self.use_worker_versioning.unwrap_or_default()
222-
&& self
223-
.worker_build_id
224-
.as_ref()
225-
.map(|s| s.is_empty())
226-
.unwrap_or_default()
227-
{
228-
return Err(
229-
"`worker_build_id` must be non-empty when `use_worker_versioning` is true"
230-
.to_owned(),
231-
);
219+
if let Some(wv) = self.versioning_strategy.as_ref() {
220+
match wv {
221+
WorkerVersioningStrategy::None { .. } => {}
222+
WorkerVersioningStrategy::WorkerDeploymentBased(d) => {
223+
if d.use_worker_versioning
224+
&& (d.version.build_id.is_empty() || d.version.deployment_name.is_empty())
225+
{
226+
return Err(
227+
"WorkerDeploymentVersion must have a non-empty build_id and \
228+
deployment_name when deployment-based versioning is enabled"
229+
.to_owned(),
230+
);
231+
}
232+
}
233+
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => {
234+
if build_id.is_empty() {
235+
return Err(
236+
"Legacy build id-based versioning must have a non-empty build_id"
237+
.to_owned(),
238+
);
239+
}
240+
}
241+
}
232242
}
243+
233244
Ok(())
234245
}
235246
}
@@ -491,3 +502,70 @@ impl PollerBehavior {
491502
Ok(())
492503
}
493504
}
505+
506+
/// Selects how (and whether) a worker participates in versioning.
///
/// Every variant carries a build ID, which is exposed uniformly through
/// `WorkerVersioningStrategy::build_id`.
#[derive(Clone, Debug)]
507+
pub enum WorkerVersioningStrategy {
508+
/// Do not enable any versioning.
509+
None {
510+
/// Build ID may still be passed as a way to identify the worker, or may be left empty.
511+
build_id: String,
512+
},
513+
/// Supply a worker deployment version. Whether the modern deployment-based versioning is
/// actually enabled is controlled by `use_worker_versioning` on the contained options;
/// when it is false the deployment version is still passed along.
514+
WorkerDeploymentBased(WorkerDeploymentOptions),
515+
/// Use the legacy build-id-based whole worker versioning.
516+
LegacyBuildIdBased {
517+
/// The Build ID to use. Must be non-empty; the config builder's validation rejects an
/// empty value for this variant.
518+
build_id: String,
519+
},
520+
}
521+
522+
/// The default strategy is no versioning at all.
impl Default for WorkerVersioningStrategy {
523+
/// Returns `WorkerVersioningStrategy::None` with an empty build ID.
fn default() -> Self {
524+
WorkerVersioningStrategy::None {
525+
build_id: String::new(),
526+
}
527+
}
528+
}
529+
530+
impl WorkerVersioningStrategy {
531+
/// Returns the build ID associated with this strategy, whichever variant is in use.
///
/// For `WorkerDeploymentBased` this is the build ID of the deployment version. The
/// returned string may be empty (for example, the default `None` strategy).
pub fn build_id(&self) -> &str {
532+
match self {
533+
WorkerVersioningStrategy::None { build_id } => build_id,
534+
WorkerVersioningStrategy::WorkerDeploymentBased(opts) => &opts.version.build_id,
535+
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => build_id,
536+
}
537+
}
538+

539+
/// Returns true only for the `LegacyBuildIdBased` variant.
pub fn uses_build_id_based(&self) -> bool {
540+
matches!(self, WorkerVersioningStrategy::LegacyBuildIdBased { .. })
541+
}
542+

543+
/// Returns the default versioning behavior configured in the deployment options, if any.
///
/// Always `None` for strategies other than `WorkerDeploymentBased`.
pub fn default_versioning_behavior(&self) -> Option<VersioningBehavior> {
544+
match self {
545+
WorkerVersioningStrategy::WorkerDeploymentBased(opts) => {
546+
opts.default_versioning_behavior
547+
}
548+
// NOTE(review): catch-all arm; a variant added later would silently yield `None`
// here instead of producing a non-exhaustive match error.
_ => None,
549+
}
550+
}
551+
}
552+
553+
/// Options for the `WorkerVersioningStrategy::WorkerDeploymentBased` strategy.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
554+
pub struct WorkerDeploymentOptions {
555+
/// The deployment version of this worker.
556+
pub version: WorkerDeploymentVersion,
557+
/// If set, opts in to the Worker Deployment Versioning feature, meaning this worker will only
558+
/// receive tasks for workflows it claims to be compatible with.
///
/// When true, the config builder's validation requires both `version.build_id` and
/// `version.deployment_name` to be non-empty.
559+
pub use_worker_versioning: bool,
560+
/// The default versioning behavior to use for workflows that do not pass one to Core.
561+
/// It is a startup-time error to specify `Some(Unspecified)` here.
// NOTE(review): the builder validation visible alongside this type does not check for
// `Some(Unspecified)`; presumably it is enforced elsewhere at worker startup - confirm.
562+
pub default_versioning_behavior: Option<VersioningBehavior>,
563+
}
564+
565+
/// Identifies a worker deployment version as a (deployment name, build ID) pair.
///
/// Both fields must be non-empty when deployment-based versioning is enabled; the config
/// builder's validation rejects the configuration otherwise.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
566+
pub struct WorkerDeploymentVersion {
567+
/// Name of the deployment this worker belongs to.
568+
pub deployment_name: String,
569+
/// Build ID for the worker.
570+
pub build_id: String,
571+
}

core/src/core_tests/queries.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
collections::{HashMap, VecDeque},
1111
time::Duration,
1212
};
13-
use temporal_sdk_core_api::Worker as WorkerTrait;
13+
use temporal_sdk_core_api::{Worker as WorkerTrait, worker::WorkerVersioningStrategy};
1414
use temporal_sdk_core_protos::{
1515
TestHistoryBuilder,
1616
coresdk::{
@@ -881,7 +881,9 @@ async fn build_id_set_properly_on_query_on_first_task() {
881881
let mut mock = build_mock_pollers(mh);
882882
mock.worker_cfg(|wc| {
883883
wc.max_cached_workflows = 10;
884-
wc.worker_build_id = "1.0".to_string();
884+
wc.versioning_strategy = WorkerVersioningStrategy::None {
885+
build_id: "1.0".to_owned(),
886+
}
885887
});
886888
let core = mock_worker(mock);
887889

core/src/core_tests/workflow_tasks.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use temporal_sdk_core_api::{
3535
errors::PollError,
3636
worker::{
3737
PollerBehavior, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext,
38-
SlotSupplier, SlotSupplierPermit, WorkflowSlotKind,
38+
SlotSupplier, SlotSupplierPermit, WorkerVersioningStrategy, WorkflowSlotKind,
3939
},
4040
};
4141
use temporal_sdk_core_protos::{
@@ -2977,7 +2977,9 @@ async fn sets_build_id_from_wft_complete() {
29772977
let mut worker = mock_sdk_cfg(
29782978
MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock),
29792979
|cfg| {
2980-
cfg.worker_build_id = "fierce-predator".to_string();
2980+
cfg.versioning_strategy = WorkerVersioningStrategy::None {
2981+
build_id: "fierce-predator".to_string(),
2982+
};
29812983
cfg.max_cached_workflows = 1;
29822984
},
29832985
);

core/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ where
9898
client,
9999
worker_config.namespace.clone(),
100100
client_ident,
101-
worker_config.worker_build_id.clone(),
102-
worker_config.use_worker_versioning,
101+
worker_config.versioning_strategy.clone(),
103102
));
104103

105104
Ok(Worker::new(

core/src/test_help/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ use std::{
3131
time::Duration,
3232
};
3333
use temporal_sdk::interceptors::FailOnNondeterminismInterceptor;
34-
use temporal_sdk_core_api::{Worker as WorkerTrait, errors::PollError, worker::PollerBehavior};
34+
use temporal_sdk_core_api::{
35+
Worker as WorkerTrait,
36+
errors::PollError,
37+
worker::{PollerBehavior, WorkerVersioningStrategy},
38+
};
3539
use temporal_sdk_core_protos::{
3640
coresdk::{
3741
workflow_activation::{WorkflowActivation, workflow_activation_job},
@@ -62,7 +66,9 @@ pub(crate) fn test_worker_cfg() -> WorkerConfigBuilder {
6266
let mut wcb = WorkerConfigBuilder::default();
6367
wcb.namespace(NAMESPACE)
6468
.task_queue(TEST_Q)
65-
.worker_build_id("test_bin_id")
69+
.versioning_strategy(WorkerVersioningStrategy::None {
70+
build_id: "test_bin_id".to_string(),
71+
})
6672
.ignore_evicts_on_shutdown(true)
6773
// Serial polling since it makes mocking much easier.
6874
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize));

core/src/worker/client.rs

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use temporal_client::{
77
Client, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient,
88
SlotManager, WorkflowService,
99
};
10+
use temporal_sdk_core_api::worker::WorkerVersioningStrategy;
1011
use temporal_sdk_core_protos::{
1112
TaskToken,
1213
coresdk::workflow_commands::QueryResult,
@@ -16,7 +17,10 @@ use temporal_sdk_core_protos::{
1617
MeteringMetadata, Payloads, WorkerVersionCapabilities, WorkerVersionStamp,
1718
WorkflowExecution,
1819
},
19-
enums::v1::{TaskQueueKind, WorkflowTaskFailedCause},
20+
deployment,
21+
enums::v1::{
22+
TaskQueueKind, VersioningBehavior, WorkerVersioningMode, WorkflowTaskFailedCause,
23+
},
2024
failure::v1::Failure,
2125
nexus,
2226
protocol::v1::Message as ProtocolMessage,
@@ -35,24 +39,21 @@ pub(crate) struct WorkerClientBag {
3539
replaceable_client: RwLock<RetryClient<Client>>,
3640
namespace: String,
3741
identity: String,
38-
worker_build_id: String,
39-
use_versioning: bool,
42+
worker_versioning_strategy: WorkerVersioningStrategy,
4043
}
4144

4245
impl WorkerClientBag {
4346
pub(crate) fn new(
4447
client: RetryClient<Client>,
4548
namespace: String,
4649
identity: String,
47-
worker_build_id: String,
48-
use_versioning: bool,
50+
worker_versioning_strategy: WorkerVersioningStrategy,
4951
) -> Self {
5052
Self {
5153
replaceable_client: RwLock::new(client),
5254
namespace,
5355
identity,
54-
worker_build_id,
55-
use_versioning,
56+
worker_versioning_strategy,
5657
}
5758
}
5859

@@ -68,16 +69,34 @@ impl WorkerClientBag {
6869
if self.default_capabilities().build_id_based_versioning {
6970
"".to_string()
7071
} else {
71-
self.worker_build_id.clone()
72+
self.worker_versioning_strategy.build_id().to_owned()
73+
}
74+
}
75+
76+
/// Builds the protobuf deployment options sent along with this worker's requests.
///
/// Returns `Some` only when the worker uses the `WorkerDeploymentBased` strategy; the
/// deployment name and build ID are copied from the configured version, and the versioning
/// mode is `Versioned` or `Unversioned` depending on `use_worker_versioning`. Returns `None`
/// for every other strategy.
fn deployment_options(&self) -> Option<deployment::v1::WorkerDeploymentOptions> {
77+
match &self.worker_versioning_strategy {
78+
WorkerVersioningStrategy::WorkerDeploymentBased(dopts) => {
79+
Some(deployment::v1::WorkerDeploymentOptions {
80+
deployment_name: dopts.version.deployment_name.clone(),
81+
build_id: dopts.version.build_id.clone(),
82+
// The deployment version is reported either way; only the mode differs.
worker_versioning_mode: if dopts.use_worker_versioning {
83+
WorkerVersioningMode::Versioned.into()
84+
} else {
85+
WorkerVersioningMode::Unversioned.into()
86+
},
87+
})
88+
}
89+
// NOTE(review): catch-all arm; a strategy variant added later would silently send no
// deployment options.
_ => None,
7290
}
7391
}
7492

7593
fn worker_version_capabilities(&self) -> Option<WorkerVersionCapabilities> {
7694
if self.default_capabilities().build_id_based_versioning {
7795
Some(WorkerVersionCapabilities {
78-
build_id: self.worker_build_id.clone(),
79-
use_versioning: self.use_versioning,
80-
// TODO: https://github.com/temporalio/sdk-core/issues/866
96+
build_id: self.worker_versioning_strategy.build_id().to_owned(),
97+
use_versioning: self.worker_versioning_strategy.uses_build_id_based(),
98+
// This will never be used, as it is the v3 version that we never supported in
99+
// Core SDKs.
81100
deployment_series_name: "".to_string(),
82101
})
83102
} else {
@@ -88,8 +107,8 @@ impl WorkerClientBag {
88107
fn worker_version_stamp(&self) -> Option<WorkerVersionStamp> {
89108
if self.default_capabilities().build_id_based_versioning {
90109
Some(WorkerVersionStamp {
91-
build_id: self.worker_build_id.clone(),
92-
use_versioning: self.use_versioning,
110+
build_id: self.worker_versioning_strategy.build_id().to_owned(),
111+
use_versioning: self.worker_versioning_strategy.uses_build_id_based(),
93112
})
94113
} else {
95114
None
@@ -221,8 +240,7 @@ impl WorkerClient for WorkerClientBag {
221240
identity: self.identity.clone(),
222241
binary_checksum: self.binary_checksum(),
223242
worker_version_capabilities: self.worker_version_capabilities(),
224-
// TODO: https://github.com/temporalio/sdk-core/issues/866
225-
deployment_options: None,
243+
deployment_options: self.deployment_options(),
226244
}
227245
.into_request();
228246
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -258,8 +276,7 @@ impl WorkerClient for WorkerClientBag {
258276
max_tasks_per_second: Some(tps),
259277
}),
260278
worker_version_capabilities: self.worker_version_capabilities(),
261-
// TODO: https://github.com/temporalio/sdk-core/issues/866
262-
deployment_options: None,
279+
deployment_options: self.deployment_options(),
263280
}
264281
.into_request();
265282
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -291,8 +308,7 @@ impl WorkerClient for WorkerClientBag {
291308
}),
292309
identity: self.identity.clone(),
293310
worker_version_capabilities: self.worker_version_capabilities(),
294-
// TODO: https://github.com/temporalio/sdk-core/issues/866
295-
deployment_options: None,
311+
deployment_options: self.deployment_options(),
296312
}
297313
.into_request();
298314
request.extensions_mut().insert(IsWorkerTaskLongPoll);
@@ -348,11 +364,10 @@ impl WorkerClient for WorkerClientBag {
348364
capabilities: Some(respond_workflow_task_completed_request::Capabilities {
349365
discard_speculative_workflow_task_with_events: true,
350366
}),
351-
// TODO: https://github.com/temporalio/sdk-core/issues/866
367+
// Will never be set, deprecated.
352368
deployment: None,
353-
versioning_behavior: 0,
354-
// TODO: https://github.com/temporalio/sdk-core/issues/866
355-
deployment_options: None,
369+
versioning_behavior: request.versioning_behavior.into(),
370+
deployment_options: self.deployment_options(),
356371
};
357372
Ok(self
358373
.cloned_client()
@@ -638,4 +653,6 @@ pub(crate) struct WorkflowTaskCompletion {
638653
pub(crate) sdk_metadata: WorkflowTaskCompletedMetadata,
639654
/// Metering info
640655
pub(crate) metering_metadata: MeteringMetadata,
656+
/// Versioning behavior of the workflow, if any.
657+
pub(crate) versioning_behavior: VersioningBehavior,
641658
}

0 commit comments

Comments
 (0)