Skip to content

Commit e08370e

Browse files
committed
formating
1 parent 12d7060 commit e08370e

13 files changed

Lines changed: 671 additions & 498 deletions

File tree

core-c-bridge/src/runtime.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,29 @@
1-
use crate::ByteArray;
2-
use crate::ByteArrayRef;
3-
use crate::MetadataRef;
4-
use crate::metric::CustomMetricMeter;
5-
use crate::metric::CustomMetricMeterRef;
1+
use crate::{
2+
ByteArray, ByteArrayRef, MetadataRef,
3+
metric::{CustomMetricMeter, CustomMetricMeterRef},
4+
};
65

76
use serde_json::json;
8-
use std::collections::HashMap;
9-
use std::fmt;
10-
use std::net::SocketAddr;
11-
use std::str::FromStr;
12-
use std::sync::atomic::AtomicBool;
13-
use std::sync::atomic::Ordering;
14-
use std::sync::{Arc, Mutex};
15-
use std::time::Duration;
16-
use std::time::UNIX_EPOCH;
17-
use temporal_sdk_core::CoreRuntime;
18-
use temporal_sdk_core::RuntimeOptions as CoreRuntimeOptions;
19-
use temporal_sdk_core::RuntimeOptionsBuilder as CoreRuntimeOptionsBuilder;
20-
use temporal_sdk_core::TokioRuntimeBuilder;
21-
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
22-
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;
23-
use temporal_sdk_core_api::telemetry::MetricTemporality;
24-
use temporal_sdk_core_api::telemetry::metrics::CoreMeter;
25-
use temporal_sdk_core_api::telemetry::{CoreLog, CoreLogConsumer};
7+
use std::{
8+
collections::HashMap,
9+
fmt,
10+
net::SocketAddr,
11+
str::FromStr,
12+
sync::{
13+
Arc, Mutex,
14+
atomic::{AtomicBool, Ordering},
15+
},
16+
time::{Duration, UNIX_EPOCH},
17+
};
18+
use temporal_sdk_core::{
19+
CoreRuntime, RuntimeOptions as CoreRuntimeOptions,
20+
RuntimeOptionsBuilder as CoreRuntimeOptionsBuilder, TokioRuntimeBuilder,
21+
telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter},
22+
};
2623
use temporal_sdk_core_api::telemetry::{
27-
Logger, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
28-
TelemetryOptions as CoreTelemetryOptions, TelemetryOptionsBuilder,
24+
CoreLog, CoreLogConsumer, HistogramBucketOverrides, Logger, MetricTemporality,
25+
OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
26+
TelemetryOptions as CoreTelemetryOptions, TelemetryOptionsBuilder, metrics::CoreMeter,
2927
};
3028
use tracing::Level;
3129
use url::Url;

core/src/lib.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ mod core_tests;
3030
#[macro_use]
3131
pub mod test_help;
3232

33-
use std::collections::HashMap;
3433
pub(crate) use temporal_sdk_core_api::errors;
3534

3635
pub use pollers::{
@@ -119,7 +118,7 @@ where
119118
client_ident.clone(),
120119
worker_config.versioning_strategy.clone(),
121120
));
122-
121+
123122
let worker = Worker::new(
124123
worker_config.clone(),
125124
sticky_q,
@@ -370,10 +369,6 @@ impl CoreRuntime {
370369
pub fn telemetry_mut(&mut self) -> &mut TelemetryInstance {
371370
&mut self.telemetry
372371
}
373-
374-
fn task_queue_key(&self) -> Uuid {
375-
self.process_key
376-
}
377372
}
378373

379374
impl Drop for CoreRuntime {

core/src/telemetry/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{abstractions::dbg_panic, telemetry::TelemetryInstance};
1+
use crate::abstractions::dbg_panic;
22

33
use std::{
44
fmt::{Debug, Display},

core/src/worker/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,13 @@
22
33
pub(crate) mod mocks;
44
use crate::protosext::legacy_query_failure;
5-
use parking_lot::{RwLock};
5+
use parking_lot::RwLock;
66
use std::{sync::Arc, time::Duration};
77
use temporal_client::{
88
Client, ClientWorkerSet, IsWorkerTaskLongPoll, Namespace, NamespacedClient, NoRetryOnMatching,
99
RetryClient, WorkflowService,
1010
};
1111
use temporal_sdk_core_api::worker::WorkerVersioningStrategy;
12-
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
1312
use temporal_sdk_core_protos::{
1413
TaskToken,
1514
coresdk::{workflow_commands::QueryResult, workflow_completion},
@@ -29,6 +28,7 @@ use temporal_sdk_core_protos::{
2928
query::v1::WorkflowQueryResult,
3029
sdk::v1::WorkflowTaskCompletedMetadata,
3130
taskqueue::v1::{StickyExecutionAttributes, TaskQueue, TaskQueueMetadata},
31+
worker::v1::WorkerHeartbeat,
3232
workflowservice::v1::{get_system_info_response::Capabilities, *},
3333
},
3434
};
@@ -126,7 +126,7 @@ impl WorkerClientBag {
126126

127127
/// This trait contains everything workers need to interact with Temporal, and hence provides a
128128
/// minimal mocking surface. Delegates to [WorkflowClientTrait] so see that for details.
129-
#[cfg_attr(test, mockall::automock)]
129+
#[cfg_attr(any(feature = "test-utilities", test), mockall::automock)]
130130
#[async_trait::async_trait]
131131
pub trait WorkerClient: Sync + Send {
132132
/// Poll workflow tasks

core/src/worker/client/mocks.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub fn mock_worker_client() -> MockWorkerClient {
3535
.returning(|| ("test-core".to_string(), "0.0.0".to_string()));
3636
r.expect_get_identity()
3737
.returning(|| "test-identity".to_string());
38+
r.expect_get_process_key().returning(Uuid::new_v4);
3839
r
3940
}
4041

core/src/worker/heartbeat.rs

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use crate::worker::{TaskPollers, WorkerTelemetry};
33
use parking_lot::Mutex;
44
use prost_types::Duration as PbDuration;
55
use std::collections::HashMap;
6-
use std::fmt;
7-
use std::sync::Arc;
8-
use std::time::{Duration, SystemTime};
6+
use std::{
7+
fmt,
8+
sync::Arc,
9+
time::{Duration, SystemTime},
10+
};
911
use temporal_client::SharedNamespaceWorkerTrait;
1012
use temporal_sdk_core_api::worker::{WorkerConfigBuilder, WorkerVersioningStrategy};
1113
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHeartbeat;
@@ -29,12 +31,16 @@ impl SharedNamespaceWorker {
2931
heartbeat_interval: Duration,
3032
telemetry: Option<WorkerTelemetry>,
3133
) -> Self {
32-
println!("SharedNamespaceWorker::new() {:?}\n\tsdk_name_and_version key{:?}", client.get_identity(), client.sdk_name_and_version());
34+
println!(
35+
"SharedNamespaceWorker::new() {:?}\n\tsdk_name_and_version key{:?}",
36+
client.get_identity(),
37+
client.sdk_name_and_version()
38+
);
3339
let config = WorkerConfigBuilder::default()
3440
.namespace(namespace.clone())
3541
.task_queue(format!(
3642
"temporal-sys/worker-commands/{namespace}/{}",
37-
client.get_process_key().to_string()
43+
client.get_process_key()
3844
))
3945
.no_remote_activities(true)
4046
.max_outstanding_nexus_tasks(5_usize)
@@ -68,7 +74,9 @@ impl SharedNamespaceWorker {
6874
loop {
6975
// TODO: Race condition here, can technically shut down before anything is ever initialized
7076
if heartbeat_map_clone.lock().is_empty() {
71-
println!("// TODO: Race condition here, can technically shut down before anything is ever initialized");
77+
println!(
78+
"// TODO: Race condition here, can technically shut down before anything is ever initialized"
79+
);
7280
worker.shutdown().await;
7381
return;
7482
}
@@ -169,17 +177,20 @@ impl fmt::Debug for SharedNamespaceWorker {
169177
#[cfg(test)]
170178
mod tests {
171179
use super::*;
172-
use crate::test_help::WorkerExt;
173-
use crate::test_help::test_worker_cfg;
174-
use crate::worker;
175-
use crate::worker::client::mocks::mock_worker_client;
176-
use std::sync::Arc;
177-
use std::sync::atomic::AtomicUsize;
178-
use std::sync::atomic::Ordering;
179-
use std::time::Duration;
180+
use crate::{
181+
test_help::{WorkerExt, test_worker_cfg},
182+
worker,
183+
worker::client::mocks::mock_worker_client,
184+
};
185+
use std::{
186+
sync::{
187+
Arc,
188+
atomic::{AtomicUsize, Ordering},
189+
},
190+
time::Duration,
191+
};
180192
use temporal_sdk_core_api::worker::PollerBehavior;
181193
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse;
182-
use uuid::Uuid;
183194

184195
#[tokio::test]
185196
async fn worker_heartbeat_basic() {
@@ -218,29 +229,13 @@ mod tests {
218229
.into();
219230

220231
let client = Arc::new(mock);
221-
let worker = worker::Worker::new(config, None, client.clone(), None, None);
222-
223-
let namespace = "test-namespace".to_string();
224-
let process_key = Uuid::new_v4();
225-
// let mut shared_namespace_worker = SharedNamespaceWorker::new(
226-
// client,
227-
// WorkerHeartbeatIdentity::new(
228-
// "test-endpoint".to_string(),
229-
// namespace.clone(),
230-
// process_key.to_string(),
231-
// "test-identity".to_string(),
232-
// ),
233-
// Duration::from_millis(100),
234-
// None,
235-
// );
236-
// TODO: translate to new way
237-
// let worker_instance_key = worker.worker_instance_key().unwrap();
238-
// shared_namespace_worker.register_callback(
239-
// worker_instance_key,
240-
// worker
241-
// .get_heartbeat_callback()
242-
// .expect("heartbeat callback should be set"),
243-
// );
232+
let worker = worker::Worker::new(
233+
config,
234+
None,
235+
client.clone(),
236+
None,
237+
Some(Duration::from_millis(100)),
238+
);
244239

245240
tokio::time::sleep(Duration::from_millis(250)).await;
246241
worker.drain_activity_poller_and_shutdown().await;

core/src/worker/mod.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ use temporal_sdk_core_api::{
7070
errors::{CompleteNexusError, WorkerValidationError},
7171
worker::PollerBehavior,
7272
};
73-
use temporal_sdk_core_protos::temporal::api::cloud::account::v1::Metrics;
7473
use temporal_sdk_core_protos::temporal::api::worker::v1::WorkerHostInfo;
7574
use temporal_sdk_core_protos::{
7675
TaskToken,
@@ -87,7 +86,7 @@ use temporal_sdk_core_protos::{
8786
taskqueue::v1::{StickyExecutionAttributes, TaskQueue},
8887
},
8988
};
90-
use tokio::sync::{OnceCell, mpsc::unbounded_channel, watch};
89+
use tokio::sync::{mpsc::unbounded_channel, watch};
9190
use tokio_stream::wrappers::UnboundedReceiverStream;
9291
use tokio_util::sync::CancellationToken;
9392
use tracing::Subscriber;
@@ -170,9 +169,6 @@ impl WorkerTelemetry {
170169
struct WorkerHeartbeat {
171170
/// Instance key used to identify this worker in worker heartbeating
172171
worker_instance_key: Uuid,
173-
/// Used to remove this worker from the parent map used to track this worker for
174-
/// worker heartbeat
175-
shutdown_callback: OnceCell<Arc<dyn Fn() + Send + Sync>>,
176172
/// Heartbeat interval, defaults to 60s
177173
heartbeat_interval: Duration,
178174
/// Telemetry instance, needed to initialize [SharedNamespaceWorker] when replacing client
@@ -391,15 +387,11 @@ impl Worker {
391387
telem_instance: Option<&TelemetryInstance>,
392388
worker_heartbeat_interval: Option<Duration>,
393389
) -> Self {
394-
let worker_telemetry = if let Some(telem) = telem_instance {
395-
Some(WorkerTelemetry {
396-
metric_meter: telem.get_metric_meter(),
397-
temporal_metric_meter: telem.get_temporal_metric_meter(),
398-
trace_subscriber: telem.trace_subscriber(),
399-
})
400-
} else {
401-
None
402-
};
390+
let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry {
391+
metric_meter: telem.get_metric_meter(),
392+
temporal_metric_meter: telem.get_temporal_metric_meter(),
393+
trace_subscriber: telem.trace_subscriber(),
394+
});
403395

404396
Worker::new_with_pollers_inner(
405397
config,
@@ -664,8 +656,7 @@ impl Worker {
664656

665657
Some(WorkerHeartbeat {
666658
worker_instance_key,
667-
shutdown_callback: OnceCell::new(),
668-
heartbeat_interval: heartbeat_interval,
659+
heartbeat_interval,
669660
telemetry: worker_telemetry.clone(),
670661
})
671662
} else {

core/src/worker/workflow/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
internal_flags::InternalFlags,
2424
pollers::TrackedPermittedTqResp,
2525
protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage},
26-
telemetry::{TelemetryInstance, VecDisplayer, set_trace_subscriber_for_current_thread},
26+
telemetry::{VecDisplayer, set_trace_subscriber_for_current_thread},
2727
worker::{
2828
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
2929
PostActivateHookData,

0 commit comments

Comments
 (0)