Skip to content

Commit 58642ca

Browse files
rcohjlizen
andauthored
Add reset-frame support + try_for_each_event to decoder (#134)
* Add reset-frame support + try_for_each_event to decoder Decoder changes for the thread-local encoding pipeline: - Reset frames: when a valid TRC\0 header appears mid-stream, the decoder resets schemas, string pool, and timestamp base, then continues. This enables raw-copy concatenation of independently encoded batches. Handled in next_frame, next_frame_ref, and try_for_each_event. - try_for_each_event: like for_each_event but the callback may return Err to stop early. Includes TryForEachError enum. - for_each_event simplified to delegate to try_for_each_event. - schema_cache switched from HashMap<WireTypeId, _> to Vec<Option<_>> indexed by WireTypeId.0 for faster lookup. - JS decoder (decode.js): mid-stream header detection and state reset. All 115 existing tests pass. * Update dial9-trace-format/src/decoder.rs Co-authored-by: Jess Izen <44884346+jlizen@users.noreply.github.com> --------- Co-authored-by: Jess Izen <44884346+jlizen@users.noreply.github.com>
1 parent b94e87d commit 58642ca

2 files changed

Lines changed: 122 additions & 35 deletions

File tree

dial9-trace-format/js/decode.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,20 @@ class TraceDecoder {
105105
if (this._pos >= this._view.byteLength) return null;
106106
try {
107107
const tag = this._view.getUint8(this._pos);
108+
// Mid-stream header = reset frame (concatenated thread-local batch)
109+
if (tag === MAGIC[0] && this._pos + 5 <= this._view.byteLength) {
110+
let isHeader = true;
111+
for (let i = 1; i < 4; i++) {
112+
if (this._view.getUint8(this._pos + i) !== MAGIC[i]) { isHeader = false; break; }
113+
}
114+
if (isHeader) {
115+
this.schemas = new Map();
116+
this.stringPool = new Map();
117+
this._timestampBaseNs = 0n;
118+
this._pos += 5; // skip header
119+
return this.nextFrame();
120+
}
121+
}
108122
this._pos++;
109123
switch (tag) {
110124
case TAG_SCHEMA: return this._decodeSchema();

dial9-trace-format/src/decoder.rs

Lines changed: 108 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,24 @@ impl fmt::Display for DecodeError {
2525

2626
impl std::error::Error for DecodeError {}
2727

28+
/// Error returned by [`Decoder::try_for_each_event`].
29+
#[derive(Debug)]
30+
pub enum TryForEachError<E> {
31+
Decode(DecodeError),
32+
User(E),
33+
}
34+
35+
impl<E: fmt::Display> fmt::Display for TryForEachError<E> {
36+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37+
match self {
38+
TryForEachError::Decode(e) => write!(f, "{e}"),
39+
TryForEachError::User(e) => write!(f, "{e}"),
40+
}
41+
}
42+
}
43+
44+
impl<E: fmt::Display + fmt::Debug> std::error::Error for TryForEachError<E> {}
45+
2846
/// A decoded event passed to [`Decoder::for_each_event`].
2947
///
3048
/// `'a` is the lifetime of the input data buffer (strings, stack frames borrow from it).
@@ -48,7 +66,7 @@ pub struct StringPool(pub(crate) HashMap<InternedString, String>);
4866

4967
impl StringPool {
5068
pub(crate) fn new() -> Self {
51-
Self(HashMap::new())
69+
Self(HashMap::default())
5270
}
5371

5472
pub(crate) fn insert(&mut self, id: InternedString, value: String) {
@@ -108,7 +126,7 @@ pub struct Decoder<'a> {
108126
data: &'a [u8],
109127
pos: usize,
110128
registry: SchemaRegistry,
111-
schema_cache: HashMap<WireTypeId, SchemaCache>,
129+
schema_cache: Vec<Option<SchemaCache>>,
112130
string_pool: StringPool,
113131
version: u8,
114132
timestamp_base_ns: u64,
@@ -121,7 +139,7 @@ impl<'a> Decoder<'a> {
121139
data,
122140
pos: HEADER_SIZE,
123141
registry: SchemaRegistry::new(),
124-
schema_cache: HashMap::new(),
142+
schema_cache: Vec::new(),
125143
string_pool: StringPool::new(),
126144
version,
127145
timestamp_base_ns: 0,
@@ -140,6 +158,30 @@ impl<'a> Decoder<'a> {
140158
&self.string_pool
141159
}
142160

161+
/// Reset decoder state (schemas, string pool, timestamp base) as if
162+
/// starting a fresh stream. Used when a mid-stream header is encountered
163+
/// (the "reset frame" pattern for concatenated thread-local batches).
164+
fn reset_state(&mut self) {
165+
self.registry = SchemaRegistry::new();
166+
self.schema_cache.clear();
167+
self.string_pool = StringPool::new();
168+
self.timestamp_base_ns = 0;
169+
}
170+
171+
/// If the current position starts with a valid header, reset state and
172+
/// skip past it, returning true.
173+
fn try_consume_reset_header(&mut self) -> bool {
174+
if self.pos + HEADER_SIZE <= self.data.len()
175+
&& codec::decode_header(&self.data[self.pos..]).is_some()
176+
{
177+
self.reset_state();
178+
self.pos += HEADER_SIZE;
179+
true
180+
} else {
181+
false
182+
}
183+
}
184+
143185
/// Consume this decoder and create an [`Encoder`] that appends to the
144186
/// decoded trace. The encoder inherits the string pool, schema registry,
145187
/// and timestamp base so new frames are compatible with the existing data.
@@ -155,22 +197,26 @@ impl<'a> Decoder<'a> {
155197
)
156198
}
157199

158-
fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
159-
self.schema_cache.get(&type_id).map(|c| SchemaInfo {
160-
field_types: &c.field_types,
161-
has_timestamp: c.has_timestamp,
162-
})
200+
pub(crate) fn schema_info(&self, type_id: WireTypeId) -> Option<SchemaInfo<'_>> {
201+
self.schema_cache
202+
.get(type_id.0 as usize)
203+
.and_then(|s| s.as_ref())
204+
.map(|c| SchemaInfo {
205+
field_types: &c.field_types,
206+
has_timestamp: c.has_timestamp,
207+
})
163208
}
164209

165210
fn register_schema(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
166-
self.schema_cache.insert(
167-
type_id,
168-
SchemaCache {
169-
name: entry.name.clone(),
170-
field_types: entry.fields.iter().map(|f| f.field_type).collect(),
171-
has_timestamp: entry.has_timestamp,
172-
},
173-
);
211+
let idx = type_id.0 as usize;
212+
if idx >= self.schema_cache.len() {
213+
self.schema_cache.resize_with(idx + 1, || None);
214+
}
215+
self.schema_cache[idx] = Some(SchemaCache {
216+
name: entry.name.clone(),
217+
field_types: entry.fields.iter().map(|f| f.field_type).collect(),
218+
has_timestamp: entry.has_timestamp,
219+
});
174220
self.registry.register(type_id, entry)
175221
}
176222

@@ -181,6 +227,9 @@ impl<'a> Decoder<'a> {
181227
if self.pos >= self.data.len() {
182228
return Ok(None);
183229
}
230+
if self.try_consume_reset_header() {
231+
return self.next_frame();
232+
}
184233
let remaining = &self.data[self.pos..];
185234
let base = self.timestamp_base_ns;
186235
let (frame, consumed) =
@@ -243,6 +292,9 @@ impl<'a> Decoder<'a> {
243292
if self.pos >= self.data.len() {
244293
return Ok(None);
245294
}
295+
if self.try_consume_reset_header() {
296+
return self.next_frame_ref();
297+
}
246298
let remaining = &self.data[self.pos..];
247299
let base = self.timestamp_base_ns;
248300
let (frame, consumed) =
@@ -312,6 +364,22 @@ impl<'a> Decoder<'a> {
312364
&mut self,
313365
mut f: impl for<'f> FnMut(RawEvent<'a, 'f>),
314366
) -> Result<(), DecodeError> {
367+
self.try_for_each_event(|ev| {
368+
f(ev);
369+
Ok::<(), std::convert::Infallible>(())
370+
})
371+
.map_err(|e| match e {
372+
TryForEachError::Decode(d) => d,
373+
TryForEachError::User(inf) => match inf {},
374+
})
375+
}
376+
377+
/// Like [`for_each_event`](Self::for_each_event), but the callback may
378+
/// return an error to stop iteration early.
379+
pub fn try_for_each_event<E>(
380+
&mut self,
381+
mut f: impl for<'f> FnMut(RawEvent<'a, 'f>) -> Result<(), E>,
382+
) -> Result<(), TryForEachError<E>> {
315383
let mut values_buf: Vec<FieldValueRef<'a>> = Vec::new();
316384
while self.pos < self.data.len() {
317385
let remaining = &self.data[self.pos..];
@@ -328,19 +396,23 @@ impl<'a> Decoder<'a> {
328396
WireTypeId(u16::from_le_bytes(b.try_into().unwrap()))
329397
}
330398
None => {
331-
return Err(DecodeError {
399+
return Err(TryForEachError::Decode(DecodeError {
332400
pos: self.pos,
333401
message: "truncated event frame".into(),
334-
});
402+
}));
335403
}
336404
};
337-
let cache = match self.schema_cache.get(&type_id) {
405+
let cache = match self
406+
.schema_cache
407+
.get(type_id.0 as usize)
408+
.and_then(|s| s.as_ref())
409+
{
338410
Some(c) => c,
339411
None => {
340-
return Err(DecodeError {
412+
return Err(TryForEachError::Decode(DecodeError {
341413
pos: self.pos,
342414
message: format!("unknown type_id {type_id:?}"),
343-
});
415+
}));
344416
}
345417
};
346418

@@ -351,10 +423,10 @@ impl<'a> Decoder<'a> {
351423
Some(self.timestamp_base_ns + delta as u64)
352424
}
353425
None => {
354-
return Err(DecodeError {
426+
return Err(TryForEachError::Decode(DecodeError {
355427
pos: self.pos + pos,
356428
message: "truncated timestamp delta".into(),
357-
});
429+
}));
358430
}
359431
}
360432
} else {
@@ -369,10 +441,10 @@ impl<'a> Decoder<'a> {
369441
pos += consumed;
370442
}
371443
None => {
372-
return Err(DecodeError {
444+
return Err(TryForEachError::Decode(DecodeError {
373445
pos: self.pos + pos,
374446
message: "truncated field value".into(),
375-
});
447+
}));
376448
}
377449
}
378450
}
@@ -386,35 +458,36 @@ impl<'a> Decoder<'a> {
386458
timestamp_ns,
387459
fields: &values_buf,
388460
string_pool: &self.string_pool,
389-
});
461+
})
462+
.map_err(TryForEachError::User)?;
390463
}
391464
codec::TAG_TIMESTAMP_RESET => {
392-
// Handle timestamp resets inline to avoid next_frame_ref's
393-
// recursive consumption of the following frame.
394465
let ts = match self.data.get(self.pos + 1..self.pos + 9) {
395466
Some(b) => u64::from_le_bytes(b.try_into().unwrap()),
396467
None => {
397-
return Err(DecodeError {
468+
return Err(TryForEachError::Decode(DecodeError {
398469
pos: self.pos,
399470
message: "truncated timestamp reset".into(),
400-
});
471+
}));
401472
}
402473
};
403474
self.timestamp_base_ns = ts;
404475
self.pos += 9;
405476
}
406477
_ => {
407-
// Use next_frame_ref for non-event frames (schema, pool, symbol table)
408-
// `next_frame_ref` will update the decoder state as we read the frames (e.g. the pooled strings)
478+
// Mid-stream header = reset frame (tag 0x54 = 'T' from TRC\0)
479+
if tag == codec::MAGIC[0] && self.try_consume_reset_header() {
480+
continue;
481+
}
409482
match self.next_frame_ref() {
410483
Ok(Some(_)) => {}
411484
Ok(None) => {
412-
return Err(DecodeError {
485+
return Err(TryForEachError::Decode(DecodeError {
413486
pos: self.pos,
414487
message: format!("failed to decode frame with tag 0x{tag:02x}"),
415-
});
488+
}));
416489
}
417-
Err(e) => return Err(e),
490+
Err(e) => return Err(TryForEachError::Decode(e)),
418491
}
419492
}
420493
}

0 commit comments

Comments
 (0)