Skip to content

Commit 58baa38

Browse files
authored
feat!: Add typed list and map FieldTypes (#367)
* feat(trace-format): add DynamicList and DynamicMap field types * docs(trace-format): document DynamicList and DynamicMap in SPEC.md * feat!(trace-format): non_exhaustive on FieldType, FieldDef, SchemaEntry with constructors * refactor!(trace-format): opaque DynamicListRef and DynamicMapRef wrappers * refactor!(trace-format): make FieldDef and SchemaEntry fields private, add getters * docs(design): update metrique integration for DynamicList/DynamicMap wire format * feat(js): add DynamicList and DynamicMap decoding to JS parser
1 parent 976b564 commit 58baa38

28 files changed

Lines changed: 838 additions & 495 deletions

dial9-tokio-telemetry/design/metrique-integration.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ This design depends on the entry descriptor system in metrique (see `docs/entry-
1717
- **`dial9::Context`**: a `#[doc(hidden)]` dial9-internal field tag carried by `Dial9Context`'s own fields. Users do not interact with it directly; they flatten `Dial9Context` into their entry, and the sink walks the descriptor on first-use to find fields tagged `Context`. The name is not a stable guarantee; a future typed source-extraction mechanism would replace this tag-based discovery.
1818
- **`Dial9EntryWriter`**: the dial9 adapter that walks `Entry::write` on the flush thread. Uses the cached context- and payload-field index sets to route each callback to either the event header (context) or the payload encoder (Emit), or to skip.
1919
- **First-use per-descriptor**: the moment a `Dial9Stream` first sees an entry with a given `DescriptorId`. Dial9 walks the descriptor once, caches the index sets and any diagnostics, and uses the cache for every subsequent entry of that type.
20-
- **Trace format**: dial9's wire format, defined in `dial9-trace-format/SPEC.md`. Carries schema frames (one per entry type), event frames (one per emission), pool frames (deduplicated strings and stack frames), and schema-annotation frames (per-field metadata). This design relies on two format features that ship independently of the integration: `TAG_SCHEMA_ANNOTATIONS` and the `List` / `Map` field types.
20+
- **Trace format**: dial9's wire format, defined in `dial9-trace-format/SPEC.md`. Carries schema frames (one per entry type), event frames (one per emission), pool frames (deduplicated strings and stack frames), and schema-annotation frames (per-field metadata). This design relies on two format features that ship independently of the integration: `TAG_SCHEMA_ANNOTATIONS` and the `DynamicList` / `DynamicMap` field types.
2121

2222
## User-facing API
2323

@@ -156,8 +156,8 @@ The builder and manual composition paths are unchanged from the original design.
156156
│ encode according to FieldShape: │
157157
│ Known : encode scalar │
158158
│ Optional: encode presence byte + inner │
159-
│ List : encode <count> <repeated element>
160-
│ Flex : encode map<key, value>
159+
│ List : encode <count> <tag + value per element>
160+
│ Flex : encode <count> <key_tag+key+val_tag+val>
161161
│ Opaque : report + skip (sink-side validation) │
162162
│ │
163163
│ if field is tagged Interned and carries string data: │
@@ -229,7 +229,7 @@ Per entry:
229229
3. First-use per `DescriptorId`: walk the descriptor to compute the context-field indices (fields tagged `dial9::Context`) and payload-field indices (tagged `dial9::Emit`). Build the wire schema with annotations for units.
230230
4. Walk `entry.write(..)` with a `Dial9EntryWriter` that uses the cached index sets to route each callback to either the event header (context) or the payload encoder (Emit), or to skip. `Interned` fields have their string data routed through the dial9 string pool. Relies on the metrique contract that `Entry::write` emits `value` callbacks in descriptor order.
231231

232-
`Dial9EntryWriter` overrides `ValueWriter::values()` (the default implementation comma-joins elements into a string) to preserve the typed list wire encoding for `Vec<T>` fields.
232+
`Dial9EntryWriter` overrides `ValueWriter::values()` (the default implementation comma-joins elements into a string) to preserve the self-describing list wire encoding for `Vec<T>` fields.
233233

234234
A `catch_unwind(AssertUnwindSafe(..))` guard around the `Entry::write` walk drops offending events (rate-limited log) without poisoning the flush thread's state.
235235

dial9-tokio-telemetry/src/background_task/sealed.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ fn parse_segment_timestamp(data: &[u8]) -> Result<u64, ParseTimestampError> {
8686
let name = dec
8787
.registry()
8888
.get(type_id)
89-
.map(|s| s.name.as_str())
89+
.map(|s| s.name())
9090
.ok_or(ParseTimestampError::UnknownTypeId(type_id.0))?;
9191
if name == "ClockSyncEvent" {
9292
return match values.first() {

dial9-tokio-telemetry/src/telemetry/format.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ pub(crate) fn decode_ref<'a>(
322322
schema: &SchemaEntry,
323323
) -> Option<TelemetryEventRef<'a>> {
324324
use dial9_trace_format::TraceEvent as _;
325-
let field_defs = &schema.fields;
325+
let field_defs = schema.fields();
326326
Some(match name {
327327
"PollStartEvent" => {
328328
TelemetryEventRef::PollStart(PollStartEvent::decode(timestamp_ns, fields, field_defs)?)

dial9-tokio-telemetry/src/telemetry/recorder/shared_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ mod shuttle_tests {
555555
dec.for_each_event(|ev| {
556556
if ev.name == "ValidationEvent"
557557
&& let Some(decoded) =
558-
ValidationEvent::decode(ev.timestamp_ns, ev.fields, &ev.schema.fields)
558+
ValidationEvent::decode(ev.timestamp_ns, ev.fields, &ev.schema.fields())
559559
{
560560
out.push(ValidationEvent {
561561
timestamp_ns: decoded.timestamp_ns,

dial9-tokio-telemetry/src/tracing_layer.rs

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -94,47 +94,23 @@ fn build_callsite_schemas(meta: &'static tracing::Metadata<'static>) -> Callsite
9494

9595
// Base fields present on all span events
9696
let mut enter_fields = vec![
97-
FieldDef {
98-
name: "worker_id".into(),
99-
field_type: FieldType::Varint,
100-
},
101-
FieldDef {
102-
name: "span_id".into(),
103-
field_type: FieldType::Varint,
104-
},
105-
FieldDef {
106-
name: "parent_span_id".into(),
107-
field_type: FieldType::OptionalVarint,
108-
},
109-
FieldDef {
110-
name: "span_name".into(),
111-
field_type: FieldType::PooledString,
112-
},
97+
FieldDef::new("worker_id", FieldType::Varint),
98+
FieldDef::new("span_id", FieldType::Varint),
99+
FieldDef::new("parent_span_id", FieldType::OptionalVarint),
100+
FieldDef::new("span_name", FieldType::PooledString),
113101
];
114102
let mut exit_fields = vec![
115-
FieldDef {
116-
name: "worker_id".into(),
117-
field_type: FieldType::Varint,
118-
},
119-
FieldDef {
120-
name: "span_id".into(),
121-
field_type: FieldType::Varint,
122-
},
123-
FieldDef {
124-
name: "span_name".into(),
125-
field_type: FieldType::PooledString,
126-
},
103+
FieldDef::new("worker_id", FieldType::Varint),
104+
FieldDef::new("span_id", FieldType::Varint),
105+
FieldDef::new("span_name", FieldType::PooledString),
127106
];
128107

129108
// Add user-defined fields as optional interned strings
130109
let mut field_names = Vec::new();
131110
for field in meta.fields() {
132111
let name = field.name();
133112
field_names.push(name);
134-
let def = FieldDef {
135-
name: name.to_string(),
136-
field_type: FieldType::OptionalPooledString,
137-
};
113+
let def = FieldDef::new(name.to_string(), FieldType::OptionalPooledString);
138114
enter_fields.push(def.clone());
139115
exit_fields.push(def);
140116
}

dial9-tokio-telemetry/tests/tracing_layer.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,56 +55,56 @@ fn decode_span_events(path: &std::path::Path) -> SpanEvents {
5555
if ev.name.starts_with("SpanEnter:") {
5656
result.enter_count += 1;
5757
result.enter_schema_names.insert(ev.name.to_owned());
58-
for (field_def, field_val) in ev.schema.fields.iter().zip(ev.fields.iter()) {
59-
if field_def.name == "span_name"
58+
for (field_def, field_val) in ev.schema.fields().iter().zip(ev.fields.iter()) {
59+
if field_def.name() == "span_name"
6060
&& let FieldValueRef::PooledString(id) = field_val
6161
&& let Some(name) = ev.string_pool.get(*id)
6262
{
6363
result.enter_names.push(name.to_owned());
6464
}
65-
if field_def.name == "span_id"
65+
if field_def.name() == "span_id"
6666
&& let FieldValueRef::Varint(v) = field_val
6767
{
6868
result.entered_span_ids.insert(*v);
6969
}
70-
if field_def.name == "parent_span_id"
70+
if field_def.name() == "parent_span_id"
7171
&& let FieldValueRef::Varint(v) = field_val
7272
&& *v > 0
7373
{
7474
result.saw_parent_span_id = true;
7575
}
76-
if field_def.name == "worker_id"
76+
if field_def.name() == "worker_id"
7777
&& let FieldValueRef::Varint(v) = field_val
7878
{
7979
result.worker_ids.insert(*v);
8080
}
8181
// User-defined fields are optional pooled strings
8282
if !["worker_id", "span_id", "parent_span_id", "span_name"]
83-
.contains(&field_def.name.as_str())
83+
.contains(&field_def.name())
8484
&& let FieldValueRef::PooledString(id) = field_val
8585
&& let Some(v) = ev.string_pool.get(*id)
8686
{
8787
result
8888
.enter_fields
89-
.push((field_def.name.clone(), v.to_owned()));
89+
.push((field_def.name().to_owned(), v.to_owned()));
9090
}
9191
}
9292
} else if ev.name.starts_with("SpanExit:") {
9393
result.exit_count += 1;
94-
for (field_def, field_val) in ev.schema.fields.iter().zip(ev.fields.iter()) {
95-
if !["worker_id", "span_id", "span_name"].contains(&field_def.name.as_str())
94+
for (field_def, field_val) in ev.schema.fields().iter().zip(ev.fields.iter()) {
95+
if !["worker_id", "span_id", "span_name"].contains(&field_def.name())
9696
&& let FieldValueRef::PooledString(id) = field_val
9797
&& let Some(v) = ev.string_pool.get(*id)
9898
{
9999
result
100100
.exit_fields
101-
.push((field_def.name.clone(), v.to_owned()));
101+
.push((field_def.name().to_owned(), v.to_owned()));
102102
}
103103
}
104104
} else if ev.name == "SpanCloseEvent" {
105105
result.close_count += 1;
106-
for (field_def, field_val) in ev.schema.fields.iter().zip(ev.fields.iter()) {
107-
if field_def.name == "span_id"
106+
for (field_def, field_val) in ev.schema.fields().iter().zip(ev.fields.iter()) {
107+
if field_def.name() == "span_id"
108108
&& let FieldValueRef::Varint(v) = field_val
109109
{
110110
result.closed_span_ids.insert(*v);

dial9-trace-format-derive/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ fn derive_trace_event_impl(input: DeriveInput) -> proc_macro2::TokenStream {
5959

6060
let field_name_str = field_name.to_string();
6161
field_def_tokens.push(quote! {
62-
::dial9_trace_format::schema::FieldDef {
63-
name: #field_name_str.to_string(),
64-
field_type: <#ty as ::dial9_trace_format::TraceField>::field_type(),
65-
}
62+
::dial9_trace_format::schema::FieldDef::new(
63+
#field_name_str,
64+
<#ty as ::dial9_trace_format::TraceField>::field_type(),
65+
)
6666
});
6767
encode_tokens.push(quote! {
6868
<#ty as ::dial9_trace_format::TraceField>::encode(&self.#field_name, enc)?;
@@ -80,7 +80,7 @@ fn derive_trace_event_impl(input: DeriveInput) -> proc_macro2::TokenStream {
8080
decode_tokens.push(quote! {
8181
#field_name: {
8282
let val = field_defs.iter().zip(fields.iter())
83-
.find(|(f, _)| f.name == #field_name_str)
83+
.find(|(f, _)| f.name() == #field_name_str)
8484
.map(|(_, v)| v);
8585
match val {
8686
Some(v) => <#ty as ::dial9_trace_format::TraceField>::decode_ref(v)?,

0 commit comments

Comments
 (0)