Skip to content

Commit 6aaf41c

Browse files
authored
Add an example metrics-service to produce semi-realistic trace data
1 parent dcbb2ff commit 6aaf41c

9 files changed

Lines changed: 2940 additions & 246 deletions

File tree

Cargo.lock

Lines changed: 2321 additions & 246 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
resolver = "3"
33
members = [
44
"dial9-tokio-telemetry",
5+
"examples/metrics-service",
56
]
67

78
[workspace.dependencies]
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "metrics-service"
3+
edition = "2024"
4+
version = "0.1.0"
5+
6+
[dependencies]
7+
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] }
8+
tokio-util = { version = "0.7", features = ["rt"] }
9+
axum = "0.8"
10+
aws-config = "1"
11+
aws-sdk-dynamodb = "1"
12+
serde = { version = "1", features = ["derive"] }
13+
serde_json = "1"
14+
dial9-tokio-telemetry = { path = "../../dial9-tokio-telemetry" }
15+
reqwest = { version = "0.12", features = ["json"] }
16+
clap = { version = "4", features = ["derive"] }

examples/metrics-service/README.md

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
# Metrics Service Example
2+
3+
A demonstration service that collects, aggregates, and stores metrics using Tokio, Axum, and DynamoDB, instrumented with `dial9-tokio-telemetry` for runtime tracing.
4+
5+
## What It Does
6+
7+
- **HTTP API**: Accepts metric submissions via POST and queries aggregated metrics via GET
8+
- **In-memory buffering**: Collects metrics in memory before periodic flushing
9+
- **DynamoDB persistence**: Stores aggregated metrics with timestamp-based partitioning
10+
- **Load testing**: Built-in client that simulates variable load patterns
11+
- **Telemetry**: Captures Tokio runtime traces to disk for performance analysis
12+
13+
The service runs for 55 seconds with a load profile that ramps up, sustains, ramps down, and includes a thundering herd spike.
14+
15+
## Usage
16+
17+
Run the example:
18+
19+
```bash
20+
cargo run
21+
```
22+
23+
The service will:
24+
1. Start an HTTP server on `0.0.0.0:3001`
25+
2. Create a DynamoDB table named `metrics-service` (requires AWS credentials)
26+
3. Launch a background flush worker (10-second intervals)
27+
4. Run a load-generating client with varying concurrency
28+
5. Write telemetry traces to `/tmp/metrics-service-traces/`
29+
6. Shut down automatically after 55 seconds
30+
31+
### API Endpoints
32+
33+
**Record a metric:**
34+
```bash
35+
curl -X POST http://localhost:3001/metrics \
36+
-H "Content-Type: application/json" \
37+
-d '{"name": "cpu", "value": 42.5}'
38+
```
39+
40+
**Query aggregated metrics:**
41+
```bash
42+
curl http://localhost:3001/metrics/cpu
43+
```
44+
45+
Returns a JSON array with the timestamp, sum, count, min, and max for each time window.
46+
47+
## Configuration
48+
49+
Edit constants in `src/main.rs`:
50+
51+
| Constant | Default | Description |
52+
|----------|---------|-------------|
53+
| `FLUSH_INTERVAL` | 10s | How often to flush buffered metrics to DynamoDB |
54+
| `TABLE_NAME` | `"metrics-service"` | DynamoDB table name |
55+
| `SERVER_ADDR` | `"0.0.0.0:3001"` | HTTP server bind address |
56+
| `RUN_DURATION` | 55s | Total runtime before shutdown |
57+
58+
### Load Profile
59+
60+
Edit `src/client.rs` to adjust:
61+
- `MAX_WORKERS`: Peak concurrent requests (default: 40)
62+
- `THUNDERING_HERD`: Spike concurrency (default: 200)
63+
- `BASELINE`: Steady-state concurrency (default: 4)
64+
- `METRICS`: Metric names to cycle through
65+
66+
### Telemetry
67+
68+
Traces are written to `/tmp/metrics-service-traces/trace.bin` with:
69+
- Max file size: 1 MB (rotates automatically)
70+
- Max total size: 30 MB
71+
72+
Change the path or limits in the `RotatingWriter::new()` call in `main.rs`.
73+
74+
## Requirements
75+
76+
- AWS credentials configured (for DynamoDB access)
77+
- Write permissions to `/tmp/metrics-service-traces/`
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::collections::HashMap;
2+
use tokio::sync::Mutex;
3+
4+
use crate::ddb::DdbClient;
5+
6+
/// Running summary (sum, count, min, max) of the samples recorded for one metric.
#[derive(Default)]
struct Aggregate {
    /// Total of every recorded value.
    sum: f64,
    /// Number of values recorded so far.
    count: u64,
    /// Smallest recorded value; only meaningful once `count > 0`.
    min: f64,
    /// Largest recorded value; only meaningful once `count > 0`.
    max: f64,
}

impl Aggregate {
    /// Folds one sample into the summary.
    ///
    /// The first sample seeds both `min` and `max`, so the zeroes produced by
    /// `Default` never take part in the min/max comparison.
    fn record(&mut self, value: f64) {
        let (low, high) = match self.count {
            // Nothing recorded yet: the sample is both extremes.
            0 => (value, value),
            _ => (self.min.min(value), self.max.max(value)),
        };
        self.min = low;
        self.max = high;
        self.count += 1;
        self.sum += value;
    }
}
27+
28+
pub struct MetricsBuffer {
29+
inner: Mutex<HashMap<String, Aggregate>>,
30+
}
31+
32+
impl MetricsBuffer {
33+
pub fn new() -> Self {
34+
Self {
35+
inner: Mutex::new(HashMap::new()),
36+
}
37+
}
38+
39+
pub async fn record(&self, name: String, value: f64) {
40+
self.inner
41+
.lock()
42+
.await
43+
.entry(name)
44+
.or_default()
45+
.record(value);
46+
}
47+
48+
pub async fn flush_to_ddb(&self, ddb: &DdbClient) {
49+
let snapshot: HashMap<String, (f64, u64, f64, f64)> = {
50+
let mut guard = self.inner.lock().await;
51+
guard
52+
.drain()
53+
.map(|(k, v)| (k, (v.sum, v.count, v.min, v.max)))
54+
.collect()
55+
};
56+
57+
let ts = std::time::SystemTime::now()
58+
.duration_since(std::time::UNIX_EPOCH)
59+
.unwrap()
60+
.as_secs();
61+
62+
for (name, (sum, count, min, max)) in snapshot {
63+
if let Err(e) = ddb.put_aggregate(&name, ts, sum, count, min, max).await {
64+
eprintln!("flush error for {name}: {e}");
65+
}
66+
}
67+
}
68+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::sync::Arc;
2+
use std::time::{Duration, Instant};
3+
4+
use reqwest::Client;
5+
use serde_json::json;
6+
use tokio::sync::Semaphore;
7+
use tokio::time::sleep;
8+
use tokio_util::sync::CancellationToken;
9+
10+
/// Metric names the load generator cycles through.
const METRICS: &[&str] = &["cpu", "memory", "latency", "error_rate", "queue_depth"];
/// Peak concurrency reached by the ramp.
const MAX_WORKERS: usize = 40;
/// Concurrency used during the thundering-herd spike.
const THUNDERING_HERD: usize = 200;
/// Steady-state concurrency outside the ramp and the spike.
const BASELINE: usize = 4;

/// Maps seconds elapsed since start to the desired number of concurrent workers.
///
/// Load profile:
///   0-10:  linear ramp 4 -> 40
///   10-20: hold at 40
///   20-30: linear ramp 40 -> 4
///   30-40: hold at 4 (baseline)
///   40-45: thundering herd (200)
///   45+:   back to baseline (4)
///
/// Ramp values are truncated toward zero when converted to `usize`.
fn target_concurrency(elapsed: f64) -> usize {
    // Height of the ramp, shared by the ascending and descending phases.
    let ramp_span = (MAX_WORKERS - BASELINE) as f64;

    match elapsed {
        e if e < 10.0 => (BASELINE as f64 + (e / 10.0) * ramp_span) as usize,
        e if e < 20.0 => MAX_WORKERS,
        e if e < 30.0 => (MAX_WORKERS as f64 - ((e - 20.0) / 10.0) * ramp_span) as usize,
        e if e < 40.0 => BASELINE,
        e if e < 45.0 => THUNDERING_HERD,
        _ => BASELINE,
    }
}
39+
40+
pub async fn run(base_url: &str, shutdown: CancellationToken) {
41+
let client = Arc::new(Client::new());
42+
// semaphore controls how many workers run concurrently
43+
let sem = Arc::new(Semaphore::new(0));
44+
let start = Instant::now();
45+
46+
// spawn a large pool of workers that each wait for a permit
47+
for i in 0..THUNDERING_HERD {
48+
let client = client.clone();
49+
let sem = sem.clone();
50+
let base_url = base_url.to_string();
51+
let shutdown = shutdown.clone();
52+
tokio::spawn(async move {
53+
let mut tick: u64 = i as u64;
54+
loop {
55+
tokio::select! {
56+
_ = shutdown.cancelled() => break,
57+
permit = sem.acquire() => {
58+
let _permit = permit.unwrap();
59+
do_work(&client, &base_url, i, tick).await;
60+
tick += 1;
61+
}
62+
}
63+
}
64+
});
65+
}
66+
67+
// coordinator: adjusts semaphore permits to match target concurrency
68+
let mut current = 0usize;
69+
loop {
70+
if shutdown.is_cancelled() {
71+
break;
72+
}
73+
let target = target_concurrency(start.elapsed().as_secs_f64());
74+
match target.cmp(&current) {
75+
std::cmp::Ordering::Greater => {
76+
sem.add_permits(target - current);
77+
println!("concurrency -> {target}");
78+
}
79+
std::cmp::Ordering::Less => {
80+
// acquire and forget permits to reduce concurrency
81+
let to_remove = current - target;
82+
let sem2 = sem.clone();
83+
tokio::spawn(async move {
84+
for _ in 0..to_remove {
85+
sem2.acquire().await.unwrap().forget();
86+
}
87+
});
88+
println!("concurrency -> {target}");
89+
}
90+
std::cmp::Ordering::Equal => {}
91+
}
92+
current = target;
93+
sleep(Duration::from_millis(500)).await;
94+
}
95+
}
96+
97+
async fn do_work(client: &Client, base_url: &str, worker: usize, tick: u64) {
98+
let metric = METRICS[tick as usize % METRICS.len()];
99+
let value = (tick as f64 * 1.3 + worker as f64 * 7.7).sin().abs() * 100.0;
100+
101+
let _ = client
102+
.post(format!("{base_url}/metrics"))
103+
.json(&json!({"name": metric, "value": value}))
104+
.send()
105+
.await;
106+
107+
if tick.is_multiple_of(10) {
108+
let _ = client
109+
.get(format!("{base_url}/metrics/{metric}"))
110+
.send()
111+
.await;
112+
}
113+
}

0 commit comments

Comments
 (0)