Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Canonical metrics: harmonized metric surface aligned with the cross-SDK catalog -- see [METRICS.md](METRICS.md) for the full catalog, configuration, and implementation details
- Bounded `uri` label on `http_api_client_request_seconds`: uses path templates (e.g. `/workflow/{workflowId}`) instead of fully-resolved paths, preventing metric cardinality explosion from dynamic IDs
- `WorkflowStatusProbe` in harness: opt-in probe (via `HARNESS_PROBE_RATE_PER_SEC`) that exercises UUID-bearing endpoints to validate template URI metrics
- Worker panic resilience: spawned task executions are wrapped in `catch_unwind` so that an uncaught panic is logged, publishes a `thread_uncaught_exceptions_total` metric event, and cleans up tracking state (semaphore permit, active task count) instead of silently leaking resources

### Changed

- The Rust SDK is unreleased, so the emitted metric surface is canonical on day one; there is no legacy mode or migration path
- `ApiClient` public methods accept `impl Into<ApiPath>` to pair resolved paths with bounded-cardinality metric templates -- see [METRICS.md](METRICS.md#detailed-technical-notes)
47 changes: 14 additions & 33 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -892,44 +892,25 @@ handler.add_event_listener(Arc::new(MyListener));

## Metrics

### MetricsSettings
The `MetricsCollector` in `src/metrics/collector.rs` implements the full
canonical Prometheus catalog using the `prometheus` crate. It is wired into
the worker framework via `TaskHandler::enable_metrics`, which registers it as
both a `TaskRunnerEventsListener` (for task-level events) and as the
`HttpMetricsObserver` on `ApiClient` (for HTTP request latency). An optional
built-in HTTP server exposes `/metrics` and `/health` endpoints for scraping.

```rust
use conductor::MetricsSettings;
use conductor::metrics::MetricsSettings;

let settings = MetricsSettings::default()
.with_http_port(9090) // Serve metrics on :9090/metrics
.with_metrics_path("/metrics")
.with_namespace("conductor")
.with_update_interval(Duration::from_secs(5));
handler.enable_metrics(
MetricsSettings::new()
.with_http_port(9090)
.with_metrics_path("/metrics"),
);
```

### Prometheus Metrics

| Metric | Type | Labels | Description |
|--------|------|--------|-------------|
| `conductor_task_poll_total` | Counter | `task_type` | Total poll attempts |
| `conductor_task_poll_error_total` | Counter | `task_type`, `error_type` | Poll errors |
| `conductor_task_execute_error_total` | Counter | `task_type`, `error_type` | Execution errors |
| `conductor_task_update_error_total` | Counter | `task_type` | Update errors |
| `conductor_task_paused_total` | Counter | `task_type` | Polls while paused |
| `conductor_task_poll_time_seconds` | Histogram | `task_type`, `status` | Poll latency |
| `conductor_task_execute_time_seconds` | Histogram | `task_type`, `status` | Execution time |
| `conductor_task_result_size_bytes` | Gauge | `task_type` | Result payload size |
| `conductor_active_workers` | Gauge | `task_type` | Active worker count |

### Accessing Metrics

```rust
// Via HTTP endpoint (if configured)
// GET http://localhost:9090/metrics

// Programmatically
if let Some(collector) = handler.metrics_collector() {
let metrics_text = collector.gather();
println!("{}", metrics_text);
}
```
See [METRICS.md](METRICS.md) for the complete metric catalog, labels, bucket
sets, intentional divergences, and example scrape output.

---

Expand Down
227 changes: 227 additions & 0 deletions METRICS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# Metrics Documentation

The Conductor Rust SDK includes built-in metrics collection using Prometheus to
monitor worker performance, API requests, and task execution.

All metric names, label names, label values, and Prometheus types emitted by
this SDK match the canonical catalog in
[`sdk-metrics-harmonization.md`](https://github.com/orkes-io/certification-cloud-util/blob/main/sdk-metrics-harmonization.md).
Because the Rust SDK is unreleased, there are no legacy/deprecated metric
names to carry forward — the emitted surface is canonical on day one.

## Table of Contents

- [Quick Reference](#quick-reference)
- [Configuration](#configuration)
- [Intentional divergences](#intentional-divergences)
- [Examples](#examples)
- [Troubleshooting](#troubleshooting)
- [Detailed Technical Notes](#detailed-technical-notes)

## Quick Reference

### Canonical metrics emitted by the SDK

| Metric | Type | Labels | Meaning |
|---|---|---|---|
| `task_poll_total` | Counter | `taskType` | Incremented for every poll request issued to the server. |
| `task_poll_error_total` | Counter | `taskType`, `exception` | Client-side poll failures. `exception` is the unqualified `ConductorError` variant name. |
| `task_execution_started_total` | Counter | `taskType` | Incremented when a polled task is dispatched to the user worker function. |
| `task_execute_error_total` | Counter | `taskType`, `exception` | User worker returned `Err(_)`. |
| `task_update_error_total` | Counter | `taskType`, `exception` | Task-result update back to the server failed after all retries. |
| `task_paused_total` | Counter | `taskType` | Poll skipped because the runner is paused. |
| `thread_uncaught_exceptions_total` | Counter | `exception` | Panic escaped a spawned worker task; `exception` is always `"Panic"`. |
| `workflow_start_error_total` | Counter | `workflowType`, `exception` | `WorkflowClient::start_workflow` failed client-side. |
| `task_ack_error_total` | Counter | `taskType`, `exception` | **Surface-only.** Not incremented by the internal runner (see [Intentional divergences](#intentional-divergences)). |
| `task_ack_failed_total` | Counter | `taskType` | **Surface-only.** Not incremented by the internal runner. |
| `task_execution_queue_full_total` | Counter | `taskType` | **Surface-only.** Not incremented by the internal runner. |
| `external_payload_used_total` | Counter | `entityName`, `operation`, `payloadType` | **Surface-only.** Reserved for future large-payload external-storage support. |
| `task_poll_time_seconds` | Histogram | `taskType`, `status` | Poll latency. `status ∈ {SUCCESS, FAILURE}`. |
| `task_execute_time_seconds` | Histogram | `taskType`, `status` | User worker function wall-clock. |
| `task_update_time_seconds` | Histogram | `taskType`, `status` | Latency of the `UpdateTask` call (including retries). |
| `http_api_client_request_seconds` | Histogram | `method`, `uri`, `status` | Latency of every Conductor API HTTP request. `status` is the HTTP status code as a string, or `"0"` for network errors. |
| `task_result_size_bytes` | Histogram | `taskType` | Serialized byte size of task result output. |
| `workflow_input_size_bytes` | Histogram | `workflowType`, `version` | Serialized byte size of workflow input. `version` is the workflow version as a string, or `""` when unset. |
| `active_workers` | Gauge | `taskType` | Current number of in-flight task executions. |

Time histograms use the canonical seconds bucket set:
`(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)`.

Size histograms use the canonical size bucket set:
`(100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000)` bytes.

### Label values

- `status` on task time histograms: uppercase `"SUCCESS"` / `"FAILURE"`.
- `status` on `http_api_client_request_seconds`: HTTP status code rendered as
a string (e.g. `"200"`), or `"0"` when the transport layer fails before
receiving a status.
- `uri`: the API-relative path template without the server URL's path prefix
(e.g. `/tasks/poll/batch/{taskType}`, not `/api/tasks/poll/batch/my_worker`).
Dynamic path segments retain their `{placeholder}` tokens so that metric
label cardinality is bounded. See the
[path template note](#uri-label--path-templates) below.
- `exception`: the unqualified `ConductorError` variant name
(`Http`, `Json`, `Auth`, `Server`, …), the short type name for non-
`ConductorError` errors, or `"Panic"` for uncaught panics.

### `uri` label — path templates

The `uri` label on `http_api_client_request_seconds` carries the **path
template** (e.g. `/tasks/poll/batch/{taskType}`) rather than the
fully-resolved request path. The server URL's path prefix (e.g. `/api`) is
never included. This keeps metric cardinality bounded regardless of how many
unique workflow IDs, task types, or other dynamic path segments flow through
the SDK.

All Conductor SDKs (Go, Java, Python, Ruby, Rust) now follow this convention.
See [Detailed Technical Notes](#detailed-technical-notes) at the end of this
document for per-SDK implementation details.

## Configuration

Metrics are wired up by calling [`TaskHandler::enable_metrics`]. This:

- Registers a shared `MetricsCollector` as a `TaskRunnerEventsListener` for
task-level events.
- Installs the same `MetricsCollector` as the `HttpMetricsObserver` inside the
handler's `ApiClient`, capturing every HTTP request (including requests
made by `ConductorClient` instances vended via `TaskHandler::conductor_client()`).
- Optionally starts an HTTP scrape endpoint (`/metrics`, `/health`).

Example:

```rust
use conductor::{
configuration::Configuration,
metrics::MetricsSettings,
worker::TaskHandler,
};

let config = Configuration::from_env();
let mut handler = TaskHandler::new(config)?;

handler.enable_metrics(
MetricsSettings::new()
.with_http_port(9991)
.with_metrics_path("/metrics"),
);

// Workflow-start events will flow through the same dispatcher as tasks:
let conductor = handler.conductor_client();
let workflow_client = conductor.workflow_client();
```

By default `MetricsSettings::namespace` is `""`, so metric names appear
uncurried (e.g. `task_poll_total`, matching Java/Go/Python). Call
`.with_namespace("myapp")` to prefix names if you need to isolate Conductor
SDK metrics from other metrics in the same registry.

## Intentional divergences

Some asymmetries with the canonical catalog are kept by design rather than
papered over:

| Metric | Status in Rust SDK | Reason |
|---|---|---|
| `task_ack_error_total`, `task_ack_failed_total` | Registered; never incremented by the internal runner. Public helpers `MetricsCollector::increment_task_ack_error` / `increment_task_ack_failed` exposed for user code. | Matches the Go SDK's runtime model: the batch-poll response itself acts as the ack, so there is no separate ack call for the SDK to instrument. |
| `task_execution_queue_full_total` | Registered; never incremented by the internal runner. | Rust's worker scheduling uses a `tokio::sync::Semaphore`; acquisition awaits rather than rejecting, so there is no "queue full" condition for the SDK to surface. |
| `external_payload_used_total` | Registered; never incremented by the internal runner. | The Rust client does not yet integrate with the external-payload-storage branch of the Conductor API. Helper method retained for user code that implements its own external-payload plumbing. |
| `worker_restart_total` | Not emitted. | Python-only metric: Python has a multi-process worker supervisor; Rust spawns Tokio tasks, so there is no equivalent "restart a subprocess" event. |
| `task_execution_completed_total` | Not emitted. | Canonical catalog exposes task execution completion only through `task_execute_time_seconds_count{status="SUCCESS"}`, which is already present. |
| `active_workers` labels | `{taskType}` | Matches canonical. |
| Metric name prefix | `""` (none) by default | Matches Java/Go/Python. Can be overridden via `MetricsSettings::with_namespace`. |

## Examples

See [`examples/metrics_example.rs`](./examples/metrics_example.rs) for a
runnable end-to-end demo that spins up workers, serves `/metrics` on a
configurable port, and exercises every metric in the catalog.

```prometheus
# HTTP API client request latency (uri is the path template, not the resolved path)
http_api_client_request_seconds_bucket{method="GET",uri="/tasks/poll/batch/{taskType}",status="200",le="0.1"} 97
http_api_client_request_seconds_bucket{method="GET",uri="/tasks/poll/batch/{taskType}",status="200",le="+Inf"} 100
http_api_client_request_seconds_count{method="GET",uri="/tasks/poll/batch/{taskType}",status="200"} 100
http_api_client_request_seconds_sum{method="GET",uri="/tasks/poll/batch/{taskType}",status="200"} 8.21

# Task poll
task_poll_total{taskType="my_worker"} 124

# Task execute time (SUCCESS)
task_execute_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.25"} 42
task_execute_time_seconds_count{taskType="my_worker",status="SUCCESS"} 42

# Workflow start error
workflow_start_error_total{workflowType="my_wf",exception="Server"} 2
```

## Troubleshooting

### Metrics are empty

- Verify that `TaskHandler::enable_metrics` is called before `handler.start()`.
- Verify workers have polled or executed at least one task. Metrics are created
lazily when the corresponding event occurs, so a freshly started worker with
no traffic will have no series.
- Confirm the scrape endpoint is reachable at the expected host and port
(default: `http://localhost:9991/metrics`).

### Missing HTTP or workflow metrics

- `http_api_client_request_seconds` is recorded by the `HttpMetricsObserver`
installed on `ApiClient` by `enable_metrics`. If `enable_metrics` is not
called, no HTTP metrics are emitted.
- `workflow_start_error_total` and `workflow_input_size_bytes` require the
`WorkflowClient` to be obtained via `handler.conductor_client()` so that
events flow through the shared `EventDispatcher`. A standalone
`ConductorClient` created separately from the handler will not emit these
metrics.

### High cardinality

- The `uri` label on `http_api_client_request_seconds` uses path templates
(e.g. `/workflow/{workflowId}`) to keep cardinality bounded. If you see
fully-resolved paths in your metrics, verify that HTTP requests are going
through the SDK's `ApiClient` rather than a standalone HTTP client.
- Avoid embedding user identifiers or unbounded values in task type, workflow
type, or external payload labels.

### No legacy/canonical gating

Unlike the Python, Go, Java, JavaScript, and Ruby SDKs, the Rust SDK has no
released legacy metrics surface. It ships the canonical catalog directly with
no `WORKER_CANONICAL_METRICS` environment variable and no factory/switchout
pattern. If you operate a mixed fleet of Conductor workers across multiple
SDKs, the other SDKs require `WORKER_CANONICAL_METRICS=true` to emit the
same metric names and shapes that the Rust SDK emits by default.

---

## Detailed Technical Notes

### Path template `uri` label — how it works

The `uri` label on `http_api_client_request_seconds` carries a path
**template** rather than the fully-resolved request path, preventing
cardinality explosion from dynamic path segments (UUIDs, task type names,
etc.). The server URL's base path prefix (e.g. `/api`) is never included.

The Rust SDK implements this via the `ApiPath` struct. Public `ApiClient`
methods accept `impl Into<ApiPath<'_>>`, which pairs a resolved request
path with a bounded-cardinality metric template. For static endpoints
(no dynamic segments), callers pass a plain `&str` — the `From<&str>` impl
uses the same string for both the request path and the metric label. For
dynamic endpoints, callers use `ApiPath::templated(&path, "/template/{id}")`
to supply both. `ApiClient::record_request` then passes the template to
`HttpMetricsObserver::observe` as the `uri` label.

The template string is always the API-relative resource path (e.g.
`/workflow/{workflowId}`), never the fully-qualified URL or the base-path-
prefixed path. This means:

- `/workflow/{workflowId}` rather than `/api/workflow/abc-123-def`
- `/tasks/poll/batch/{taskType}` rather than `/tasks/poll/batch/my_worker`

Endpoints without path parameters (e.g. `/tasks/search`) use the raw resource
path directly, which is already a stable template.
6 changes: 6 additions & 0 deletions WORKER_COMPARISON.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ let tasks = task_client.batch_poll(..., available_slots, ...).await?;

### Prometheus Metrics

> **Note:** The table below is a historical snapshot from when the Python SDK
> was emitting its pre-harmonization (legacy) metrics surface. Worker metrics
> have since been harmonized across all Conductor SDKs under a single canonical
> catalog. For the complete and current Rust SDK metrics catalog, see
> [METRICS.md](METRICS.md).

| Metric | Python | Rust |
|--------|--------|------|
| `task_poll_total` | ✅ | ✅ |
Expand Down
20 changes: 11 additions & 9 deletions docs/WORKER.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,18 +471,20 @@ let worker = FnWorker::new("batch_processor", |task: Task| async move {
use conductor::metrics::MetricsSettings;

let mut handler = TaskHandler::new(config)?;
handler.enable_metrics(MetricsSettings {
port: 9090,
enabled: true,
});
handler.enable_metrics(
MetricsSettings::new()
.with_http_port(9090)
.with_metrics_path("/metrics"),
);
```

Metrics available at `http://localhost:9090/metrics`:
Metrics are available at `http://localhost:9090/metrics`. The SDK emits the
full canonical Prometheus catalog (counters, histograms, and gauges) covering
worker polling, task execution, task result updates, HTTP API client latency,
and more.

- `conductor_worker_tasks_polled_total` - Total tasks polled
- `conductor_worker_tasks_executed_total` - Total tasks executed
- `conductor_worker_task_duration_seconds` - Task execution duration
- `conductor_worker_poll_errors_total` - Poll errors
See [METRICS.md](../METRICS.md) for the complete metric catalog, label
definitions, bucket sets, and configuration details.

### Event Listeners

Expand Down
26 changes: 16 additions & 10 deletions examples/metrics_example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,25 @@ async fn main() -> Result<()> {
println!("{}", "=".repeat(70));
println!("\nMetrics endpoint: http://localhost:9090/metrics");
println!("Health endpoint: http://localhost:9090/health");
println!("\nAvailable Metrics:");
println!("\nAvailable Metrics (canonical Conductor SDK catalog):");
println!(" Counter metrics:");
println!(" - conductor_task_poll_total{{task_type}}");
println!(" - conductor_task_poll_error_total{{task_type, error_type}}");
println!(" - conductor_task_execute_error_total{{task_type, error_type}}");
println!(" - conductor_task_update_error_total{{task_type}}");
println!(" - conductor_task_paused_total{{task_type}}");
println!(" - task_poll_total{{taskType}}");
println!(" - task_poll_error_total{{taskType, exception}}");
println!(" - task_execution_started_total{{taskType}}");
println!(" - task_execute_error_total{{taskType, exception}}");
println!(" - task_update_error_total{{taskType, exception}}");
println!(" - task_paused_total{{taskType}}");
println!(" - thread_uncaught_exceptions_total{{taskType, exception}}");
println!(" - workflow_start_error_total{{workflowType, exception}}");
println!("\n Histogram metrics:");
println!(" - conductor_task_poll_time_seconds{{task_type, status}}");
println!(" - conductor_task_execute_time_seconds{{task_type, status}}");
println!(" - task_poll_time_seconds{{taskType, status}}");
println!(" - task_execute_time_seconds{{taskType, status}}");
println!(" - task_update_time_seconds{{taskType, status}}");
println!(" - http_api_client_request_seconds{{method, uri, status}}");
println!("\n Gauge metrics:");
println!(" - conductor_task_result_size_bytes{{task_type}}");
println!(" - conductor_active_workers{{task_type}}");
println!(" - task_result_size_bytes{{taskType}}");
println!(" - workflow_input_size_bytes{{workflowType}}");
println!(" - active_workers{{taskType}}");
println!("\nWorkers:");
println!(" - quick_task: Fast execution (~50ms)");
println!(" - variable_task: Variable execution time");
Expand Down
Loading
Loading