Skip to content

Commit 61bc366

Browse files
committed
Switch to worker_set_key
1 parent c84987c commit 61bc366

15 files changed

Lines changed: 50 additions & 73 deletions

File tree

client/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,11 @@ impl<C> ConfiguredClient<C> {
443443
pub fn workers(&self) -> Arc<ClientWorkerSet> {
444444
self.workers.clone()
445445
}
446+
447+
/// Returns the worker set key, this should be unique across each client
448+
pub fn worker_set_key(&self) -> Uuid {
449+
self.workers.worker_set_key()
450+
}
446451
}
447452

448453
#[derive(Debug)]
@@ -499,10 +504,9 @@ impl ClientOptions {
499504
&self,
500505
namespace: impl Into<String>,
501506
metrics_meter: Option<TemporalMeter>,
502-
process_key: Uuid,
503507
) -> Result<RetryClient<Client>, ClientInitError> {
504508
let client = self.connect_no_namespace(metrics_meter).await?.into_inner();
505-
let client = Client::new(client, namespace.into(), process_key);
509+
let client = Client::new(client, namespace.into());
506510
let retry_client = RetryClient::new(client, self.retry_config.clone());
507511
Ok(retry_client)
508512
}
@@ -851,21 +855,17 @@ pub struct Client {
851855
inner: ConfiguredClient<TemporalServiceClientWithMetrics>,
852856
/// The namespace this client interacts with
853857
namespace: String,
854-
/// Process-wide key, used for worker heartbeating
855-
process_key: Uuid,
856858
}
857859

858860
impl Client {
859861
/// Create a new client from an existing configured lower level client and a namespace
860862
pub fn new(
861863
client: ConfiguredClient<TemporalServiceClientWithMetrics>,
862864
namespace: String,
863-
process_key: Uuid,
864865
) -> Self {
865866
Client {
866867
inner: client,
867868
namespace,
868-
process_key,
869869
}
870870
}
871871

@@ -910,8 +910,8 @@ impl Client {
910910
}
911911

912912
/// Returns the process-wide key
913-
pub fn process_key(&self) -> Uuid {
914-
self.process_key
913+
pub fn worker_set_key(&self) -> Uuid {
914+
self.inner.worker_set_key()
915915
}
916916
}
917917

client/src/worker_registry/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use slotmap::SlotMap;
77
use std::collections::{HashMap, hash_map::Entry::Vacant};
88
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
99
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
10+
use uuid::Uuid;
1011

1112
slotmap::new_key_type! {
1213
/// Registration key for a worker
@@ -131,12 +132,15 @@ pub trait SharedNamespaceWorkerTrait: std::fmt::Debug {
131132
}
132133

133134
/// Enables local workers to make themselves visible to a shared client instance.
134-
/// There can only be one worker registered per namespace+queue_name+client, others will get ignored.
135+
///
136+
/// For slot managing, there can only be one worker registered per
137+
/// namespace+queue_name+client, others will get ignored.
135138
/// It also provides a convenient method to find compatible slots within the collection.
136139
#[derive(Default, Debug)]
137140
pub struct ClientWorkerSet {
138141
slot_manager: RwLock<SlotManagerImpl>,
139142
heartbeat_manager: Mutex<HashMap<String, Box<dyn SharedNamespaceWorkerTrait + Send + Sync>>>,
143+
worker_set_key: Uuid,
140144
}
141145

142146
impl ClientWorkerSet {
@@ -145,6 +149,7 @@ impl ClientWorkerSet {
145149
Self {
146150
slot_manager: RwLock::new(SlotManagerImpl::new()),
147151
heartbeat_manager: Mutex::new(HashMap::new()),
152+
worker_set_key: Uuid::new_v4(),
148153
}
149154
}
150155

@@ -208,6 +213,11 @@ impl ClientWorkerSet {
208213
}
209214
}
210215

216+
/// Returns the worker set key, which is unique for each client. Used for worker heartbeating
217+
pub fn worker_set_key(&self) -> Uuid {
218+
self.worker_set_key
219+
}
220+
211221
#[cfg(test)]
212222
/// Returns (num_providers, num_buckets), where a bucket key is namespace+task_queue.
213223
/// There is only one provider per bucket so `num_providers` should be equal to `num_buckets`.

core/src/core_tests/workers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
315315
mock.expect_is_mock().returning(|| true);
316316
mock.expect_sdk_name_and_version()
317317
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
318-
mock.expect_get_identity()
318+
mock.expect_identity()
319319
.returning(|| "test-identity".to_string());
320320
if use_cache {
321321
if api_success {

core/src/lib.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ where
9696
worker_config.namespace.clone(),
9797
worker_config.client_identity_override.clone(),
9898
client_inner,
99-
runtime.process_key(),
10099
);
101100
let namespace = worker_config.namespace.clone();
102101
if client.namespace() != namespace {
@@ -151,9 +150,8 @@ pub(crate) fn init_worker_client(
151150
namespace: String,
152151
client_identity_override: Option<String>,
153152
client: ConfiguredClient<TemporalServiceClientWithMetrics>,
154-
process_key: Uuid,
155153
) -> RetryClient<Client> {
156-
let mut client = Client::new(client, namespace, process_key);
154+
let mut client = Client::new(client, namespace);
157155
if let Some(ref id_override) = client_identity_override {
158156
client.options_mut().identity.clone_from(id_override);
159157
}
@@ -228,7 +226,6 @@ pub struct CoreRuntime {
228226
telemetry: TelemetryInstance,
229227
runtime: Option<tokio::runtime::Runtime>,
230228
runtime_handle: tokio::runtime::Handle,
231-
process_key: Uuid,
232229
heartbeat_interval: Option<Duration>,
233230
}
234231

@@ -335,7 +332,6 @@ impl CoreRuntime {
335332
telemetry,
336333
runtime: None,
337334
runtime_handle,
338-
process_key: Uuid::new_v4(),
339335
heartbeat_interval,
340336
}
341337
}
@@ -350,11 +346,6 @@ impl CoreRuntime {
350346
&self.telemetry
351347
}
352348

353-
/// Return a process-wide unique key
354-
pub fn process_key(&self) -> Uuid {
355-
self.process_key
356-
}
357-
358349
/// Return a mutable reference to the owned [TelemetryInstance]
359350
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
360351
&mut self.telemetry

core/src/worker/client.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,9 +230,9 @@ pub trait WorkerClient: Sync + Send {
230230
/// Return name and version of the SDK
231231
fn sdk_name_and_version(&self) -> (String, String);
232232
/// Get worker identity
233-
fn get_identity(&self) -> String;
234-
/// Get process key
235-
fn get_process_key(&self) -> Uuid;
233+
fn identity(&self) -> String;
234+
/// Get worker set key
235+
fn worker_set_key(&self) -> Uuid;
236236
}
237237

238238
/// Configuration options shared by workflow, activity, and Nexus polling calls
@@ -698,12 +698,12 @@ impl WorkerClient for WorkerClientBag {
698698
(opts.client_name.clone(), opts.client_version.clone())
699699
}
700700

701-
fn get_identity(&self) -> String {
701+
fn identity(&self) -> String {
702702
self.identity.clone()
703703
}
704704

705-
fn get_process_key(&self) -> Uuid {
706-
self.replaceable_client.read().get_client().process_key()
705+
fn worker_set_key(&self) -> Uuid {
706+
self.replaceable_client.read().get_client().worker_set_key()
707707
}
708708
}
709709

core/src/worker/client/mocks.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ pub fn mock_worker_client() -> MockWorkerClient {
3333
.returning(|_| Ok(ShutdownWorkerResponse {}));
3434
r.expect_sdk_name_and_version()
3535
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
36-
r.expect_get_identity()
36+
r.expect_identity()
3737
.returning(|| "test-identity".to_string());
38-
r.expect_get_process_key().returning(Uuid::new_v4);
38+
r.expect_worker_set_key().returning(Uuid::new_v4);
3939
r
4040
}
4141

@@ -49,7 +49,7 @@ pub(crate) fn mock_manual_worker_client() -> MockManualWorkerClient {
4949
r.expect_is_mock().returning(|| true);
5050
r.expect_sdk_name_and_version()
5151
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
52-
r.expect_get_identity()
52+
r.expect_identity()
5353
.returning(|| "test-identity".to_string());
5454
r
5555
}
@@ -162,7 +162,7 @@ mockall::mock! {
162162
fn workers(&self) -> Arc<ClientWorkerSet>;
163163
fn is_mock(&self) -> bool;
164164
fn sdk_name_and_version(&self) -> (String, String);
165-
fn get_identity(&self) -> String;
166-
fn get_process_key(&self) -> Uuid;
165+
fn identity(&self) -> String;
166+
fn worker_set_key(&self) -> Uuid;
167167
}
168168
}

core/src/worker/heartbeat.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl SharedNamespaceWorker {
3535
.namespace(namespace.clone())
3636
.task_queue(format!(
3737
"temporal-sys/worker-commands/{namespace}/{}",
38-
client.get_process_key()
38+
client.worker_set_key(),
3939
))
4040
.no_remote_activities(true)
4141
.max_outstanding_nexus_tasks(5_usize)

core/src/worker/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,6 @@ impl Worker {
342342
self.config.namespace.clone(),
343343
self.config.client_identity_override.clone(),
344344
new_client,
345-
self.client.get_process_key(),
346345
);
347346

348347
self.client.replace_client(new_worker_client);
@@ -428,7 +427,7 @@ impl Worker {
428427
let shutdown_token = CancellationToken::new();
429428
let slot_context_data = Arc::new(PermitDealerContextData {
430429
task_queue: config.task_queue.clone(),
431-
worker_identity: client.get_identity(),
430+
worker_identity: client.identity(),
432431
worker_deployment_version: config.computed_deployment_version(),
433432
});
434433
let wft_slots = MeteredPermitDealer::new(
@@ -592,7 +591,7 @@ impl Worker {
592591
let worker_heartbeat = if let Some(heartbeat_interval) = worker_heartbeat_interval {
593592
let worker_instance_key = Uuid::new_v4();
594593
let worker_instance_key_clone = worker_instance_key.to_string();
595-
let worker_identity = client.get_identity();
594+
let worker_identity = client.identity();
596595
let task_queue = config.task_queue.clone();
597596
let sdk_name_and_ver = sdk_name_and_ver.clone();
598597

sdk/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
//! let runtime_options = RuntimeOptionsBuilder::default().telemetry_options(telemetry_options).build().unwrap();
2626
//! let runtime = CoreRuntime::new_assume_tokio(runtime_options)?;
2727
//!
28-
//! let client = server_options.connect("default", None, runtime.process_key()).await?;
28+
//! let client = server_options.connect("default", None, runtime.runtime_key()).await?;
2929
//!
3030
//! let worker_config = WorkerConfigBuilder::default()
3131
//! .namespace("default")

tests/common/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ pub(crate) async fn get_cloud_client() -> RetryClient<Client> {
204204
sgo.connect(
205205
env::var("TEMPORAL_NAMESPACE").expect("TEMPORAL_NAMESPACE must be set"),
206206
None,
207-
Uuid::new_v4(),
208207
)
209208
.await
210209
.unwrap()
@@ -456,7 +455,6 @@ impl CoreWfStarter {
456455
.connect(
457456
cfg.namespace.clone(),
458457
rt.telemetry().get_temporal_metric_meter(),
459-
rt.process_key(),
460458
)
461459
.await
462460
.expect("Must connect"),

0 commit comments

Comments
 (0)