Skip to content

Commit 6384c8c

Browse files
committed
Replace client test passes, need to write unit tests in worker_registry
1 parent 61bc366 commit 6384c8c

10 files changed

Lines changed: 309 additions & 203 deletions

File tree

client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub use temporal_sdk_core_protos::temporal::api::{
3333
};
3434
pub use tonic;
3535
pub use worker_registry::{
36-
ClientWorkerSet, SharedNamespaceWorkerTrait, Slot, SlotProvider, WorkerKey,
36+
ClientWorker, ClientWorkerSet, HeartbeatCallback, SharedNamespaceWorkerTrait, Slot, WorkerKey,
3737
};
3838
pub use workflow_handle::{
3939
GetWorkflowResultOpts, WorkflowExecutionInfo, WorkflowExecutionResult, WorkflowHandle,

client/src/worker_registry/mod.rs

Lines changed: 144 additions & 98 deletions
Large diffs are not rendered by default.

core/src/core_tests/workflow_tasks.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2996,6 +2996,7 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() {
29962996
Arc::new(mock_client),
29972997
None,
29982998
None,
2999+
false,
29993000
);
30003001

30013002
for _ in 1..50 {

core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ use temporal_sdk_core_api::{
7070
telemetry::TelemetryOptions,
7171
};
7272
use temporal_sdk_core_protos::coresdk::ActivityHeartbeat;
73-
use uuid::Uuid;
7473

7574
/// Initialize a worker bound to a task queue.
7675
///
@@ -124,6 +123,7 @@ where
124123
client_bag.clone(),
125124
Some(&runtime.telemetry),
126125
runtime.heartbeat_interval,
126+
false,
127127
);
128128

129129
Ok(worker)

core/src/replay/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ where
114114
hist_allow_tx.send("Failed".to_string()).unwrap();
115115
async move { Ok(RespondWorkflowTaskFailedResponse::default()) }.boxed()
116116
});
117-
let mut worker = Worker::new(self.config, None, Arc::new(client), None, None);
117+
let mut worker = Worker::new(self.config, None, Arc::new(client), None, None, false);
118118
worker.set_post_activate_hook(post_activate);
119119
shutdown_tok(worker.shutdown_token());
120120
Ok(worker)

core/src/test_help/integ_helpers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ pub fn mock_worker(mocks: MocksHolder) -> Worker {
205205
},
206206
None,
207207
None,
208+
false,
208209
)
209210
}
210211

core/src/worker/heartbeat.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use temporal_client::SharedNamespaceWorkerTrait;
1212
use temporal_sdk_core_api::worker::{WorkerConfigBuilder, WorkerVersioningStrategy};
1313
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
1414
use tokio::sync::Notify;
15+
use tokio_util::sync::CancellationToken;
1516

1617
/// Callback used to collect heartbeat data from each worker at the time of heartbeat
1718
pub(crate) type HeartbeatFn = Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>;
@@ -22,6 +23,7 @@ pub(crate) type HeartbeatFn = Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>;
2223
pub(crate) struct SharedNamespaceWorker {
2324
heartbeat_map: Arc<Mutex<HashMap<String, HeartbeatFn>>>,
2425
namespace: String,
26+
cancel: CancellationToken,
2527
}
2628

2729
impl SharedNamespaceWorker {
@@ -51,12 +53,16 @@ impl SharedNamespaceWorker {
5153
TaskPollers::Real,
5254
telemetry,
5355
None,
56+
true,
5457
);
5558

5659
let last_heartbeat_time_map = Mutex::new(HashMap::new());
5760

5861
// heartbeat task
5962
let reset_notify = Arc::new(Notify::new());
63+
let cancel = CancellationToken::new();
64+
let child = cancel.child_token();
65+
6066
let client_clone = client;
6167
let namespace_clone = namespace.clone();
6268

@@ -67,15 +73,6 @@ impl SharedNamespaceWorker {
6773
let mut ticker = tokio::time::interval(heartbeat_interval);
6874
ticker.tick().await;
6975
loop {
70-
// TODO: Race condition here, can technically shut down before anything is ever initialized
71-
if heartbeat_map_clone.lock().is_empty() {
72-
println!(
73-
"// TODO: Race condition here, can technically shut down before anything is ever initialized"
74-
);
75-
worker.shutdown().await;
76-
return;
77-
}
78-
7976
tokio::select! {
8077
_ = ticker.tick() => {
8178
let mut hb_to_send = Vec::new();
@@ -94,10 +91,18 @@ impl SharedNamespaceWorker {
9491
} else {
9592
None
9693
};
94+
let sdk_name_and_ver = client_clone.sdk_name_and_version();
9795

9896
heartbeat.elapsed_since_last_heartbeat = elapsed_since_last_heartbeat;
9997
heartbeat.heartbeat_time = Some(now.into());
10098

99+
// All of these heartbeat details rely on a client. Due to type limitations
100+
// from the dependency graph of worker_registry, this must be populated
101+
// from within SharedNamespaceWorker to get the current and proper client
102+
heartbeat.worker_identity = client_clone.identity();
103+
heartbeat.sdk_name = sdk_name_and_ver.0;
104+
heartbeat.sdk_version = sdk_name_and_ver.1;
105+
101106
hb_to_send.push(heartbeat);
102107

103108
last_heartbeat_time_map.insert(instance_key.clone(), now);
@@ -115,13 +120,18 @@ impl SharedNamespaceWorker {
115120
_ = reset_notify.notified() => {
116121
ticker.reset();
117122
}
123+
_ = child.cancelled() => {
124+
worker.shutdown().await;
125+
return;
126+
}
118127
}
119128
}
120129
});
121130

122131
Self {
123132
heartbeat_map,
124133
namespace,
134+
cancel,
125135
}
126136
}
127137
}
@@ -136,10 +146,11 @@ impl SharedNamespaceWorkerTrait for SharedNamespaceWorker {
136146
worker_instance_key: String,
137147
) -> (Option<Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>>, bool) {
138148
let mut heartbeat_map = self.heartbeat_map.lock();
139-
(
140-
heartbeat_map.remove(&worker_instance_key),
141-
heartbeat_map.is_empty(),
142-
)
149+
let heartbeat_callback = heartbeat_map.remove(&worker_instance_key);
150+
if heartbeat_map.is_empty() {
151+
self.cancel.cancel();
152+
}
153+
(heartbeat_callback, heartbeat_map.is_empty())
143154
}
144155
fn register_callback(
145156
&self,
@@ -230,6 +241,7 @@ mod tests {
230241
client.clone(),
231242
None,
232243
Some(Duration::from_millis(100)),
244+
false,
233245
);
234246

235247
tokio::time::sleep(Duration::from_millis(250)).await;

0 commit comments

Comments
 (0)