Commit 3d10ba6

Enhancement/allow custom metric buckets (#781)
1 parent 127835e

4 files changed: +83 −1 lines

temporalio/bridge/runtime.py (+1)

@@ -80,6 +80,7 @@ class PrometheusConfig:
     counters_total_suffix: bool
     unit_suffix: bool
     durations_as_seconds: bool
+    histogram_bucket_overrides: Optional[Mapping[str, Sequence[float]]] = None


 @dataclass(frozen=True)

temporalio/bridge/src/runtime.rs (+6)

@@ -83,6 +83,7 @@ pub struct PrometheusConfig {
     counters_total_suffix: bool,
     unit_suffix: bool,
     durations_as_seconds: bool,
+    histogram_bucket_overrides: Option<HashMap<String, Vec<f64>>>,
 }

 const FORWARD_LOG_BUFFER_SIZE: usize = 2048;

@@ -347,6 +348,11 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
         if let Some(global_tags) = conf.global_tags {
             build.global_tags(global_tags);
         }
+        if let Some(overrides) = prom_conf.histogram_bucket_overrides {
+            build.histogram_bucket_overrides(temporal_sdk_core_api::telemetry::HistogramBucketOverrides {
+                overrides,
+            });
+        }
         let prom_options = build.build().map_err(|err| {
             PyValueError::new_err(format!("Invalid Prometheus config: {}", err))
         })?;

temporalio/runtime.py (+2)

@@ -277,13 +277,15 @@ class PrometheusConfig:
     counters_total_suffix: bool = False
     unit_suffix: bool = False
     durations_as_seconds: bool = False
+    histogram_bucket_overrides: Optional[Mapping[str, Sequence[float]]] = None

     def _to_bridge_config(self) -> temporalio.bridge.runtime.PrometheusConfig:
         return temporalio.bridge.runtime.PrometheusConfig(
             bind_address=self.bind_address,
             counters_total_suffix=self.counters_total_suffix,
             unit_suffix=self.unit_suffix,
             durations_as_seconds=self.durations_as_seconds,
+            histogram_bucket_overrides=self.histogram_bucket_overrides,
         )

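For context, a minimal sketch of how an application could opt into the new field via the public PrometheusConfig. The bind address, server target, bucket boundaries, and the trivial main wrapper below are illustrative assumptions, not part of the commit; only the histogram_bucket_overrides parameter and the metric name temporal_long_request_latency come from this change and its test.

import asyncio

from temporalio.client import Client
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig


async def main() -> None:
    # Illustrative bucket boundaries; choose values that match how the
    # metric is recorded in your environment.
    runtime = Runtime(
        telemetry=TelemetryConfig(
            metrics=PrometheusConfig(
                bind_address="127.0.0.1:9464",  # illustrative scrape address
                histogram_bucket_overrides={
                    # metric name -> explicit histogram bucket boundaries
                    "temporal_long_request_latency": [50.0, 100.0, 500.0, 1000.0],
                },
            ),
        ),
    )

    # Clients and workers created with this runtime report histograms
    # using the overridden buckets on the Prometheus endpoint above.
    client = await Client.connect("localhost:7233", runtime=runtime)


if __name__ == "__main__":
    asyncio.run(main())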

tests/test_runtime.py (+74 −1)

@@ -1,7 +1,9 @@
 import logging
 import logging.handlers
 import queue
+import re
 import uuid
+from datetime import timedelta
 from typing import List, cast
 from urllib.request import urlopen

@@ -16,7 +18,7 @@
     TelemetryFilter,
 )
 from temporalio.worker import Worker
-from tests.helpers import assert_eq_eventually, find_free_port
+from tests.helpers import assert_eq_eventually, assert_eventually, find_free_port


 @workflow.defn

@@ -181,3 +183,74 @@ async def has_log() -> bool:
     assert record.levelno == logging.WARNING
     assert record.name == f"{logger.name}-sdk_core::temporal_sdk_core::worker::workflow"
     assert record.temporal_log.fields["run_id"] == handle.result_run_id  # type: ignore
+
+
+async def test_prometheus_histogram_bucket_overrides(client: Client):
+    # Set up a Prometheus configuration with custom histogram bucket overrides
+    prom_addr = f"127.0.0.1:{find_free_port()}"
+    special_value = float(1234.5678)
+    histogram_overrides = {
+        "temporal_long_request_latency": [special_value / 2, special_value],
+        "custom_histogram": [special_value / 2, special_value],
+    }
+
+    runtime = Runtime(
+        telemetry=TelemetryConfig(
+            metrics=PrometheusConfig(
+                bind_address=prom_addr,
+                counters_total_suffix=False,
+                unit_suffix=False,
+                durations_as_seconds=False,
+                histogram_bucket_overrides=histogram_overrides,
+            ),
+        ),
+    )
+
+    # Create a custom histogram metric
+    custom_histogram = runtime.metric_meter.create_histogram(
+        "custom_histogram", "Custom histogram", "ms"
+    )
+
+    # Record a value to the custom histogram
+    custom_histogram.record(600)
+
+    # Create client with overrides
+    client_with_overrides = await Client.connect(
+        client.service_client.config.target_host,
+        namespace=client.namespace,
+        runtime=runtime,
+    )
+
+    async def run_workflow(client: Client):
+        task_queue = f"task-queue-{uuid.uuid4()}"
+        async with Worker(
+            client,
+            task_queue=task_queue,
+            workflows=[HelloWorkflow],
+        ):
+            assert "Hello, World!" == await client.execute_workflow(
+                HelloWorkflow.run,
+                "World",
+                id=f"workflow-{uuid.uuid4()}",
+                task_queue=task_queue,
+            )
+
+    await run_workflow(client_with_overrides)
+
+    async def check_metrics() -> None:
+        with urlopen(url=f"http://{prom_addr}/metrics") as f:
+            metrics_output = f.read().decode("utf-8")
+
+            for key, buckets in histogram_overrides.items():
+                assert (
+                    key in metrics_output
+                ), f"Missing {key} in full output: {metrics_output}"
+                for bucket in buckets:
+                    # Expect {key}_bucket and le="{bucket}" on the same line,
+                    # with arbitrary text between them
+                    regex = re.compile(f'{key}_bucket.*le="{bucket}"')
+                    assert regex.search(
+                        metrics_output
+                    ), f"Missing bucket for {key} in full output: {metrics_output}"
+
+    # Wait for metrics to appear and match the expected buckets
+    await assert_eventually(check_metrics)
