Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 101 additions & 23 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use std::{
sync::Arc,
time::Duration,
};
use temporal_sdk_core_protos::coresdk::{
ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo,
use temporal_sdk_core_protos::{
coresdk::{ActivitySlotInfo, LocalActivitySlotInfo, NexusSlotInfo, WorkflowSlotInfo},
temporal::api::enums::v1::VersioningBehavior,
};

/// Defines per-worker configuration options
Expand All @@ -19,9 +20,6 @@ pub struct WorkerConfig {
/// What task queue will this worker poll from? This task queue name will be used for both
/// workflow and activity polling.
pub task_queue: String,
/// A string that should be unique to the set of code this worker uses. IE: All the workflow,
/// activity, interceptor, and data converter code.
pub worker_build_id: String,
/// A human-readable string that can identify this worker. Using something like sdk version
/// and host name is a good default. If set, overrides the identity set (if any) on the client
/// used by this worker.
Expand Down Expand Up @@ -96,13 +94,6 @@ pub struct WorkerConfig {
#[builder(default)]
pub max_worker_activities_per_second: Option<f64>,

/// # UNDER DEVELOPMENT
/// If set to true this worker will opt-in to the whole-worker versioning feature.
/// `worker_build_id` will be used as the version.
/// todo: link to feature docs
#[builder(default = "false")]
pub use_worker_versioning: bool,

/// If set false (default), shutdown will not finish until all pending evictions have been
/// issued and replied to. If set true shutdown will be considered complete when the only
/// remaining work is pending evictions.
Expand Down Expand Up @@ -163,6 +154,9 @@ pub struct WorkerConfig {
/// Mutually exclusive with `tuner`
#[builder(setter(into, strip_option), default)]
pub max_outstanding_nexus_tasks: Option<usize>,

/// A versioning strategy for this worker.
pub versioning_strategy: WorkerVersioningStrategy,
}

impl WorkerConfig {
Expand All @@ -180,6 +174,10 @@ impl WorkerConfig {
.map(|s| s.contains(error_type))
.unwrap_or(false)
}

pub fn build_id(&self) -> &str {
self.versioning_strategy.build_id()
}
}

impl WorkerConfigBuilder {
Expand Down Expand Up @@ -218,18 +216,31 @@ impl WorkerConfigBuilder {
return Err("max_outstanding_* fields are mutually exclusive with `tuner`".to_owned());
}

if self.use_worker_versioning.unwrap_or_default()
&& self
.worker_build_id
.as_ref()
.map(|s| s.is_empty())
.unwrap_or_default()
{
return Err(
"`worker_build_id` must be non-empty when `use_worker_versioning` is true"
.to_owned(),
);
if let Some(wv) = self.versioning_strategy.as_ref() {
match wv {
WorkerVersioningStrategy::None { .. } => {}
WorkerVersioningStrategy::WorkerDeploymentBased(d) => {
if d.use_worker_versioning
&& (d.version.build_id.is_empty() || d.version.deployment_name.is_empty())
{
return Err(
"WorkerDeploymentVersion must have a non-empty build_id and \
deployment_name when deployment-based versioning is enabled"
.to_owned(),
);
}
}
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => {
if build_id.is_empty() {
return Err(
"Legacy build id-based versioning must have a non-empty build_id"
.to_owned(),
);
}
}
}
}

Ok(())
}
}
Expand Down Expand Up @@ -491,3 +502,70 @@ impl PollerBehavior {
Ok(())
}
}

#[derive(Clone, Debug)]
pub enum WorkerVersioningStrategy {
/// Don't enable any versioning
None {
/// Build ID may still be passed as a way to identify the worker, or may be left empty.
build_id: String,
},
/// Maybe use the modern deployment-based versioning, or just pass a deployment version.
WorkerDeploymentBased(WorkerDeploymentOptions),
/// Use the legacy build-id-based whole worker versioning.
LegacyBuildIdBased {
/// A Build ID to use, must be non-empty.
build_id: String,
},
Comment on lines +515 to +519
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would support a future where this option is deleted when versioning v2 is deleted

}

impl Default for WorkerVersioningStrategy {
fn default() -> Self {
WorkerVersioningStrategy::None {
build_id: String::new(),
}
}
}

impl WorkerVersioningStrategy {
pub fn build_id(&self) -> &str {
match self {
WorkerVersioningStrategy::None { build_id } => build_id,
WorkerVersioningStrategy::WorkerDeploymentBased(opts) => &opts.version.build_id,
WorkerVersioningStrategy::LegacyBuildIdBased { build_id } => build_id,
}
}

pub fn uses_build_id_based(&self) -> bool {
matches!(self, WorkerVersioningStrategy::LegacyBuildIdBased { .. })
}

pub fn default_versioning_behavior(&self) -> Option<VersioningBehavior> {
match self {
WorkerVersioningStrategy::WorkerDeploymentBased(opts) => {
opts.default_versioning_behavior
}
_ => None,
}
}
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct WorkerDeploymentOptions {
/// The deployment version of this worker.
pub version: WorkerDeploymentVersion,
/// If set, opts in to the Worker Deployment Versioning feature, meaning this worker will only
/// receive tasks for workflows it claims to be compatible with.
pub use_worker_versioning: bool,
/// The default versioning behavior to use for workflows that do not pass one to Core.
/// It is a startup-time error to specify `Some(Unspecified)` here.
pub default_versioning_behavior: Option<VersioningBehavior>,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct WorkerDeploymentVersion {
/// Name of the deployment
pub deployment_name: String,
/// Build ID for the worker.
pub build_id: String,
}
6 changes: 4 additions & 2 deletions core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
collections::{HashMap, VecDeque},
time::Duration,
};
use temporal_sdk_core_api::Worker as WorkerTrait;
use temporal_sdk_core_api::{Worker as WorkerTrait, worker::WorkerVersioningStrategy};
use temporal_sdk_core_protos::{
TestHistoryBuilder,
coresdk::{
Expand Down Expand Up @@ -881,7 +881,9 @@ async fn build_id_set_properly_on_query_on_first_task() {
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|wc| {
wc.max_cached_workflows = 10;
wc.worker_build_id = "1.0".to_string();
wc.versioning_strategy = WorkerVersioningStrategy::None {
build_id: "1.0".to_owned(),
}
});
let core = mock_worker(mock);

Expand Down
6 changes: 4 additions & 2 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use temporal_sdk_core_api::{
errors::PollError,
worker::{
PollerBehavior, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext,
SlotSupplier, SlotSupplierPermit, WorkflowSlotKind,
SlotSupplier, SlotSupplierPermit, WorkerVersioningStrategy, WorkflowSlotKind,
},
};
use temporal_sdk_core_protos::{
Expand Down Expand Up @@ -2977,7 +2977,9 @@ async fn sets_build_id_from_wft_complete() {
let mut worker = mock_sdk_cfg(
MockPollCfg::from_resp_batches(wfid, t, [ResponseType::AllHistory], mock),
|cfg| {
cfg.worker_build_id = "fierce-predator".to_string();
cfg.versioning_strategy = WorkerVersioningStrategy::None {
build_id: "fierce-predator".to_string(),
};
cfg.max_cached_workflows = 1;
},
);
Expand Down
3 changes: 1 addition & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ where
client,
worker_config.namespace.clone(),
client_ident,
worker_config.worker_build_id.clone(),
worker_config.use_worker_versioning,
worker_config.versioning_strategy.clone(),
));

Ok(Worker::new(
Expand Down
10 changes: 8 additions & 2 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ use std::{
time::Duration,
};
use temporal_sdk::interceptors::FailOnNondeterminismInterceptor;
use temporal_sdk_core_api::{Worker as WorkerTrait, errors::PollError, worker::PollerBehavior};
use temporal_sdk_core_api::{
Worker as WorkerTrait,
errors::PollError,
worker::{PollerBehavior, WorkerVersioningStrategy},
};
use temporal_sdk_core_protos::{
coresdk::{
workflow_activation::{WorkflowActivation, workflow_activation_job},
Expand Down Expand Up @@ -62,7 +66,9 @@ pub(crate) fn test_worker_cfg() -> WorkerConfigBuilder {
let mut wcb = WorkerConfigBuilder::default();
wcb.namespace(NAMESPACE)
.task_queue(TEST_Q)
.worker_build_id("test_bin_id")
.versioning_strategy(WorkerVersioningStrategy::None {
build_id: "test_bin_id".to_string(),
})
.ignore_evicts_on_shutdown(true)
// Serial polling since it makes mocking much easier.
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize));
Expand Down
63 changes: 40 additions & 23 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use temporal_client::{
Client, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching, RetryClient,
SlotManager, WorkflowService,
};
use temporal_sdk_core_api::worker::WorkerVersioningStrategy;
use temporal_sdk_core_protos::{
TaskToken,
coresdk::workflow_commands::QueryResult,
Expand All @@ -16,7 +17,10 @@ use temporal_sdk_core_protos::{
MeteringMetadata, Payloads, WorkerVersionCapabilities, WorkerVersionStamp,
WorkflowExecution,
},
enums::v1::{TaskQueueKind, WorkflowTaskFailedCause},
deployment,
enums::v1::{
TaskQueueKind, VersioningBehavior, WorkerVersioningMode, WorkflowTaskFailedCause,
},
failure::v1::Failure,
nexus,
protocol::v1::Message as ProtocolMessage,
Expand All @@ -35,24 +39,21 @@ pub(crate) struct WorkerClientBag {
replaceable_client: RwLock<RetryClient<Client>>,
namespace: String,
identity: String,
worker_build_id: String,
use_versioning: bool,
worker_versioning_strategy: WorkerVersioningStrategy,
}

impl WorkerClientBag {
pub(crate) fn new(
client: RetryClient<Client>,
namespace: String,
identity: String,
worker_build_id: String,
use_versioning: bool,
worker_versioning_strategy: WorkerVersioningStrategy,
) -> Self {
Self {
replaceable_client: RwLock::new(client),
namespace,
identity,
worker_build_id,
use_versioning,
worker_versioning_strategy,
}
}

Expand All @@ -68,16 +69,34 @@ impl WorkerClientBag {
if self.default_capabilities().build_id_based_versioning {
"".to_string()
} else {
self.worker_build_id.clone()
self.worker_versioning_strategy.build_id().to_owned()
}
}

fn deployment_options(&self) -> Option<deployment::v1::WorkerDeploymentOptions> {
match &self.worker_versioning_strategy {
WorkerVersioningStrategy::WorkerDeploymentBased(dopts) => {
Some(deployment::v1::WorkerDeploymentOptions {
deployment_name: dopts.version.deployment_name.clone(),
build_id: dopts.version.build_id.clone(),
worker_versioning_mode: if dopts.use_worker_versioning {
WorkerVersioningMode::Versioned.into()
} else {
WorkerVersioningMode::Unversioned.into()
},
})
}
_ => None,
}
}

fn worker_version_capabilities(&self) -> Option<WorkerVersionCapabilities> {
if self.default_capabilities().build_id_based_versioning {
Some(WorkerVersionCapabilities {
build_id: self.worker_build_id.clone(),
use_versioning: self.use_versioning,
// TODO: https://github.com/temporalio/sdk-core/issues/866
build_id: self.worker_versioning_strategy.build_id().to_owned(),
use_versioning: self.worker_versioning_strategy.uses_build_id_based(),
// This will never be used, as it is the v3 version that we never supported in
// Core SDKs.
deployment_series_name: "".to_string(),
})
} else {
Expand All @@ -88,8 +107,8 @@ impl WorkerClientBag {
fn worker_version_stamp(&self) -> Option<WorkerVersionStamp> {
if self.default_capabilities().build_id_based_versioning {
Some(WorkerVersionStamp {
build_id: self.worker_build_id.clone(),
use_versioning: self.use_versioning,
build_id: self.worker_versioning_strategy.build_id().to_owned(),
use_versioning: self.worker_versioning_strategy.uses_build_id_based(),
})
} else {
None
Expand Down Expand Up @@ -221,8 +240,7 @@ impl WorkerClient for WorkerClientBag {
identity: self.identity.clone(),
binary_checksum: self.binary_checksum(),
worker_version_capabilities: self.worker_version_capabilities(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment_options: None,
deployment_options: self.deployment_options(),
}
.into_request();
request.extensions_mut().insert(IsWorkerTaskLongPoll);
Expand Down Expand Up @@ -258,8 +276,7 @@ impl WorkerClient for WorkerClientBag {
max_tasks_per_second: Some(tps),
}),
worker_version_capabilities: self.worker_version_capabilities(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment_options: None,
deployment_options: self.deployment_options(),
}
.into_request();
request.extensions_mut().insert(IsWorkerTaskLongPoll);
Expand Down Expand Up @@ -291,8 +308,7 @@ impl WorkerClient for WorkerClientBag {
}),
identity: self.identity.clone(),
worker_version_capabilities: self.worker_version_capabilities(),
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment_options: None,
deployment_options: self.deployment_options(),
}
.into_request();
request.extensions_mut().insert(IsWorkerTaskLongPoll);
Expand Down Expand Up @@ -348,11 +364,10 @@ impl WorkerClient for WorkerClientBag {
capabilities: Some(respond_workflow_task_completed_request::Capabilities {
discard_speculative_workflow_task_with_events: true,
}),
// TODO: https://github.com/temporalio/sdk-core/issues/866
// Will never be set, deprecated.
deployment: None,
versioning_behavior: 0,
// TODO: https://github.com/temporalio/sdk-core/issues/866
deployment_options: None,
versioning_behavior: request.versioning_behavior.into(),
deployment_options: self.deployment_options(),
};
Ok(self
.cloned_client()
Expand Down Expand Up @@ -638,4 +653,6 @@ pub(crate) struct WorkflowTaskCompletion {
pub(crate) sdk_metadata: WorkflowTaskCompletedMetadata,
/// Metering info
pub(crate) metering_metadata: MeteringMetadata,
/// Versioning behavior of the workflow, if any.
pub(crate) versioning_behavior: VersioningBehavior,
}
Loading
Loading