Skip to content

Commit 3a5ed35

Browse files
authored
Add meters to custom slot supplier contexts (#913)
1 parent 0b8ef8e commit 3a5ed35

8 files changed

Lines changed: 175 additions & 29 deletions

File tree

core-api/src/telemetry/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{
22
any::Any,
33
borrow::Cow,
44
fmt::Debug,
5+
ops::Deref,
56
sync::{Arc, OnceLock},
67
time::Duration,
78
};
@@ -63,6 +64,13 @@ pub struct TemporalMeter {
6364
pub default_attribs: NewAttributes,
6465
}
6566

67+
impl Deref for TemporalMeter {
68+
type Target = dyn CoreMeter;
69+
fn deref(&self) -> &Self::Target {
70+
self.inner.as_ref()
71+
}
72+
}
73+
6674
impl CoreMeter for Arc<dyn CoreMeter> {
6775
fn new_attributes(&self, attribs: NewAttributes) -> MetricAttributes {
6876
self.as_ref().new_attributes(attribs)

core-api/src/worker.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,6 @@ pub trait WorkerTuner {
282282
fn nexus_task_slot_supplier(
283283
&self,
284284
) -> Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync>;
285-
286-
/// Core will call this at worker initialization time, allowing the implementation to hook up to
287-
/// metrics if any are configured. If not, it will not be called.
288-
fn attach_metrics(&self, metrics: TemporalMeter);
289285
}
290286

291287
/// Implementing this trait allows users to customize how many tasks of certain kinds the worker
@@ -339,6 +335,11 @@ pub trait SlotReservationContext: Send + Sync {
339335

340336
/// Returns true iff this is a sticky poll for a workflow task
341337
fn is_sticky(&self) -> bool;
338+
339+
/// Returns the metrics meter if metrics are enabled
340+
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
341+
None
342+
}
342343
}
343344

344345
pub trait SlotMarkUsedContext: Send + Sync {
@@ -347,6 +348,11 @@ pub trait SlotMarkUsedContext: Send + Sync {
347348
fn permit(&self) -> &SlotSupplierPermit;
348349
/// Returns the info of slot that was marked as used
349350
fn info(&self) -> &<Self::SlotKind as SlotKind>::Info;
351+
352+
/// Returns the metrics meter if metrics are enabled
353+
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
354+
None
355+
}
350356
}
351357

352358
pub trait SlotReleaseContext: Send + Sync {
@@ -355,6 +361,11 @@ pub trait SlotReleaseContext: Send + Sync {
355361
fn permit(&self) -> &SlotSupplierPermit;
356362
/// Returns the info of slot that was released, if it was used
357363
fn info(&self) -> Option<&<Self::SlotKind as SlotKind>::Info>;
364+
365+
/// Returns the metrics meter if metrics are enabled
366+
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
367+
None
368+
}
358369
}
359370

360371
#[derive(Default, Debug)]

core/src/abstractions.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use std::{
1010
atomic::{AtomicBool, AtomicUsize, Ordering},
1111
},
1212
};
13-
use temporal_sdk_core_api::worker::{
14-
SlotKind, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
15-
SlotSupplierPermit, WorkerDeploymentVersion, WorkflowSlotKind,
13+
use temporal_sdk_core_api::{
14+
telemetry::metrics::TemporalMeter,
15+
worker::{
16+
SlotKind, SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
17+
SlotSupplierPermit, WorkerDeploymentVersion, WorkflowSlotKind,
18+
},
1619
};
1720
use tokio::sync::watch;
1821
use tokio_util::sync::CancellationToken;
@@ -36,6 +39,7 @@ pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
3639
/// there will need to be some associated refactoring.
3740
max_permits: Option<usize>,
3841
metrics_ctx: MetricsContext,
42+
meter: Option<TemporalMeter>,
3943
/// Only applies to permit dealers for workflow tasks. True if this permit dealer is associated
4044
/// with a sticky queue poller.
4145
is_sticky_poller: bool,
@@ -59,12 +63,14 @@ where
5963
metrics_ctx: MetricsContext,
6064
max_permits: Option<usize>,
6165
context_data: Arc<PermitDealerContextData>,
66+
meter: Option<TemporalMeter>,
6267
) -> Self {
6368
Self {
6469
supplier,
6570
unused_claimants: Arc::new(AtomicUsize::new(0)),
6671
extant_permits: watch::channel(0),
6772
metrics_ctx,
73+
meter,
6874
max_permits,
6975
is_sticky_poller: false,
7076
context_data,
@@ -141,6 +147,7 @@ where
141147
release_ctx: ReleaseCtx {
142148
permit: res,
143149
stored_info: None,
150+
meter: self.meter.clone(),
144151
},
145152
use_fn: Box::new(move |info| {
146153
supp_c.mark_slot_used(info);
@@ -182,11 +189,16 @@ impl<SK: SlotKind> SlotReservationContext for MeteredPermitDealer<SK> {
182189
fn is_sticky(&self) -> bool {
183190
self.is_sticky_poller
184191
}
192+
193+
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
194+
self.meter.clone()
195+
}
185196
}
186197

187198
struct UseCtx<'a, SK: SlotKind> {
188199
stored_info: &'a SK::Info,
189200
permit: &'a SlotSupplierPermit,
201+
meter: Option<TemporalMeter>,
190202
}
191203

192204
impl<SK: SlotKind> SlotMarkUsedContext for UseCtx<'_, SK> {
@@ -199,11 +211,16 @@ impl<SK: SlotKind> SlotMarkUsedContext for UseCtx<'_, SK> {
199211
fn info(&self) -> &<Self::SlotKind as SlotKind>::Info {
200212
self.stored_info
201213
}
214+
215+
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
216+
self.meter.clone()
217+
}
202218
}
203219

204220
struct ReleaseCtx<SK: SlotKind> {
205221
permit: SlotSupplierPermit,
206222
stored_info: Option<SK::Info>,
223+
meter: Option<TemporalMeter>,
207224
}
208225

209226
impl<SK: SlotKind> SlotReleaseContext for ReleaseCtx<SK> {
@@ -216,6 +233,10 @@ impl<SK: SlotKind> SlotReleaseContext for ReleaseCtx<SK> {
216233
fn info(&self) -> Option<&<Self::SlotKind as SlotKind>::Info> {
217234
self.stored_info.as_ref()
218235
}
236+
237+
fn get_metrics_meter(&self) -> Option<TemporalMeter> {
238+
self.meter.clone()
239+
}
219240
}
220241

221242
/// A version of [MeteredPermitDealer] that can be closed and supports waiting for close to complete.
@@ -353,6 +374,7 @@ impl<SK: SlotKind> OwnedMeteredSemPermit<SK> {
353374
let ctx = UseCtx {
354375
stored_info: &info,
355376
permit: &self.release_ctx.permit,
377+
meter: self.release_ctx.meter.clone(),
356378
};
357379
(self.use_fn)(&ctx);
358380
self.release_ctx.stored_info = Some(info);
@@ -386,6 +408,7 @@ pub(crate) mod tests {
386408
MetricsContext::no_op(),
387409
None,
388410
Arc::new(Default::default()),
411+
None,
389412
)
390413
}
391414

core/src/worker/activities/local_activities.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ impl LocalActivityManager {
255255
MetricsContext::no_op(),
256256
None,
257257
Arc::new(Default::default()),
258+
None,
258259
),
259260
hb_tx,
260261
MetricsContext::no_op(),

core/src/worker/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,6 @@ impl Worker {
318318
.unwrap_or_else(|| Arc::new(TunerBuilder::from_config(&config).build()));
319319

320320
metrics.worker_registered();
321-
if let Some(meter) = meter {
322-
tuner.attach_metrics(meter.clone());
323-
}
324321
let shutdown_token = CancellationToken::new();
325322
let slot_context_data = Arc::new(PermitDealerContextData {
326323
task_queue: config.task_queue.clone(),
@@ -338,13 +335,15 @@ impl Worker {
338335
None
339336
},
340337
slot_context_data.clone(),
338+
meter.clone(),
341339
);
342340
let wft_permits = wft_slots.get_extant_count_rcv();
343341
let act_slots = MeteredPermitDealer::new(
344342
tuner.activity_task_slot_supplier(),
345343
metrics.with_new_attrs([activity_worker_type()]),
346344
None,
347345
slot_context_data.clone(),
346+
meter.clone(),
348347
);
349348
let act_permits = act_slots.get_extant_count_rcv();
350349
let (external_wft_tx, external_wft_rx) = unbounded_channel();
@@ -353,6 +352,7 @@ impl Worker {
353352
metrics.with_new_attrs([nexus_worker_type()]),
354353
None,
355354
slot_context_data.clone(),
355+
meter.clone(),
356356
);
357357
let (wft_stream, act_poller, nexus_poller) = match task_pollers {
358358
TaskPollers::Real => {
@@ -438,6 +438,7 @@ impl Worker {
438438
metrics.with_new_attrs([local_activity_worker_type()]),
439439
None,
440440
slot_context_data,
441+
meter.clone(),
441442
);
442443
let la_permits = la_pemit_dealer.get_extant_count_rcv();
443444
let local_act_mgr = Arc::new(LocalActivityManager::new(

core/src/worker/tuner.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,10 @@ pub use resource_based::{
77
ResourceSlotOptions,
88
};
99

10-
use std::sync::{Arc, OnceLock};
11-
use temporal_sdk_core_api::{
12-
telemetry::metrics::TemporalMeter,
13-
worker::{
14-
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, SlotKind, SlotSupplier,
15-
WorkerConfig, WorkerTuner, WorkflowSlotKind,
16-
},
10+
use std::sync::Arc;
11+
use temporal_sdk_core_api::worker::{
12+
ActivitySlotKind, LocalActivitySlotKind, NexusSlotKind, SlotKind, SlotSupplier, WorkerConfig,
13+
WorkerTuner, WorkflowSlotKind,
1714
};
1815

1916
/// Allows for the composition of different slot suppliers into a [WorkerTuner]
@@ -22,7 +19,6 @@ pub struct TunerHolder {
2219
act_supplier: Arc<dyn SlotSupplier<SlotKind = ActivitySlotKind> + Send + Sync>,
2320
la_supplier: Arc<dyn SlotSupplier<SlotKind = LocalActivitySlotKind> + Send + Sync>,
2421
nexus_supplier: Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync>,
25-
metrics: OnceLock<TemporalMeter>,
2622
}
2723

2824
/// Can be used to construct a [TunerHolder] without needing to manually construct each
@@ -245,7 +241,6 @@ impl TunerBuilder {
245241
.nexus_slot_supplier
246242
.clone()
247243
.unwrap_or_else(|| Arc::new(FixedSizeSlotSupplier::new(100))),
248-
metrics: OnceLock::new(),
249244
}
250245
}
251246
}
@@ -274,8 +269,4 @@ impl WorkerTuner for TunerHolder {
274269
) -> Arc<dyn SlotSupplier<SlotKind = NexusSlotKind> + Send + Sync> {
275270
self.nexus_supplier.clone()
276271
}
277-
278-
fn attach_metrics(&self, m: TemporalMeter) {
279-
let _ = self.metrics.set(m);
280-
}
281272
}

core/src/worker/tuner/resource_based.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,9 @@ where
262262
type SlotKind = SK;
263263

264264
async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit {
265+
if let Some(m) = ctx.get_metrics_meter() {
266+
self.inner.attach_metrics(m);
267+
}
265268
loop {
266269
if let Some(value) = self.issue_if_below_minimums(ctx) {
267270
return value;
@@ -282,6 +285,9 @@ where
282285
}
283286

284287
fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option<SlotSupplierPermit> {
288+
if let Some(m) = ctx.get_metrics_meter() {
289+
self.inner.attach_metrics(m);
290+
}
285291
if let v @ Some(_) = self.issue_if_below_minimums(ctx) {
286292
return v;
287293
}
@@ -396,10 +402,6 @@ impl<MI: SystemResourceInfo + Sync + Send + 'static> WorkerTuner for ResourceBas
396402
let o = self.nexus_opts.unwrap_or(DEFAULT_NEXUS_SLOT_OPTS);
397403
self.slots.as_kind(o)
398404
}
399-
400-
fn attach_metrics(&self, metrics: TemporalMeter) {
401-
self.slots.attach_metrics(metrics);
402-
}
403405
}
404406

405407
impl<MI: SystemResourceInfo + Sync + Send> ResourceController<MI> {
@@ -596,6 +598,7 @@ mod tests {
596598
MetricsContext::no_op(),
597599
None,
598600
Arc::new(Default::default()),
601+
None,
599602
);
600603
let pd_s = pd.clone().into_sticky();
601604
// Start with too high usage
@@ -625,6 +628,7 @@ mod tests {
625628
MetricsContext::no_op(),
626629
None,
627630
Arc::new(Default::default()),
631+
None,
628632
);
629633
// Start with too high usage
630634
used.store(90_000, Ordering::Release);
@@ -649,6 +653,7 @@ mod tests {
649653
MetricsContext::no_op(),
650654
None,
651655
Arc::new(Default::default()),
656+
None,
652657
);
653658
let pd_s = pd.clone().into_sticky();
654659
let order = crossbeam_queue::ArrayQueue::new(2);
@@ -684,6 +689,7 @@ mod tests {
684689
MetricsContext::no_op(),
685690
None,
686691
Arc::new(Default::default()),
692+
None,
687693
);
688694
let order = crossbeam_queue::ArrayQueue::new(2);
689695
let waits_free = async {
@@ -713,6 +719,7 @@ mod tests {
713719
MetricsContext::no_op(),
714720
None,
715721
Arc::new(Default::default()),
722+
None,
716723
);
717724
used.store(90_000, Ordering::Release);
718725
let _p1 = pd.try_acquire_owned().unwrap();

0 commit comments

Comments
 (0)