Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,15 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
record_worker_heartbeat,
RecordWorkerHeartbeatRequest,
RecordWorkerHeartbeatResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
}

proxier! {
Expand Down
3 changes: 2 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ itertools = "0.14"
lru = "0.13"
mockall = "0.13"
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true }
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views", "testing"], optional = true }
opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "grpc-tonic"], optional = true }
opentelemetry-prometheus = { version = "0.29", optional = true }
parking_lot = { version = "0.12", features = ["send_guard"] }
Expand Down Expand Up @@ -71,6 +71,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["p
url = "2.2"
uuid = { version = "1.1", features = ["v4"] }
zip = { version = "2.0", optional = true }
gethostname = "1.0.2"

# 1st party local deps
[dependencies.temporal-sdk-core-api]
Expand Down
9 changes: 9 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::{
use anyhow::bail;
use futures_util::Stream;
use std::sync::Arc;
use parking_lot::Mutex;
use temporal_client::{ConfiguredClient, NamespacedClient, TemporalServiceClientWithMetrics};
use temporal_sdk_core_api::{
Worker as WorkerTrait,
Expand Down Expand Up @@ -89,6 +90,7 @@ pub fn init_worker<CT>(
where
CT: Into<sealed::AnyClient>,
{
println!("init_worker");
let client = init_worker_client(&worker_config, *client.into().into_inner());
if client.namespace() != worker_config.namespace {
bail!("Passed in client is not bound to the same namespace as the worker");
Expand All @@ -98,18 +100,25 @@ where
}
let client_ident = client.get_identity().to_owned();
let sticky_q = sticky_q_name_for_worker(&client_ident, &worker_config);
let in_memory_meter = runtime.telemetry.in_memory_meter();
let heartbeat_info = Arc::new(Mutex::new(worker::WorkerHeartbeatInfo::new(in_memory_meter, worker_config.clone())));
let client_bag = Arc::new(WorkerClientBag::new(
client,
worker_config.namespace.clone(),
client_ident,
worker_config.versioning_strategy.clone(),
heartbeat_info.clone(),
));

// TODO: adding client after feels clunky
heartbeat_info.lock().add_client(client_bag.clone());

Ok(Worker::new(
worker_config,
sticky_q,
client_bag,
Some(&runtime.telemetry),
Some(heartbeat_info),
))
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/replay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where
hist_allow_tx.send("Failed".to_string()).unwrap();
async move { Ok(RespondWorkflowTaskFailedResponse::default()) }.boxed()
});
let mut worker = Worker::new(self.config, None, Arc::new(client), None);
let mut worker = Worker::new(self.config, None, Arc::new(client), None, None);
worker.set_post_activate_hook(post_activate);
shutdown_tok(worker.shutdown_token());
Ok(worker)
Expand Down
128 changes: 128 additions & 0 deletions core/src/telemetry/in_memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::api::telemetry::metrics::MetricAttributes;
use fmt::Debug;
use opentelemetry_sdk::metrics::{InMemoryMetricExporter, SdkMeterProvider};
use std::{fmt, sync::Arc};
use anyhow::anyhow;
use opentelemetry::KeyValue;
use opentelemetry::metrics::Meter;
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use temporal_sdk_core_api::telemetry::metrics::{
CoreMeter, Counter, Gauge, GaugeF64, Histogram, HistogramDuration, HistogramF64,
MetricParameters, NewAttributes,
};
use crate::abstractions::dbg_panic;

/// A metrics meter whose readings are buffered in-process by an
/// [`InMemoryMetricExporter`] so they can be read back programmatically
/// (the surrounding changes feed these readings into worker heartbeats)
/// instead of being shipped to an external backend.
#[derive(Debug, Clone)]
pub struct InMemoryMeter {
    /// OTel meter used to build the individual instruments.
    pub(crate) meter: Meter,
    /// Exporter that retains finished metrics in memory for later retrieval.
    pub(crate) in_memory_exporter: InMemoryMetricExporter,
    /// Provider owning the reader/exporter pipeline; flushed before reads.
    pub(crate) mp: SdkMeterProvider,
}

impl InMemoryMeter {
    /// Creates an `InMemoryMeter` from its three parts.
    ///
    /// NOTE(review): the parts are assumed to already be wired together —
    /// `meter` created from `mp`, and `mp` configured with a reader that
    /// forwards to `in_memory_exporter` — confirm callers uphold this.
    pub fn new(
        in_memory_exporter: InMemoryMetricExporter,
        meter: Meter,
        mp: SdkMeterProvider,
    ) -> Self {
        Self {
            meter,
            in_memory_exporter,
            mp,
        }
    }

    /// Returns a handle to the underlying meter provider.
    pub fn meter_provider(&self) -> SdkMeterProvider {
        self.mp.clone()
    }

    /// Returns a handle to the in-memory exporter holding buffered metrics.
    pub fn in_mem_exporter(&self) -> InMemoryMetricExporter {
        self.in_memory_exporter.clone()
    }

    /// Flushes the meter provider, then returns every batch of metrics the
    /// exporter has collected so far.
    ///
    /// # Errors
    /// Returns an error if the provider flush fails or the exporter cannot
    /// produce its finished metrics.
    pub fn get_metrics(&self) -> Result<Vec<ResourceMetrics>, anyhow::Error> {
        self.mp
            .force_flush()
            .map_err(|e| anyhow!("failed to flush MeterProvider: {}", e))?;
        self.in_memory_exporter
            .get_finished_metrics()
            .map_err(Into::into)
    }
}

impl CoreMeter for InMemoryMeter {
/// Wraps the supplied attributes as OTel key-value pairs.
fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
MetricAttributes::OTel {
kvs: Arc::new(attribs.attributes.into_iter().map(KeyValue::from).collect()),
}
}

/// Extends `existing` OTel attributes with `attribs`. A non-OTel input is a
/// programming error: `dbg_panic!` fires in debug builds, and the input is
/// handed back unchanged otherwise.
fn extend_attributes(
&self,
existing: MetricAttributes,
attribs: NewAttributes,
) -> MetricAttributes {
if let MetricAttributes::OTel { mut kvs } = existing {
// Copy-on-write: `make_mut` only clones the Vec if the Arc is shared.
Arc::make_mut(&mut kvs).extend(attribs.attributes.into_iter().map(Into::into));
MetricAttributes::OTel { kvs }
} else {
dbg_panic!("Must use OTel attributes with an OTel metric implementation");
existing
}
}

/// Builds a u64 counter on the in-memory meter.
fn counter(&self, params: MetricParameters) -> Arc<dyn Counter> {
Arc::new(
self.meter
.u64_counter(params.name)
.with_unit(params.unit)
.with_description(params.description)
.build(),
)
}

/// Builds a u64 histogram on the in-memory meter.
fn histogram(&self, params: MetricParameters) -> Arc<dyn Histogram> {
Arc::new(
self.meter
.u64_histogram(params.name)
.with_unit(params.unit)
.with_description(params.description)
.build(),
)
}

/// Builds an f64 histogram on the in-memory meter.
fn histogram_f64(&self, params: MetricParameters) -> Arc<dyn HistogramF64> {
Arc::new(
self.meter
.f64_histogram(params.name)
.with_unit(params.unit)
.with_description(params.description)
.build(),
)
}

// TODO: fix — the unit is hard-coded to seconds here.
// NOTE(review): other CoreMeter impls in this crate may select seconds vs.
// milliseconds from telemetry config; confirm always-seconds is intended
// for the in-memory meter before relying on heartbeat duration values.
fn histogram_duration(&self, mut params: MetricParameters) -> Arc<dyn HistogramDuration> {
params.unit = "s".into();
Arc::new(
crate::telemetry::otel::DurationHistogram::Seconds(self.histogram_f64(params))
)
}

/// Builds a u64 gauge on the in-memory meter.
fn gauge(&self, params: MetricParameters) -> Arc<dyn Gauge> {
Arc::new(
self.meter
.u64_gauge(params.name)
.with_unit(params.unit)
.with_description(params.description)
.build(),
)
}

/// Builds an f64 gauge on the in-memory meter.
fn gauge_f64(&self, params: MetricParameters) -> Arc<dyn GaugeF64> {
Arc::new(
self.meter
.f64_gauge(params.name)
.with_unit(params.unit)
.with_description(params.description)
.build(),
)
}
}
70 changes: 51 additions & 19 deletions core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use temporal_sdk_core_api::telemetry::metrics::{
};
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
use temporal_sdk_core_protos::temporal::api::failure::v1::Failure;
use crate::telemetry::InMemoryMeter;

/// Used to track context associated with metrics, and record/update them
///
Expand Down Expand Up @@ -58,18 +59,32 @@ struct Instruments {
num_pollers: Arc<dyn Gauge>,
task_slots_available: Arc<dyn Gauge>,
task_slots_used: Arc<dyn Gauge>,
sticky_cache_hit: Arc<dyn Counter>,
sticky_cache_miss: Arc<dyn Counter>,
sticky_cache_hit: MetricAndHeartbeatCounter,
sticky_cache_miss: MetricAndHeartbeatCounter,
sticky_cache_size: Arc<dyn Gauge>,
sticky_cache_forced_evictions: Arc<dyn Counter>,
}

/// A counter that fans out each recording to two sinks: the user-configured
/// metrics meter and the in-memory meter whose readings back worker
/// heartbeats (see `Instruments`, which wires both from the same
/// `MetricParameters`).
struct MetricAndHeartbeatCounter {
    /// Counter on the primary (user-facing) metrics meter.
    metric: Arc<dyn Counter>,
    /// Counter on the in-memory heartbeat meter.
    for_heartbeat: HeartbeatCounter,
}

/// Newtype distinguishing a counter that belongs to the in-memory heartbeat
/// meter from the primary one.
struct HeartbeatCounter(Arc<dyn Counter>);

impl Counter for MetricAndHeartbeatCounter {
    fn add(&self, value: u64, attributes: &MetricAttributes) {
        // Record into both sinks so heartbeat readings stay in step with the
        // primary metrics pipeline.
        self.metric.add(value, attributes);
        self.for_heartbeat.0.add(value, attributes);
    }
}

impl MetricsContext {
/// Builds a metrics context that records nothing; used when telemetry is
/// disabled. (Removes the stale duplicate `instruments:` initializer left
/// over from the diff — a struct literal may name each field only once.)
pub(crate) fn no_op() -> Self {
    let meter = Arc::new(NoOpCoreMeter);
    Self {
        kvs: meter.new_attributes(Default::default()),
        // NOTE(review): Instruments::new currently calls `.unwrap()` on its
        // Option<Arc<InMemoryMeter>> argument (see its "TODO: handle when
        // in_mem_thing is None"), so passing None here panics at runtime —
        // this must be handled before merge.
        instruments: Arc::new(Instruments::new(meter.as_ref(), None)),
        meter,
    }
}
Expand All @@ -84,7 +99,7 @@ impl MetricsContext {
let kvs = meter.inner.new_attributes(meter.default_attribs);
Self {
kvs,
instruments: Arc::new(Instruments::new(meter.inner.as_ref())),
instruments: Arc::new(Instruments::new(meter.inner.as_ref(), telemetry.in_memory_meter.clone())),
meter: meter.inner,
}
} else {
Expand Down Expand Up @@ -310,7 +325,34 @@ impl MetricsContext {
}

impl Instruments {
fn new(meter: &dyn CoreMeter) -> Self {
fn new(meter: &dyn CoreMeter, in_mem_thing: Option<Arc<InMemoryMeter>>) -> Self {
// TODO: handle when in_mem_thing is None
let in_mem_thing = in_mem_thing.unwrap();
let params = MetricParameters {
name: "sticky_cache_hit".into(),
description: "Count of times the workflow cache was used for a new workflow task"
.into(),
unit: "".into(),
};
let sticky_cache_hit = MetricAndHeartbeatCounter {
metric: meter.counter(params.clone()),
for_heartbeat: HeartbeatCounter(in_mem_thing.counter(params)),
}
;

let params = MetricParameters {
name: "sticky_cache_miss".into(),
description:
"Count of times the workflow cache was missing a workflow for a sticky task"
.into(),
unit: "".into(),
};
let sticky_cache_miss = MetricAndHeartbeatCounter {
metric: meter.counter(params.clone()),
for_heartbeat: HeartbeatCounter(in_mem_thing.counter(params)),
};


Self {
wf_completed_counter: meter.counter(MetricParameters {
name: "workflow_completed".into(),
Expand Down Expand Up @@ -471,19 +513,9 @@ impl Instruments {
description: "Current number of used slots per task type".into(),
unit: "".into(),
}),
sticky_cache_hit: meter.counter(MetricParameters {
name: "sticky_cache_hit".into(),
description: "Count of times the workflow cache was used for a new workflow task"
.into(),
unit: "".into(),
}),
sticky_cache_miss: meter.counter(MetricParameters {
name: "sticky_cache_miss".into(),
description:
"Count of times the workflow cache was missing a workflow for a sticky task"
.into(),
unit: "".into(),
}),
sticky_cache_hit,
sticky_cache_miss,

sticky_cache_size: meter.gauge(MetricParameters {
name: STICKY_CACHE_SIZE_NAME.into(),
description: "Current number of cached workflows".into(),
Expand Down Expand Up @@ -591,7 +623,7 @@ pub const ACTIVITY_EXEC_LATENCY_HISTOGRAM_NAME: &str = "activity_execution_laten
pub(super) const NUM_POLLERS_NAME: &str = "num_pollers";
pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available";
pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used";
pub(super) const STICKY_CACHE_SIZE_NAME: &str = "sticky_cache_size";
pub const STICKY_CACHE_SIZE_NAME: &str = "sticky_cache_size";

/// Track a failure metric if the failure is not a benign application failure.
pub(crate) fn should_record_failure_metric(failure: &Option<Failure>) -> bool {
Expand Down
Loading