Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ pub use temporal_sdk_core_protos::temporal::api::{
},
};
pub use tonic;
pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerKey};
pub use worker_registry::{
ClientWorkerSet, SharedNamespaceWorkerTrait, Slot, SlotProvider, WorkerKey,
};
pub use workflow_handle::{
GetWorkflowResultOpts, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle,
};
Expand Down Expand Up @@ -388,7 +390,7 @@ pub struct ConfiguredClient<C> {
headers: Arc<RwLock<ClientHeaders>>,
/// Capabilities as read from the `get_system_info` RPC call made on client connection
capabilities: Option<get_system_info_response::Capabilities>,
workers: Arc<SlotManager>,
workers: Arc<ClientWorkerSet>,
Comment thread
yuandrew marked this conversation as resolved.
}

impl<C> ConfiguredClient<C> {
Expand Down Expand Up @@ -438,7 +440,7 @@ impl<C> ConfiguredClient<C> {
}

/// Returns a cloned reference to a registry with workers using this client instance
pub fn workers(&self) -> Arc<SlotManager> {
pub fn workers(&self) -> Arc<ClientWorkerSet> {
self.workers.clone()
}
}
Expand Down Expand Up @@ -497,9 +499,10 @@ impl ClientOptions {
&self,
namespace: impl Into<String>,
metrics_meter: Option<TemporalMeter>,
process_key: Uuid,
Comment thread
yuandrew marked this conversation as resolved.
Outdated
) -> Result<RetryClient<Client>, ClientInitError> {
let client = self.connect_no_namespace(metrics_meter).await?.into_inner();
let client = Client::new(client, namespace.into());
let client = Client::new(client, namespace.into(), process_key);
let retry_client = RetryClient::new(client, self.retry_config.clone());
Ok(retry_client)
}
Expand Down Expand Up @@ -584,7 +587,7 @@ impl ClientOptions {
client: TemporalServiceClient::new(svc),
options: Arc::new(self.clone()),
capabilities: None,
workers: Arc::new(SlotManager::new()),
workers: Arc::new(ClientWorkerSet::new()),
};
if !self.skip_get_system_info {
match client
Expand Down Expand Up @@ -848,17 +851,21 @@ pub struct Client {
inner: ConfiguredClient<TemporalServiceClientWithMetrics>,
/// The namespace this client interacts with
namespace: String,
/// Process-wide key, used for worker heartbeating
process_key: Uuid,
}

impl Client {
/// Create a new client from an existing configured lower level client and a namespace
pub fn new(
client: ConfiguredClient<TemporalServiceClientWithMetrics>,
namespace: String,
process_key: Uuid,
) -> Self {
Client {
inner: client,
namespace,
process_key,
}
}

Expand Down Expand Up @@ -901,6 +908,11 @@ impl Client {
pub fn into_inner(self) -> ConfiguredClient<TemporalServiceClientWithMetrics> {
self.inner
}

/// Returns the process-wide key
pub fn process_key(&self) -> Uuid {
self.process_key
}
}

impl NamespacedClient for Client {
Expand Down
12 changes: 6 additions & 6 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
TEMPORAL_NAMESPACE_HEADER_KEY, TemporalServiceClient,
metrics::{namespace_kv, task_queue_kv},
raw::sealed::RawClientLike,
worker_registry::{Slot, SlotManager},
worker_registry::{ClientWorkerSet, Slot},
};
use futures_util::{FutureExt, TryFutureExt, future::BoxFuture};
use std::sync::Arc;
Expand Down Expand Up @@ -68,7 +68,7 @@ pub(super) mod sealed {
fn health_client_mut(&mut self) -> &mut HealthClient<Self::SvcType>;

/// Return a registry with workers using this client instance
fn get_workers_info(&self) -> Option<Arc<SlotManager>>;
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>>;

async fn call<F, Req, Resp>(
&mut self,
Expand Down Expand Up @@ -134,7 +134,7 @@ where
self.get_client_mut().health_client_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
self.get_client().get_workers_info()
}

Expand Down Expand Up @@ -213,7 +213,7 @@ where
self.health_svc_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
None
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ where
self.client.health_client_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
Some(self.workers())
}
}
Expand Down Expand Up @@ -316,7 +316,7 @@ impl RawClientLike for Client {
self.inner.health_client_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
fn get_workers_info(&self) -> Option<Arc<ClientWorkerSet>> {
self.inner.get_workers_info()
}
}
Expand Down
110 changes: 86 additions & 24 deletions client/src/worker_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
//! This is needed to implement Eager Workflow Start, a latency optimization in which the client,
//! after reserving a slot, directly forwards a WFT to a local worker.

use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use slotmap::SlotMap;
use std::collections::{HashMap, hash_map::Entry::Vacant};

use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;

slotmap::new_key_type! {
Expand Down Expand Up @@ -49,7 +49,7 @@ impl SlotKey {
}
}

/// This is an inner class for [SlotManager] needed to hide the mutex.
/// This is an inner class for [ClientWorkerSet] needed to hide the mutex.
#[derive(Default, Debug)]
struct SlotManagerImpl {
/// Maps keys, i.e., namespace#task_queue, to provider.
Expand Down Expand Up @@ -109,19 +109,42 @@ impl SlotManagerImpl {
}
}

/// This trait represents a shared namespace worker that sends worker heartbeats and worker commands.
Comment thread
yuandrew marked this conversation as resolved.
Outdated
pub trait SharedNamespaceWorkerTrait: std::fmt::Debug {
/// Namespace that the shared namespace worker is connected to.
fn namespace(&self) -> String;

/// Unregisters a heartbeat callback. Returns the callback removed, as well as a bool that
/// indicates if there are no remaining callbacks in the SharedNamespaceWorker, indicating
/// the shared worker itself can be shut down.
fn unregister_callback(
&self,
worker_instance_key: String,
) -> (Option<Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>>, bool);

/// Registers a heartbeat callback.
fn register_callback(
Comment thread
yuandrew marked this conversation as resolved.
Outdated
&self,
worker_instance_key: String,
heartbeat_callback: Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>,
);
}

/// Enables local workers to make themselves visible to a shared client instance.
/// There can only be one worker registered per namespace+queue_name+client, others will get ignored.
/// It also provides a convenient method to find compatible slots within the collection.
#[derive(Default, Debug)]
pub struct SlotManager {
manager: RwLock<SlotManagerImpl>,
pub struct ClientWorkerSet {
slot_manager: RwLock<SlotManagerImpl>,
heartbeat_manager: Mutex<HashMap<String, Box<dyn SharedNamespaceWorkerTrait + Send + Sync>>>,
}

impl SlotManager {
impl ClientWorkerSet {
/// Factory method.
pub fn new() -> Self {
Self {
manager: RwLock::new(SlotManagerImpl::new()),
slot_manager: RwLock::new(SlotManagerImpl::new()),
heartbeat_manager: Mutex::new(HashMap::new()),
}
}

Expand All @@ -131,26 +154,65 @@ impl SlotManager {
namespace: String,
task_queue: String,
) -> Option<Box<dyn Slot + Send>> {
self.manager
self.slot_manager
.read()
.try_reserve_wft_slot(namespace, task_queue)
}

/// Register a local worker that can provide WFT processing slots.
pub fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
self.manager.write().register(provider)
pub fn register_slot(
Comment thread
yuandrew marked this conversation as resolved.
Outdated
&self,
provider: Box<dyn SlotProvider + Send + Sync>,
) -> Option<WorkerKey> {
self.slot_manager.write().register(provider)
}

/// Unregister a provider, typically when its worker starts shutdown.
pub fn unregister(&self, id: WorkerKey) -> Option<Box<dyn SlotProvider + Send + Sync>> {
self.manager.write().unregister(id)
pub fn unregister_slot(&self, id: WorkerKey) -> Option<Box<dyn SlotProvider + Send + Sync>> {
self.slot_manager.write().unregister(id)
}

/// Register a worker with the worker heartbeat manager.
pub fn register_heartbeat_worker(
&self,
namespace: String,
worker_instance_key: String,
heartbeat_callback: Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>,
shared_worker_callback: impl Fn() -> Box<dyn SharedNamespaceWorkerTrait + Send + Sync>,
Comment thread
yuandrew marked this conversation as resolved.
Outdated
) {
let mut shared_namespace_map = self.heartbeat_manager.lock();
let worker = shared_namespace_map
.entry(namespace)
.or_insert_with(|| shared_worker_callback());
worker.register_callback(worker_instance_key, heartbeat_callback)
}

/// Unregister a worker with the worker heartbeat manager.
pub fn unregister_heartbeat_worker(
&self,
namespace: String,
worker_instance_key: String,
) -> Option<Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>> {
let mut heartbeat_manager = self.heartbeat_manager.lock();
if let Some(shared_worker) = heartbeat_manager.get(&namespace) {
let (callback, is_empty) = shared_worker.unregister_callback(worker_instance_key);
if is_empty {
heartbeat_manager.remove(&namespace);
}
callback
} else {
warn!(
"Namespace {namespace} isn't registered to client worker heartbeat, ignoring unregister."
);
None
}
}

#[cfg(test)]
/// Returns (num_providers, num_buckets), where a bucket key is namespace+task_queue.
/// There is only one provider per bucket so `num_providers` should be equal to `num_buckets`.
pub fn num_providers(&self) -> (usize, usize) {
self.manager.read().num_providers()
self.slot_manager.read().num_providers()
}
}

Expand Down Expand Up @@ -197,9 +259,9 @@ mod tests {
new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);

let manager = SlotManager::new();
let some_slots = manager.register(Box::new(mock_provider1));
let no_slots = manager.register(Box::new(mock_provider2));
let manager = ClientWorkerSet::new();
let some_slots = manager.register_slot(Box::new(mock_provider1));
let no_slots = manager.register_slot(Box::new(mock_provider2));
assert!(no_slots.is_none());

let mut found = 0;
Expand All @@ -214,15 +276,15 @@ mod tests {
assert_eq!(found, 10);
assert_eq!((1, 1), manager.num_providers());

manager.unregister(some_slots.unwrap());
manager.unregister_slot(some_slots.unwrap());
assert_eq!((0, 0), manager.num_providers());

let mock_provider1 =
new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);

let no_slots = manager.register(Box::new(mock_provider2));
let some_slots = manager.register(Box::new(mock_provider1));
let no_slots = manager.register_slot(Box::new(mock_provider2));
let some_slots = manager.register_slot(Box::new(mock_provider1));
assert!(some_slots.is_none());

let mut not_found = 0;
Expand All @@ -236,28 +298,28 @@ mod tests {
}
assert_eq!(not_found, 10);
assert_eq!((1, 1), manager.num_providers());
manager.unregister(no_slots.unwrap());
manager.unregister_slot(no_slots.unwrap());
assert_eq!((0, 0), manager.num_providers());
}

#[test]
fn registry_keeps_one_provider_per_namespace() {
let manager = SlotManager::new();
let manager = ClientWorkerSet::new();
let mut worker_keys = vec![];
for i in 0..10 {
let namespace = format!("myId{}", i % 3);
let mock_provider = new_mock_provider(namespace, "bar_q".to_string(), false, false);
worker_keys.push(manager.register(Box::new(mock_provider)));
worker_keys.push(manager.register_slot(Box::new(mock_provider)));
}
assert_eq!((3, 3), manager.num_providers());

let count = worker_keys
.iter()
.filter(|key| key.is_some())
.fold(0, |count, key| {
manager.unregister(key.unwrap());
manager.unregister_slot(key.unwrap());
// Should be idempotent
manager.unregister(key.unwrap());
manager.unregister_slot(key.unwrap());
count + 1
});
assert_eq!(3, count);
Expand Down
6 changes: 0 additions & 6 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,6 @@ pub struct WorkerConfig {

/// A versioning strategy for this worker.
pub versioning_strategy: WorkerVersioningStrategy,

/// The interval within which the worker will send a heartbeat.
/// The timer is reset on each existing RPC call that also happens to send this data, like
/// `PollWorkflowTaskQueueRequest`.
#[builder(default)]
pub heartbeat_interval: Option<Duration>,
}

impl WorkerConfig {
Expand Down
1 change: 1 addition & 0 deletions core-c-bridge/include/temporal-sdk-core-c-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ typedef struct TemporalCoreTelemetryOptions {

typedef struct TemporalCoreRuntimeOptions {
const struct TemporalCoreTelemetryOptions *telemetry;
uint64_t worker_heartbeat_duration_millis;
} TemporalCoreRuntimeOptions;

typedef struct TemporalCoreTestServerOptions {
Expand Down
Loading
Loading