Skip to content

Commit 598a1b5

Browse files
committed
cargo fmt
1 parent 6c979b1 commit 598a1b5

6 files changed

Lines changed: 54 additions & 22 deletions

File tree

client/src/worker_registry/mod.rs

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
//! This is needed to implement Eager Workflow Start, a latency optimization in which the client,
33
//! after reserving a slot, directly forwards a WFT to a local worker.
44
5+
use anyhow::bail;
56
use parking_lot::RwLock;
6-
use std::collections::{HashMap, hash_map::Entry::{ Occupied, Vacant}};
7+
use std::collections::{
8+
HashMap,
9+
hash_map::Entry::{Occupied, Vacant},
10+
};
711
use std::sync::Arc;
8-
use anyhow::bail;
912
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
1013
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
1114
use uuid::Uuid;
@@ -70,15 +73,21 @@ impl ClientWorkerSetImpl {
7073
None
7174
}
7275

73-
fn register(&mut self, worker: Arc<dyn ClientWorker + Send + Sync>) -> Result<(), anyhow::Error> {
76+
fn register(
77+
&mut self,
78+
worker: Arc<dyn ClientWorker + Send + Sync>,
79+
) -> Result<(), anyhow::Error> {
7480
let slot_key = SlotKey::new(
7581
worker.namespace().to_string(),
7682
worker.task_queue().to_string(),
7783
);
7884
if let Vacant(p) = self.slot_providers.entry(slot_key.clone()) {
7985
p.insert(worker.worker_instance_key());
8086
} else {
81-
bail!("Registration of multiple workers on the same namespace and task queue for the same client not allowed: {slot_key:?}, worker_instance_key: {:?}.", worker.worker_instance_key());
87+
bail!(
88+
"Registration of multiple workers on the same namespace and task queue for the same client not allowed: {slot_key:?}, worker_instance_key: {:?}.",
89+
worker.worker_instance_key()
90+
);
8291
}
8392

8493
if worker.heartbeat_enabled()
@@ -107,12 +116,15 @@ impl ClientWorkerSetImpl {
107116
&mut self,
108117
worker_instance_key: Uuid,
109118
) -> Result<Arc<dyn ClientWorker + Send + Sync>, anyhow::Error> {
110-
let worker = self.all_workers.remove(&worker_instance_key).ok_or_else(|| {
111-
anyhow::anyhow!(
112-
"Worker with worker_instance_key {} not found",
113-
worker_instance_key
114-
)
115-
})?;
119+
let worker = self
120+
.all_workers
121+
.remove(&worker_instance_key)
122+
.ok_or_else(|| {
123+
anyhow::anyhow!(
124+
"Worker with worker_instance_key {} not found",
125+
worker_instance_key
126+
)
127+
})?;
116128

117129
let slot_key = SlotKey::new(
118130
worker.namespace().to_string(),
@@ -211,7 +223,10 @@ impl ClientWorkerSet {
211223
}
212224

213225
/// Register a local worker that can provide WFT processing slots and potentially worker heartbeating.
214-
pub fn register_worker(&self, worker: Arc<dyn ClientWorker + Send + Sync>) -> Result<(), anyhow::Error> {
226+
pub fn register_worker(
227+
&self,
228+
worker: Arc<dyn ClientWorker + Send + Sync>,
229+
) -> Result<(), anyhow::Error> {
215230
self.worker_manager.write().register(worker)
216231
}
217232

@@ -343,7 +358,9 @@ mod tests {
343358
worker_keys.push(worker_instance_key);
344359
} else {
345360
// Should get error for duplicate namespace+task_queue combinations
346-
assert!(result.unwrap_err().to_string().contains("Registration of multiple workers on the same namespace and task queue"));
361+
assert!(result.unwrap_err().to_string().contains(
362+
"Registration of multiple workers on the same namespace and task queue"
363+
));
347364
}
348365
}
349366

@@ -479,15 +496,20 @@ mod tests {
479496
// second worker register should fail due to duplicate namespace+task_queue
480497
let result = manager.register_worker(Arc::new(worker2));
481498
assert!(result.is_err());
482-
assert!(result.unwrap_err().to_string().contains("Registration of multiple workers on the same namespace and task queue"));
499+
assert!(
500+
result
501+
.unwrap_err()
502+
.to_string()
503+
.contains("Registration of multiple workers on the same namespace and task queue")
504+
);
483505

484506
assert_eq!((1, 1), manager.num_providers());
485507
assert_eq!(manager.num_heartbeat_workers(), 1);
486508

487509
let impl_ref = manager.worker_manager.read();
488510
assert_eq!(impl_ref.shared_worker.len(), 1);
489511
assert!(impl_ref.shared_worker.contains_key("test_namespace"));
490-
}
512+
}
491513

492514
#[test]
493515
fn multiple_workers_same_namespace_share_heartbeat_manager() {

core/src/core_tests/workflow_tasks.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2997,7 +2997,8 @@ async fn both_normal_and_sticky_pollers_poll_concurrently() {
29972997
None,
29982998
None,
29992999
false,
3000-
).unwrap();
3000+
)
3001+
.unwrap();
30013002

30023003
for _ in 1..50 {
30033004
let activation = worker.poll_workflow_activation().await.unwrap();

core/src/test_help/integ_helpers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ pub fn mock_worker(mocks: MocksHolder) -> Worker {
206206
None,
207207
None,
208208
false,
209-
).unwrap()
209+
)
210+
.unwrap()
210211
}
211212

212213
pub struct FakeWfResponses {

core/src/worker/heartbeat.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ mod tests {
227227
None,
228228
Some(Duration::from_millis(100)),
229229
false,
230-
).unwrap();
230+
)
231+
.unwrap();
231232

232233
tokio::time::sleep(Duration::from_millis(250)).await;
233234
worker.drain_activity_poller_and_shutdown().await;

core/src/worker/mod.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use crate::{
4848
worker::workflow::wft_poller,
4949
};
5050
use activities::WorkerActivityTasks;
51+
use anyhow::bail;
5152
use futures_util::{StreamExt, stream};
5253
use gethostname::gethostname;
5354
use parking_lot::{Mutex, RwLock};
@@ -61,7 +62,6 @@ use std::{
6162
},
6263
time::Duration,
6364
};
64-
use anyhow::bail;
6565
use temporal_client::{ClientWorker, HeartbeatCallback, Slot as SlotTrait};
6666
use temporal_client::{
6767
ConfiguredClient, SharedNamespaceWorkerTrait, TemporalServiceClientWithMetrics,
@@ -311,9 +311,11 @@ impl WorkerTrait for Worker {
311311
}
312312
self.shutdown_token.cancel();
313313
// First, unregister worker from the client
314-
if let Err(e) = self.client
314+
if let Err(e) = self
315+
.client
315316
.workers()
316-
.unregister_worker(self.worker_instance_key) {
317+
.unregister_worker(self.worker_instance_key)
318+
{
317319
error!(
318320
task_queue=%self.config.task_queue,
319321
namespace=%self.config.namespace,
@@ -384,7 +386,10 @@ impl Worker {
384386
/// For worker heartbeat, this will remove an existing shared worker if it is the last worker of
385387
/// the old client and create a new nexus worker if it's the first client of the namespace on
386388
/// the new client.
387-
pub fn replace_client(&self, new_client: ConfiguredClient<TemporalServiceClientWithMetrics>) -> Result<(), anyhow::Error> {
389+
pub fn replace_client(
390+
&self,
391+
new_client: ConfiguredClient<TemporalServiceClientWithMetrics>,
392+
) -> Result<(), anyhow::Error> {
388393
// Unregister worker from current client, register in new client at the end
389394
let client_worker = self
390395
.client

tests/integ_tests/polling_tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,9 @@ async fn switching_worker_client_changes_poll() {
198198

199199
// Swap client, poll for next task, confirm it's second wf, and respond w/ empty
200200
info!("Replacing client and polling again");
201-
worker.replace_client(client2.get_client().inner().clone());
201+
worker
202+
.replace_client(client2.get_client().inner().clone())
203+
.unwrap();
202204
let act2 = worker.poll_workflow_activation().await.unwrap();
203205
assert_eq!(wf2.run_id, act2.run_id);
204206
worker.complete_execution(&act2.run_id).await;

0 commit comments

Comments
 (0)