Skip to content

Commit 9d39b9f

Browse files
committed
formating
1 parent 395027b commit 9d39b9f

13 files changed

Lines changed: 80 additions & 65 deletions

File tree

core-c-bridge/src/runtime.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ use std::{
1616
time::{Duration, UNIX_EPOCH},
1717
};
1818
use temporal_sdk_core::{
19-
CoreRuntime, RuntimeOptions as CoreRuntimeOptions, RuntimeOptionsBuilder as CoreRuntimeOptionsBuilder, TokioRuntimeBuilder,
19+
CoreRuntime, RuntimeOptions as CoreRuntimeOptions,
20+
RuntimeOptionsBuilder as CoreRuntimeOptionsBuilder, TokioRuntimeBuilder,
2021
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
2122
};
2223
use temporal_sdk_core_api::telemetry::{

core/src/lib.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ mod core_tests;
3030
#[macro_use]
3131
pub mod test_help;
3232

33-
use std::collections::HashMap;
3433
pub(crate) use temporal_sdk_core_api::errors;
3534

3635
pub use pollers::{
@@ -119,7 +118,7 @@ where
119118
client_ident.clone(),
120119
worker_config.versioning_strategy.clone(),
121120
));
122-
121+
123122
let worker = Worker::new(
124123
worker_config.clone(),
125124
sticky_q,
@@ -370,10 +369,6 @@ impl CoreRuntime {
370369
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
371370
&mut self.telemetry
372371
}
373-
374-
fn task_queue_key(&self) -> Uuid {
375-
self.process_key
376-
}
377372
}
378373

379374
impl Drop for CoreRuntime {

core/src/telemetry/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{abstractions::dbg_panic, telemetry::TelemetryInstance};
1+
use crate::abstractions::dbg_panic;
22

33
use std::{
44
fmt::{Debug, Display},

core/src/worker/client.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
//! Worker-specific client needs
22
33
pub(crate) mod mocks;
4-
use crate::{
5-
abstractions::dbg_panic, protosext::legacy_query_failure, worker::heartbeat::HeartbeatFn,
6-
};
4+
use crate::protosext::legacy_query_failure;
75
use parking_lot::RwLock;
8-
use std::{
9-
sync::{Arc, OnceLock},
10-
time::Duration,
11-
};
6+
use std::{sync::Arc, time::Duration};
127
use temporal_client::{
138
Client, ClientWorkerSet, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching,
149
RetryClient, WorkflowService,

core/src/worker/client/mocks.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ pub fn mock_worker_client() -> MockWorkerClient {
3535
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
3636
r.expect_get_identity()
3737
.returning(|| "test-identity".to_string());
38-
r.expect_get_process_key()
39-
.returning(|| Uuid::new_v4());
38+
r.expect_get_process_key().returning(Uuid::new_v4);
4039
r
4140
}
4241

core/src/worker/heartbeat.rs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
1-
use crate::{WorkerClient, abstractions::dbg_panic};
2-
use gethostname::gethostname;
1+
use crate::WorkerClient;
2+
use crate::worker::{TaskPollers, WorkerTelemetry};
33
use parking_lot::Mutex;
44
use prost_types::Duration as PbDuration;
5-
use std::{fmt, sync::{Arc, OnceLock}, time::{Duration, SystemTime}};
65
use std::collections::HashMap;
7-
use temporal_sdk_core_api::worker::{WorkerConfig, WorkerConfigBuilder, WorkerVersioningStrategy};
8-
use temporal_sdk_core_protos::temporal::api::worker::v1::{WorkerHeartbeat, WorkerHostInfo};
9-
use tokio::{sync::Notify, task::JoinHandle, time::MissedTickBehavior};
10-
use uuid::Uuid;
6+
use std::{
7+
fmt,
8+
sync::Arc,
9+
time::{Duration, SystemTime},
10+
};
1111
use temporal_client::SharedNamespaceWorkerTrait;
12-
use crate::worker::{TaskPollers, WorkerTelemetry};
12+
use temporal_sdk_core_api::worker::{WorkerConfigBuilder, WorkerVersioningStrategy};
13+
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
14+
use tokio::sync::Notify;
1315

1416
/// Callback used to collect heartbeat data from each worker at the time of heartbeat
1517
pub(crate) type HeartbeatFn = Box<dyn Fn() -> WorkerHeartbeat + Send + Sync>;
@@ -29,12 +31,16 @@ impl SharedNamespaceWorker {
2931
heartbeat_interval: Duration,
3032
telemetry: Option<WorkerTelemetry>,
3133
) -> Self {
32-
println!("SharedNamespaceWorker::new() {:?}\n\tsdk_name_and_version key{:?}", client.get_identity(), client.sdk_name_and_version());
34+
println!(
35+
"SharedNamespaceWorker::new() {:?}\n\tsdk_name_and_version key{:?}",
36+
client.get_identity(),
37+
client.sdk_name_and_version()
38+
);
3339
let config = WorkerConfigBuilder::default()
3440
.namespace(namespace.clone())
3541
.task_queue(format!(
3642
"temporal-sys/worker-commands/{namespace}/{}",
37-
client.get_process_key().to_string()
43+
client.get_process_key()
3844
))
3945
.no_remote_activities(true)
4046
.max_outstanding_nexus_tasks(5_usize)
@@ -68,7 +74,9 @@ impl SharedNamespaceWorker {
6874
loop {
6975
// TODO: Race condition here, can technically shut down before anything is ever initialized
7076
if heartbeat_map_clone.lock().is_empty() {
71-
println!("// TODO: Race condition here, can technically shut down before anything is ever initialized");
77+
println!(
78+
"// TODO: Race condition here, can technically shut down before anything is ever initialized"
79+
);
7280
worker.shutdown().await;
7381
return;
7482
}
@@ -174,11 +182,15 @@ mod tests {
174182
worker,
175183
worker::client::mocks::mock_worker_client,
176184
};
177-
use std::{sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration};
178-
use mockall::mock;
185+
use std::{
186+
sync::{
187+
Arc,
188+
atomic::{AtomicUsize, Ordering},
189+
},
190+
time::Duration,
191+
};
179192
use temporal_sdk_core_api::worker::PollerBehavior;
180193
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse;
181-
use uuid::Uuid;
182194

183195
#[tokio::test]
184196
async fn worker_heartbeat_basic() {
@@ -217,7 +229,13 @@ mod tests {
217229
.into();
218230

219231
let client = Arc::new(mock);
220-
let worker = worker::Worker::new(config, None, client.clone(), None, Some(Duration::from_millis(100)));
232+
let worker = worker::Worker::new(
233+
config,
234+
None,
235+
client.clone(),
236+
None,
237+
Some(Duration::from_millis(100)),
238+
);
221239

222240
tokio::time::sleep(Duration::from_millis(250)).await;
223241
worker.drain_activity_poller_and_shutdown().await;

core/src/worker/mod.rs

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use std::{
5353
convert::TryInto,
5454
future,
5555
sync::{
56-
Arc, OnceLock,
56+
Arc,
5757
atomic::{AtomicBool, Ordering},
5858
},
5959
time::Duration,
@@ -67,7 +67,6 @@ use temporal_sdk_core_api::{
6767
errors::{CompleteNexusError, WorkerValidationError},
6868
worker::PollerBehavior,
6969
};
70-
use temporal_sdk_core_protos::temporal::api::cloud::account::v1::Metrics;
7170
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHostInfo;
7271
use temporal_sdk_core_protos::{
7372
TaskToken,
@@ -84,7 +83,7 @@ use temporal_sdk_core_protos::{
8483
taskqueue::v1::{StickyExecutionAttributes, TaskQueue},
8584
},
8685
};
87-
use tokio::sync::{OnceCell, mpsc::unbounded_channel, watch};
86+
use tokio::sync::{mpsc::unbounded_channel, watch};
8887
use tokio_stream::wrappers::UnboundedReceiverStream;
8988
use tokio_util::sync::CancellationToken;
9089
use tracing::Subscriber;
@@ -167,9 +166,6 @@ impl WorkerTelemetry {
167166
struct WorkerHeartbeat {
168167
/// Instance key used to identify this worker in worker heartbeating
169168
worker_instance_key: Uuid,
170-
/// Used to remove this worker from the parent map used to track this worker for
171-
/// worker heartbeat
172-
shutdown_callback: OnceCell<Arc<dyn Fn() + Send + Sync>>,
173169
/// Heartbeat interval, defaults to 60s
174170
heartbeat_interval: Duration,
175171
/// Telemetry instance, needed to initialize [SharedNamespaceWorker] when replacing client
@@ -388,15 +384,11 @@ impl Worker {
388384
telem_instance: Option<&TelemetryInstance>,
389385
worker_heartbeat_interval: Option<Duration>,
390386
) -> Self {
391-
let worker_telemetry = if let Some(telem) = telem_instance {
392-
Some(WorkerTelemetry {
393-
metric_meter: telem.get_metric_meter(),
394-
temporal_metric_meter: telem.get_temporal_metric_meter(),
395-
trace_subscriber: telem.trace_subscriber(),
396-
})
397-
} else {
398-
None
399-
};
387+
let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry {
388+
metric_meter: telem.get_metric_meter(),
389+
temporal_metric_meter: telem.get_temporal_metric_meter(),
390+
trace_subscriber: telem.trace_subscriber(),
391+
});
400392

401393
Worker::new_with_pollers_inner(
402394
config,
@@ -661,8 +653,7 @@ impl Worker {
661653

662654
Some(WorkerHeartbeat {
663655
worker_instance_key,
664-
shutdown_callback: OnceCell::new(),
665-
heartbeat_interval: heartbeat_interval,
656+
heartbeat_interval,
666657
telemetry: worker_telemetry.clone(),
667658
})
668659
} else {

core/src/worker/workflow/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
internal_flags::InternalFlags,
2424
pollers::TrackedPermittedTqResp,
2525
protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage},
26-
telemetry::{TelemetryInstance, VecDisplayer, set_trace_subscriber_for_current_thread},
26+
telemetry::{VecDisplayer, set_trace_subscriber_for_current_thread},
2727
worker::{
2828
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
2929
PostActivateHookData,

tests/common/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ use temporal_sdk::{
3838
WorkerInterceptor,
3939
},
4040
};
41-
use temporal_sdk_core::{ClientOptions, ClientOptionsBuilder, CoreRuntime, WorkerConfigBuilder, init_replay_worker, init_worker, replay::{HistoryForReplay, ReplayWorkerInput}, telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}, RuntimeOptionsBuilder, RuntimeOptions};
41+
use temporal_sdk_core::{
42+
ClientOptions, ClientOptionsBuilder, CoreRuntime, RuntimeOptionsBuilder, WorkerConfigBuilder,
43+
init_replay_worker, init_worker,
44+
replay::{HistoryForReplay, ReplayWorkerInput},
45+
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
46+
};
4247
use temporal_sdk_core_api::{
4348
Worker as CoreWorker,
4449
telemetry::{
@@ -809,10 +814,6 @@ pub(crate) fn get_integ_telem_options() -> TelemetryOptions {
809814
.unwrap()
810815
}
811816

812-
pub fn get_integ_runtime_options(heartbeat_interval: Option<Duration>) -> RuntimeOptions {
813-
RuntimeOptions::new(get_integ_telem_options(), heartbeat_interval)
814-
}
815-
816817
#[async_trait::async_trait(?Send)]
817818
pub(crate) trait WorkflowHandleExt {
818819
async fn fetch_history_and_replay(

tests/integ_tests/client_tests.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,10 @@ async fn http_proxy() {
446446
opts.target_url = format!("http://127.0.0.1:{}", server.addr.port())
447447
.parse()
448448
.unwrap();
449-
let client = opts.connect("my-namespace", None, Uuid::new_v4()).await.unwrap();
449+
let client = opts
450+
.connect("my-namespace", None, Uuid::new_v4())
451+
.await
452+
.unwrap();
450453
let _ = client.list_namespaces().await;
451454
assert!(call_count.load(Ordering::SeqCst) == 1);
452455
assert!(tcp_proxy.hit_count() == 0);
@@ -456,7 +459,10 @@ async fn http_proxy() {
456459
target_addr: tcp_proxy_addr.to_string(),
457460
basic_auth: None,
458461
});
459-
let proxied_client = opts.connect("my-namespace", None, Uuid::new_v4()).await.unwrap();
462+
let proxied_client = opts
463+
.connect("my-namespace", None, Uuid::new_v4())
464+
.await
465+
.unwrap();
460466
let _ = proxied_client.list_namespaces().await;
461467
assert!(call_count.load(Ordering::SeqCst) == 2);
462468
assert!(tcp_proxy.hit_count() == 1);
@@ -478,7 +484,10 @@ async fn http_proxy() {
478484
target_addr: format!("unix:{}", sock_path.to_str().unwrap()),
479485
basic_auth: None,
480486
});
481-
let proxied_client = opts.connect("my-namespace", None, Uuid::new_v4()).await.unwrap();
487+
let proxied_client = opts
488+
.connect("my-namespace", None, Uuid::new_v4())
489+
.await
490+
.unwrap();
482491
let _ = proxied_client.list_namespaces().await;
483492
assert!(call_count.load(Ordering::SeqCst) == 3);
484493
assert!(unix_proxy.hit_count() == 1);

0 commit comments

Comments
 (0)