Skip to content

Commit 8d61fb0

Browse files
authored
feat: Store S3 metadata into segement metadata (#311)
* feat: Store S3 metadata into segement metadata - Make segment metadata writing lazy so late-bound items will still be persisted into segment metadata - update sgement metadata now merges entries - copy s3 config options into segment metadata * PR feedback * Eagerly populate worker_ids when attaching a runtime Pre-populate RuntimeContext::worker_ids in attach_runtime() right after metrics_and_base is set, instead of lazily on each worker thread's first poll/park event. We already know num_workers and base from RuntimeMetrics at that point, so segment metadata is complete from the first flush cycle. * Eagerly populate worker_ids when attaching a runtime Pre-populate RuntimeContext::worker_ids in attach_runtime() right after metrics_and_base is set, instead of lazily on each worker thread's first poll/park event. We already know num_workers and base from RuntimeMetrics at that point, so segment metadata is complete from the first flush cycle. Strengthen the propagates_second_runtime_metadata test to assert exact worker ID values (0,1 and 2,3) now that they're available immediately. * Eagerly populate worker_ids when attaching a runtime Pre-populate RuntimeContext::worker_ids in attach_runtime() right after metrics_and_base is set, instead of lazily on each worker thread's first poll/park event. We already know num_workers and base from RuntimeMetrics at that point, so segment metadata is complete from the first flush cycle. Strengthen tests to assert exact worker ID values now that they're available immediately.
1 parent 7d106ae commit 8d61fb0

6 files changed

Lines changed: 303 additions & 46 deletions

File tree

PROGRESS.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Eager worker_ids population
2+
3+
## What changed
4+
Pre-populate `RuntimeContext::worker_ids` in `attach_runtime()` immediately after `metrics_and_base` is set, instead of lazily on each worker thread's first poll/park event.
5+
6+
## Why
7+
When a runtime is attached, we already know `num_workers` and `base` from `RuntimeMetrics`. Eagerly populating the map means `metadata_entry()` returns complete runtime→worker mappings from the very first flush cycle, rather than converging over time as workers come online.
8+
9+
## Change
10+
One block added in `attach_runtime()` (recorder/mod.rs):
11+
```rust
12+
{
13+
let mut ids = ctx.worker_ids.write().unwrap();
14+
for i in 0..num_workers {
15+
ids.insert(i as usize, base + i);
16+
}
17+
}
18+
```
19+
20+
`register_worker_if_needed` remains idempotent — guarded by a thread-local flag, and the insert is a no-op overwrite with the same value.
21+
22+
## Verification
23+
- `cargo fmt --check`
24+
- `cargo clippy --all-targets --all-features`
25+
- `cargo nextest run` — 460/461 pass (1 pre-existing CPU-load-sensitive flaky test in dial9-perf-self-profile)
26+
- `cargo nextest run --stress-duration 20s` — 2 iterations, 460/460 pass

dial9-tokio-telemetry/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ tracing-layer = ["dep:tracing-subscriber"]
4949
worker-s3 = ["dep:aws-sdk-s3-transfer-manager", "dep:aws-sdk-s3", "dep:aws-config", "dep:time"]
5050

5151
[dev-dependencies]
52-
dial9-tokio-telemetry = { path = ".", features = ["analysis", "tracing-layer"] }
52+
dial9-tokio-telemetry = { path = ".", features = ["analysis", "tracing-layer", "worker-s3"] }
5353
assert2 = { workspace = true }
5454
criterion = "0.5"
5555
clap = { version = "4", features = ["derive"] }

dial9-tokio-telemetry/src/background_task/s3.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,18 @@ impl S3Config {
125125
&self.bucket
126126
}
127127

128+
pub(crate) fn as_metadata(&self) -> impl Iterator<Item = (&str, &str)> {
129+
[
130+
("bucket", self.bucket.as_str()),
131+
("service_name", self.service_name.as_str()),
132+
("instance_path", self.instance_path.as_str()),
133+
("boot_id", self.boot_id.as_str()),
134+
]
135+
.into_iter()
136+
.chain(self.prefix.as_ref().map(|p| ("prefix", p.as_str())))
137+
.chain(self.region.as_ref().map(|r| ("region", r.as_str())))
138+
}
139+
128140
/// Optional region override for the S3 client.
129141
pub(crate) fn region(&self) -> Option<&str> {
130142
self.region.as_deref()

dial9-tokio-telemetry/src/telemetry/recorder/mod.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,16 @@ fn attach_runtime(
317317
});
318318
});
319319

320+
// Eagerly populate worker_ids so segment metadata is complete from the
321+
// first flush cycle, rather than waiting for each worker thread to lazily
322+
// register on its first poll/park event.
323+
{
324+
let mut ids = ctx.worker_ids.write().unwrap();
325+
for i in 0..num_workers {
326+
ids.insert(i as usize, base + i);
327+
}
328+
}
329+
320330
shared.contexts.lock().unwrap().push(ctx);
321331

322332
Ok(runtime)
@@ -1007,6 +1017,15 @@ impl TelemetryCore {
10071017
let shared = Arc::new(SharedState::new(start_mono_ns));
10081018
#[allow(unused_mut)]
10091019
let mut event_writer = EventWriter::new(Box::new(writer));
1020+
#[cfg(feature = "worker-s3")]
1021+
if let Some(s3_config) = s3_config.as_ref() {
1022+
event_writer.update_segment_metadata(
1023+
s3_config
1024+
.as_metadata()
1025+
.map(|(k, v)| (k.to_string(), v.to_string()))
1026+
.collect(),
1027+
)
1028+
}
10101029

10111030
#[cfg(feature = "cpu-profiling")]
10121031
{
@@ -1757,15 +1776,18 @@ mod tests {
17571776
"expected at least one SegmentMetadata event in trace files"
17581777
);
17591778

1760-
// At least one segment's metadata should contain both runtime mappings.
1779+
// At least one segment's metadata should contain both runtime mappings
1780+
// with the exact worker IDs (eagerly populated at attach time).
17611781
let has_both = all_metadata.iter().any(|entries| {
1762-
let has_main = entries.iter().any(|(k, _)| k == "runtime.main");
1763-
let has_io = entries.iter().any(|(k, _)| k == "runtime.io");
1782+
let has_main = entries
1783+
.iter()
1784+
.any(|(k, v)| k == "runtime.main" && v == "0,1");
1785+
let has_io = entries.iter().any(|(k, v)| k == "runtime.io" && v == "2,3");
17641786
has_main && has_io
17651787
});
17661788
assert!(
17671789
has_both,
1768-
"expected segment metadata to contain both runtime.main and runtime.io, \
1790+
"expected segment metadata to contain runtime.main=0,1 and runtime.io=2,3, \
17691791
got: {all_metadata:?}"
17701792
);
17711793
}

0 commit comments

Comments
 (0)