Commit 813222b

feat: Implements OTEL Server and Output Capture
1 parent 0685887 commit 813222b

29 files changed

Lines changed: 1799 additions & 324 deletions

Cargo.lock

Lines changed: 77 additions & 0 deletions

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -46,3 +46,4 @@ instant-acme = { version = "0.8.4", default-features = false, features = ["hyper
 reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-manual-roots", "json"] }
 x509-parser = "0.18.0"
 http = "1.4.0"
+axum = "0.8.8"

TELEMETRY.md

Lines changed: 75 additions & 4 deletions
@@ -1,14 +1,16 @@
 # Telemetry Configuration

-Dispenser includes a built-in, high-performance telemetry system powered by [Delta Lake](https://delta.io/). It allows you to automatically collect deployment events and container health status, writing them directly to data lakes (S3, GCS, Azure) or local filesystems in Parquet format.
+Dispenser includes a built-in, high-performance telemetry system powered by [Delta Lake](https://delta.io/). It allows you to automatically collect deployment events, container health status, application logs/traces, and raw container output, writing them directly to data lakes (S3, GCS, Azure) or local filesystems in Parquet format.

 ## Overview

 The telemetry system runs in a dedicated, isolated thread to ensure that heavy I/O operations never block the main orchestration loop. It provides:

 1. **Deployment Tracking**: Every time a container is created, updated, or restarted, a detailed event is logged.
 2. **Health Monitoring**: Periodically samples the status of all managed containers (CPU, memory, uptime, health checks).
-3. **Delta Lake Integration**: Writes data using the Delta Lake protocol, enabling ACID transactions, scalable metadata handling, and direct compatibility with tools like Spark, Trino, Athena, and Databricks.
+3. **Application Telemetry (OTLP)**: Ingests structured logs and traces from services using standard OpenTelemetry SDKs.
+4. **Container Output**: Captures raw `stdout` and `stderr` streams from all managed containers with sequence-guaranteed ordering.
+5. **Delta Lake Integration**: Writes data using the Delta Lake protocol, enabling ACID transactions, scalable metadata handling, and direct compatibility with tools like Spark, Trino, Athena, and Databricks.

 ## Configuration

@@ -24,6 +26,9 @@ enabled = true
 # Supported schemes: file://, s3://, gs://, az://, adls://
 table_uri_deployments = "s3://my-data-lake/dispenser/deployments"
 table_uri_status = "s3://my-data-lake/dispenser/status"
+table_uri_logs = "s3://my-data-lake/dispenser/logs"
+table_uri_traces = "s3://my-data-lake/dispenser/traces"
+table_uri_container_output = "s3://my-data-lake/dispenser/container-output"

 # Optional: How often to sample container status (default: 60 seconds)
 status_interval = 60
@@ -69,9 +74,25 @@ Dispenser supports several authentication methods via environment variables:
 * Service Principal: `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`, `AZURE_TENANT_ID`.
 * Managed Identity (if running on Azure VMs/AKS).

+## OpenTelemetry (OTLP) Ingestion
+
+Dispenser acts as a sidecar host for your services. When telemetry is enabled, Dispenser starts an **Ingestion Service** listening on:
+
+* **Endpoint**: `http://0.0.0.0:4318`
+* **Protocol**: OTLP/HTTP (JSON)
+
+### Automatic Environment Variables
+
+Dispenser automatically injects the following environment variables into all managed containers to simplify instrumentation:
+
+* `OTEL_EXPORTER_OTLP_ENDPOINT="http://172.28.0.1:4318"`
+* `OTEL_SERVICE_NAME="{service_name}"` (The name from your `service.toml`)
+
+Standard OTel SDKs will automatically detect these variables and begin shipping logs and traces to Dispenser without further configuration.
+
 ## Data Schemas

-Dispenser automatically manages two Delta tables. It will create them if they do not exist.
+Dispenser automatically manages several Delta tables. It will create them if they do not exist.

 ### Deployments Table (`dispenser-deployments`)
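The OTLP ingestion section in the hunk above says the service accepts OTLP/HTTP with JSON encoding. As a rough illustration of what an SDK would send, the following std-only sketch builds a minimal single-record logs body in the standard OTLP JSON shape. The helper names `json_escape` and `otlp_log_payload` are hypothetical and not part of this commit, and it is an assumption that Dispenser serves the standard OTLP path `/v1/logs` on port 4318.

```rust
/// Escapes a string so it can be embedded inside a JSON string literal.
/// (Hypothetical helper, not part of the commit.)
fn json_escape(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for c in input.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Builds a minimal OTLP/HTTP JSON logs body with one log record.
/// Assumption: the receiver accepts the standard `resourceLogs` envelope.
fn otlp_log_payload(service: &str, severity: &str, body: &str, time_unix_nano: u64) -> String {
    // Resource attributes carry the service name (cf. OTEL_SERVICE_NAME).
    let resource = format!(
        r#"{{"attributes":[{{"key":"service.name","value":{{"stringValue":"{}"}}}}]}}"#,
        json_escape(service)
    );
    // 64-bit integers are encoded as decimal strings in OTLP JSON.
    let record = format!(
        r#"{{"timeUnixNano":"{}","severityText":"{}","body":{{"stringValue":"{}"}}}}"#,
        time_unix_nano,
        json_escape(severity),
        json_escape(body)
    );
    format!(
        r#"{{"resourceLogs":[{{"resource":{},"scopeLogs":[{{"logRecords":[{}]}}]}}]}}"#,
        resource, record
    )
}
```

A service would POST this body with `Content-Type: application/json` to the injected `OTEL_EXPORTER_OTLP_ENDPOINT` plus `/v1/logs`. In practice an OpenTelemetry SDK does this automatically, so the sketch is mainly useful for manual testing.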

@@ -116,6 +137,56 @@ Records periodic snapshots of the runtime state of containers.
 | `failing_streak` | `INTEGER` | Consecutive healthcheck failures. |
 | `last_health_output` | `STRING` | Output of the last failed healthcheck (truncated). |

+### Logs Table (`dispenser-logs`)
+
+Stores structured logs emitted by applications via OTel.
+
+| Column | Type | Description |
+| :--- | :--- | :--- |
+| `date` | `DATE` | Partition column (UTC). |
+| `timestamp` | `TIMESTAMP` | Exact time of the log entry. |
+| `service` | `STRING` | Service name. |
+| `severity` | `STRING` | INFO, WARN, ERROR, etc. |
+| `body` | `STRING` | The log message. |
+| `trace_id` | `STRING` | Associated trace ID (hex). |
+| `span_id` | `STRING` | Associated span ID (hex). |
+| `attributes` | `MAP<STRING, STRING>` | Flattened log attributes. |
+| `resource` | `MAP<STRING, STRING>` | Resource attributes (pod, node, etc). |
+
+### Traces Table (`dispenser-traces`)
+
+Stores distributed tracing spans.
+
+| Column | Type | Description |
+| :--- | :--- | :--- |
+| `date` | `DATE` | Partition column. |
+| `trace_id` | `STRING` | Trace ID (32-char hex). |
+| `span_id` | `STRING` | Span ID (16-char hex). |
+| `parent_span_id` | `STRING` | Parent Span ID. |
+| `name` | `STRING` | Span name (e.g., "GET /api/users"). |
+| `kind` | `STRING` | SERVER, CLIENT, PRODUCER, etc. |
+| `start_time` | `TIMESTAMP` | Start time. |
+| `end_time` | `TIMESTAMP` | End time. |
+| `duration_ms` | `LONG` | Calculated duration. |
+| `status_code` | `STRING` | OK, ERROR. |
+| `status_message` | `STRING` | Error description. |
+| `service` | `STRING` | Service name. |
+| `attributes` | `MAP<STRING, STRING>` | Span attributes. |
+
+### Container Output Table (`dispenser-container-output`)
+
+Captures raw `stdout` and `stderr` streams.
+
+| Column | Type | Description |
+| :--- | :--- | :--- |
+| `date` | `DATE` | Partition column. |
+| `timestamp` | `TIMESTAMP` | Exact time of the log line. |
+| `service` | `STRING` | Service name. |
+| `container_id` | `STRING` | Full container ID. |
+| `stream` | `STRING` | `stdout` or `stderr`. |
+| `message` | `STRING` | The raw log line. |
+| `sequence` | `LONG` | Monotonically increasing counter for perfect ordering. |
+
 ## Performance Tuning

 ### Buffering & Latency
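The `sequence` column in the Container Output table above is described as a monotonically increasing counter used for ordering. The commit excerpt does not show how that counter is produced, so the following is only a sketch of one plausible approach: an atomic counter tags each line as it is read, and a reader later sorts by `sequence` to recover the original order even when timestamps collide. `OutputLine`, `Sequencer`, and `restore_order` are hypothetical names.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// One captured line of container output (hypothetical type mirroring a
/// subset of the documented table columns).
#[derive(Debug, Clone, PartialEq)]
struct OutputLine {
    stream: &'static str,
    message: String,
    sequence: u64,
}

/// Hands out strictly increasing sequence numbers, safe to share across threads.
struct Sequencer {
    next: AtomicU64,
}

impl Sequencer {
    fn new() -> Self {
        Self { next: AtomicU64::new(0) }
    }

    /// Tags a line with the next sequence number. `fetch_add` returns the
    /// previous value, so every caller gets a unique number.
    fn tag(&self, stream: &'static str, message: &str) -> OutputLine {
        let sequence = self.next.fetch_add(1, Ordering::Relaxed);
        OutputLine { stream, message: message.to_string(), sequence }
    }
}

/// Restores capture order for rows that arrive out of order, for example
/// after being written in separate batches.
fn restore_order(mut lines: Vec<OutputLine>) -> Vec<OutputLine> {
    lines.sort_by_key(|line| line.sequence);
    lines
}
```

On the query side the equivalent is an `ORDER BY sequence` within a given `container_id`. Whether the real counter is per container or global is not stated in the diff.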
@@ -144,4 +215,4 @@ The telemetry service runs on a dedicated Tokio runtime spawned in a separate OS
 To prevent indefinite storage growth, Dispenser applies the following default retention policies during table creation:

 * **Log Retention**: 30 days (Deployments), 7 days (Status). Delta log history is kept for time-travel queries.
-* **Deleted Files**: 7 days (Deployments), 1 day (Status). Vacuum operations can reclaim space after this period.
+* **Deleted Files**: 7 days (Deployments), 1 day (Status). Vacuum operations can reclaim space after this period.

example/dispenser.toml

Lines changed: 8 additions & 0 deletions
@@ -4,6 +4,14 @@ delay = 60
 [proxy]
 enabled = true

+[telemetry]
+enabled = true
+table_uri_deployments = "telem/deployments"
+table_uri_status = "telem/status"
+table_uri_logs = "telem/logs"
+table_uri_container_output = "telem/container_out"
+table_uri_traces = "telem/traces"
+
 [[service]]
 path = "service1"

src/main.rs

Lines changed: 37 additions & 25 deletions
@@ -5,9 +5,11 @@ use crate::{
     cli::Commands,
     proxy::{acme, run_dummy_proxy, run_proxy, ProxySignals},
     service::{
+        file::TelemetryConfig,
         manager::{ServiceMangerConfig, ServicesManager},
         vars::ServiceConfigError,
     },
+    telemetry::{TelemetryClient, TelemetryService},
 };

 mod cli;
@@ -81,31 +83,7 @@ async fn main() -> ExitCode {
     };

     let telemetry_client =
-        if let Some(telemetry_config) = &service_manager_config.entrypoint_file.telemetry {
-            if telemetry_config.enabled {
-                let (tx, rx) = tokio::sync::mpsc::channel(10000);
-                let config = telemetry_config.clone();
-
-                std::thread::spawn(move || {
-                    let rt = tokio::runtime::Builder::new_multi_thread()
-                        .worker_threads(2)
-                        .enable_all()
-                        .build()
-                        .expect("Failed to build telemetry runtime");
-
-                    rt.block_on(async {
-                        let service = crate::telemetry::TelemetryService::new(config, rx);
-                        service.run().await;
-                    });
-                });
-
-                Some(crate::telemetry::TelemetryClient::new(tx))
-            } else {
-                None
-            }
-        } else {
-            None
-        };
+        init_telemetry(service_manager_config.entrypoint_file.telemetry.as_ref());

     let manager =
         match ServicesManager::from_config(service_manager_config, None, telemetry_client.clone())
@@ -226,3 +204,37 @@ async fn main() -> ExitCode {
         }
     }
 }
+
+fn init_telemetry(config: Option<&TelemetryConfig>) -> Option<TelemetryClient> {
+    let telemetry_config = config?;
+    if !telemetry_config.enabled {
+        return None;
+    }
+
+    let (tx, rx) = tokio::sync::mpsc::channel(10000);
+    let config = telemetry_config.clone();
+
+    // Run ingestion service on main tokio runtime
+    let tx_ingestion = tx.clone();
+    tokio::spawn(async move {
+        if let Err(e) = crate::telemetry::ingestion::start_ingestion_service(tx_ingestion).await {
+            log::error!("OTLP ingestion service failed: {}", e);
+        }
+    });
+
+    // Telemetry Service runs in its own thread/runtime for Delta Lake operations
+    std::thread::spawn(move || {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(2)
+            .enable_all()
+            .build()
+            .expect("Failed to build telemetry runtime");
+
+        rt.block_on(async {
+            let service = TelemetryService::new(config, rx);
+            service.run().await;
+        });
+    });
+
+    Some(TelemetryClient::new(tx))
+}
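
The `init_telemetry` function above wires producers to a dedicated writer thread through a bounded channel, so slow Delta Lake writes cannot stall the orchestration loop. The following std-only analogue sketches that hand-off pattern without Tokio. `Client`, `record`, and `start_worker` are hypothetical names, and dropping events when the buffer is full is an assumption made for the sketch, not something the diff confirms about `TelemetryClient`.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Cheap, cloneable handle that producers use to submit events
/// (stand-in for the role `TelemetryClient` plays in the commit).
#[derive(Clone)]
struct Client {
    tx: SyncSender<String>,
}

impl Client {
    /// Non-blocking submit: returns false if the buffer is full or the
    /// worker has exited, so the caller is never stalled by slow I/O.
    fn record(&self, event: &str) -> bool {
        self.tx.try_send(event.to_string()).is_ok()
    }
}

/// Spawns a dedicated OS thread that drains the bounded channel.
/// The worker returns everything it received so the sketch is testable;
/// a real worker would batch and write the events to storage instead.
fn start_worker(capacity: usize) -> (Client, JoinHandle<Vec<String>>) {
    let (tx, rx) = sync_channel::<String>(capacity);
    let handle = thread::spawn(move || {
        let mut written = Vec::new();
        // The loop ends once every sender (every Client clone) is dropped.
        for event in rx {
            written.push(event);
        }
        written
    });
    (Client { tx }, handle)
}
```

Cloning the sender, as the commit does with `tx_ingestion`, lets the OTLP ingestion task and the service manager feed the same writer. The worker shuts down only after all clones are dropped.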

src/service/file.rs

Lines changed: 3 additions & 0 deletions
@@ -66,6 +66,9 @@ pub struct TelemetryConfig {
     pub enabled: bool,
     pub table_uri_deployments: String,
     pub table_uri_status: String,
+    pub table_uri_logs: String,
+    pub table_uri_traces: String,
+    pub table_uri_container_output: String,
     pub buffer_size: Option<usize>,
     #[serde(default = "default_status_interval")]
     pub status_interval: u64,
