Skip to content

Commit e58506c

Browse files
committed
Try TraceProjection (incomplete)
Signed-off-by: Bob Weinand <[email protected]>
1 parent 2f92752 commit e58506c

File tree

10 files changed

+1469
-29
lines changed

10 files changed

+1469
-29
lines changed

datadog-trace-utils/src/msgpack_decoder/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
pub mod decode;
55
pub mod v04;
66
pub mod v05;
7+
pub mod v1;
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
pub(crate) mod span;
5+
6+
use self::span::decode_span;
7+
use crate::msgpack_decoder::decode::buffer::Buffer;
8+
use crate::msgpack_decoder::decode::error::DecodeError;
9+
use crate::span::v04::{Span, SpanBytes, SpanSlice};
10+
use crate::span::{TraceData, Traces, TracesBytes, TracesSlice};
11+
12+
/// Decodes a Bytes buffer into a `Vec<Vec<SpanBytes>>` object, also represented as a vector of
13+
/// `TracerPayloadV04` objects.
14+
///
15+
/// # Arguments
16+
///
17+
/// * `data` - A tinybytes Bytes buffer containing the encoded data. Bytes are expected to be
18+
/// encoded msgpack data containing a list of a list of v04 spans.
19+
///
20+
/// # Returns
21+
///
22+
/// * `Ok(Vec<TracerPayloadV04>, usize)` - A vector of decoded `Vec<SpanSlice>` objects if
23+
/// successful. and the number of bytes in the slice used by the decoder.
24+
/// * `Err(DecodeError)` - An error if the decoding process fails.
25+
///
26+
/// # Errors
27+
///
28+
/// This function will return an error if:
29+
/// - The array length for trace count or span count cannot be read.
30+
/// - Any span cannot be decoded.
31+
///
32+
/// # Examples
33+
///
34+
/// ```
35+
/// use datadog_trace_protobuf::pb::Span;
36+
/// use datadog_trace_utils::msgpack_decoder::v04::from_bytes;
37+
/// use rmp_serde::to_vec_named;
38+
/// use tinybytes;
39+
///
40+
/// let span = Span {
41+
/// name: "test-span".to_owned(),
42+
/// ..Default::default()
43+
/// };
44+
/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
45+
/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data);
46+
/// let (decoded_traces, _payload_size) =
47+
/// from_bytes(encoded_data_as_tinybytes).expect("Decoding failed");
48+
///
49+
/// assert_eq!(1, decoded_traces.len());
50+
/// assert_eq!(1, decoded_traces[0].len());
51+
/// let decoded_span = &decoded_traces[0][0];
52+
/// assert_eq!("test-span", decoded_span.name.as_str());
53+
/// ```
54+
pub fn from_bytes(data: tinybytes::Bytes) -> Result<(TracesBytes, usize), DecodeError> {
55+
from_buffer(&mut Buffer::new(data))
56+
}
57+
58+
/// Decodes a slice of bytes into a `Vec<Vec<SpanSlice>>` object.
59+
/// The resulting spans have the same lifetime as the initial buffer.
60+
///
61+
/// # Arguments
62+
///
63+
/// * `data` - A slice of bytes containing the encoded data. Bytes are expected to be encoded
64+
/// msgpack data containing a list of a list of v04 spans.
65+
///
66+
/// # Returns
67+
///
68+
/// * `Ok(Vec<TracerPayloadV04>, usize)` - A vector of decoded `Vec<SpanSlice>` objects if
69+
/// successful. and the number of bytes in the slice used by the decoder.
70+
/// * `Err(DecodeError)` - An error if the decoding process fails.
71+
///
72+
/// # Errors
73+
///
74+
/// This function will return an error if:
75+
/// - The array length for trace count or span count cannot be read.
76+
/// - Any span cannot be decoded.
77+
///
78+
/// # Examples
79+
///
80+
/// ```
81+
/// use datadog_trace_protobuf::pb::Span;
82+
/// use datadog_trace_utils::msgpack_decoder::v04::from_slice;
83+
/// use rmp_serde::to_vec_named;
84+
/// use tinybytes;
85+
///
86+
/// let span = Span {
87+
/// name: "test-span".to_owned(),
88+
/// ..Default::default()
89+
/// };
90+
/// let encoded_data = to_vec_named(&vec![vec![span]]).unwrap();
91+
/// let encoded_data_as_tinybytes = tinybytes::Bytes::from(encoded_data);
92+
/// let (decoded_traces, _payload_size) =
93+
/// from_slice(&encoded_data_as_tinybytes).expect("Decoding failed");
94+
///
95+
/// assert_eq!(1, decoded_traces.len());
96+
/// assert_eq!(1, decoded_traces[0].len());
97+
/// let decoded_span = &decoded_traces[0][0];
98+
/// assert_eq!("test-span", decoded_span.name);
99+
/// ```
100+
pub fn from_slice(data: &[u8]) -> Result<(TracesSlice<'_>, usize), DecodeError> {
101+
from_buffer(&mut Buffer::new(data))
102+
}
103+
104+
#[allow(clippy::type_complexity)]
105+
pub fn from_buffer<T: TraceData>(
106+
data: &mut Buffer<T>,
107+
) -> Result<(Traces<T>, usize), DecodeError> {
108+
let trace_count = rmp::decode::read_array_len(data.as_mut_slice()).map_err(|_| {
109+
DecodeError::InvalidFormat("Unable to read array len for trace count".to_owned())
110+
})?;
111+
112+
let traces = Traces::default();
113+
114+
// Intentionally skip the size of the array (as it will be recomputed after coalescing).
115+
let start_len = data.len();
116+
117+
118+
119+
Ok((traces, start_len - data.len()))
120+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::msgpack_decoder::decode::buffer::Buffer;
5+
use crate::msgpack_decoder::decode::error::DecodeError;
6+
use crate::msgpack_decoder::decode::number::read_nullable_number;
7+
use crate::msgpack_decoder::decode::span_event::read_span_events;
8+
use crate::msgpack_decoder::decode::span_link::read_span_links;
9+
use crate::msgpack_decoder::decode::string::{
10+
read_nullable_str_map_to_strings, read_nullable_string,
11+
};
12+
use crate::msgpack_decoder::decode::{meta_struct::read_meta_struct, metrics::read_metrics};
13+
use crate::span::{v04::Span, SpanKey, TraceData};
14+
use std::borrow::Borrow;
15+
16+
/// Decodes a slice of bytes into a `Span` object.
17+
///
18+
/// # Arguments
19+
///
20+
/// * `buf` - A mutable reference to a slice of bytes containing the encoded data.
21+
///
22+
/// # Returns
23+
///
24+
/// * `Ok(Span)` - A decoded `Span` object if successful.
25+
/// * `Err(DecodeError)` - An error if the decoding process fails.
26+
///
27+
/// # Errors
28+
///
29+
/// This function will return an error if:
30+
/// - The map length cannot be read.
31+
/// - Any key or value cannot be decoded.
32+
pub fn decode_span<T: TraceData>(buffer: &mut Buffer<T>) -> Result<Span<T>, DecodeError> {
33+
let mut span = Span::<T>::default();
34+
35+
let span_size = rmp::decode::read_map_len(buffer.as_mut_slice()).map_err(|_| {
36+
DecodeError::InvalidFormat("Unable to get map len for span size".to_owned())
37+
})?;
38+
39+
for _ in 0..span_size {
40+
fill_span(&mut span, buffer)?;
41+
}
42+
43+
Ok(span)
44+
}
45+
46+
// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the
47+
// BytesStrings
48+
fn fill_span<T: TraceData>(span: &mut Span<T>, buf: &mut Buffer<T>) -> Result<(), DecodeError> {
49+
let key = buf
50+
.read_string()?
51+
.borrow()
52+
.parse::<SpanKey>()
53+
.map_err(|e| DecodeError::InvalidFormat(e.message))?;
54+
55+
match key {
56+
SpanKey::Service => span.service = read_nullable_string(buf)?,
57+
SpanKey::Name => span.name = read_nullable_string(buf)?,
58+
SpanKey::Resource => span.resource = read_nullable_string(buf)?,
59+
SpanKey::TraceId => span.trace_id = read_nullable_number(buf)?,
60+
SpanKey::SpanId => span.span_id = read_nullable_number(buf)?,
61+
SpanKey::ParentId => span.parent_id = read_nullable_number(buf)?,
62+
SpanKey::Start => span.start = read_nullable_number(buf)?,
63+
SpanKey::Duration => span.duration = read_nullable_number(buf)?,
64+
SpanKey::Error => span.error = read_nullable_number(buf)?,
65+
SpanKey::Type => span.r#type = read_nullable_string(buf)?,
66+
SpanKey::Meta => span.meta = read_nullable_str_map_to_strings(buf)?,
67+
SpanKey::Metrics => span.metrics = read_metrics(buf)?,
68+
SpanKey::MetaStruct => span.meta_struct = read_meta_struct(buf)?,
69+
SpanKey::SpanLinks => span.span_links = read_span_links(buf)?,
70+
SpanKey::SpanEvents => span.span_events = read_span_events(buf)?,
71+
}
72+
Ok(())
73+
}
74+
75+
#[cfg(test)]
76+
mod tests {
77+
use super::SpanKey;
78+
use crate::span::SpanKeyParseError;
79+
use std::str::FromStr;
80+
81+
#[test]
82+
fn test_span_key_from_str() {
83+
assert_eq!(SpanKey::from_str("service").unwrap(), SpanKey::Service);
84+
assert_eq!(SpanKey::from_str("name").unwrap(), SpanKey::Name);
85+
assert_eq!(SpanKey::from_str("resource").unwrap(), SpanKey::Resource);
86+
assert_eq!(SpanKey::from_str("trace_id").unwrap(), SpanKey::TraceId);
87+
assert_eq!(SpanKey::from_str("span_id").unwrap(), SpanKey::SpanId);
88+
assert_eq!(SpanKey::from_str("parent_id").unwrap(), SpanKey::ParentId);
89+
assert_eq!(SpanKey::from_str("start").unwrap(), SpanKey::Start);
90+
assert_eq!(SpanKey::from_str("duration").unwrap(), SpanKey::Duration);
91+
assert_eq!(SpanKey::from_str("error").unwrap(), SpanKey::Error);
92+
assert_eq!(SpanKey::from_str("meta").unwrap(), SpanKey::Meta);
93+
assert_eq!(SpanKey::from_str("metrics").unwrap(), SpanKey::Metrics);
94+
assert_eq!(SpanKey::from_str("type").unwrap(), SpanKey::Type);
95+
assert_eq!(
96+
SpanKey::from_str("meta_struct").unwrap(),
97+
SpanKey::MetaStruct
98+
);
99+
assert_eq!(SpanKey::from_str("span_links").unwrap(), SpanKey::SpanLinks);
100+
assert_eq!(
101+
SpanKey::from_str("span_events").unwrap(),
102+
SpanKey::SpanEvents
103+
);
104+
105+
let invalid_result = SpanKey::from_str("invalid_key");
106+
let msg = format!("SpanKeyParseError: Invalid span key: {}", "invalid_key");
107+
assert!(matches!(invalid_result, Err(SpanKeyParseError { .. })));
108+
assert_eq!(invalid_result.unwrap_err().to_string(), msg);
109+
}
110+
}

datadog-trace-utils/src/span/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4+
pub mod table;
45
pub mod trace_utils;
56
pub mod v04;
67
pub mod v05;
8+
pub mod v1;
9+
10+
mod trace;
11+
pub use trace::*;
712

813
use crate::msgpack_decoder::decode::buffer::read_string_ref_nomut;
914
use crate::msgpack_decoder::decode::error::DecodeError;
@@ -14,6 +19,7 @@ use std::fmt;
1419
use std::fmt::Debug;
1520
use std::hash::Hash;
1621
use std::marker::PhantomData;
22+
use std::ops::Deref;
1723
use tinybytes::{Bytes, BytesString};
1824

1925
/// Trait representing the requirements for a type to be used as a Span "string" type.
@@ -126,6 +132,7 @@ impl<'a> TraceData for SliceData<'a> {
126132
}
127133
}
128134

135+
129136
#[derive(Debug)]
130137
pub struct SpanKeyParseError {
131138
pub message: String,
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use std::collections::HashMap;
2+
use std::marker::PhantomData;
3+
use crate::span::TraceData;
4+
5+
trait TraceDataType {
6+
type Data<T: TraceData>;
7+
}
8+
#[derive(Debug, Default, Eq, PartialEq, Hash)]
9+
pub struct TraceDataBytes;
10+
impl TraceDataType for TraceDataBytes {
11+
type Data<T: TraceData> = T::Bytes;
12+
}
13+
#[derive(Debug, Default, Eq, PartialEq, Hash)]
14+
pub struct TraceDataText;
15+
impl TraceDataType for TraceDataText {
16+
type Data<T: TraceData> = T::Text;
17+
}
18+
19+
#[derive(Copy, Debug, Default, Eq, PartialEq, Hash)]
20+
#[repr(transparent)]
21+
pub struct TraceDataRef<T: TraceDataType> {
22+
index: u32,
23+
_phantom: PhantomData<T>,
24+
}
25+
26+
impl<T: TraceDataType> TraceDataRef<T> {
27+
fn new(r#ref: u32) -> Self {
28+
Self {
29+
index: r#ref,
30+
_phantom: PhantomData,
31+
}
32+
}
33+
}
34+
35+
pub type TraceStringRef = TraceDataRef<TraceDataText>;
36+
pub type TraceBytesRef = TraceDataRef<TraceDataBytes>;
37+
38+
struct StaticDataValue<T> {
39+
value: T,
40+
rc: u32,
41+
}
42+
43+
pub struct StaticDataVec<T: TraceData, D: TraceDataType> {
44+
vec: Vec<StaticDataValue<D::Data::<T>>>,
45+
// This HashMap is probably the bottleneck. However we are required to ensure every string only exists once.
46+
table: HashMap<D::Data::<T>, TraceDataRef<D>>,
47+
}
48+
49+
impl<T: TraceData, D: TraceDataType> Default for StaticDataVec<T, D> {
50+
fn default() -> Self {
51+
Self {
52+
vec: vec![StaticDataValue {
53+
value: D::Data::<T>::default(),
54+
rc: 1 << 30, // so that we can just have TraceDataRef::new(0) as default without the rc ever reaching 0
55+
}],
56+
table: HashMap::from([(D::Data::<T>::default(), TraceDataRef::new(0))]),
57+
}
58+
}
59+
}
60+
61+
impl<T: TraceData, D: TraceDataType> StaticDataVec<T, D> {
62+
pub fn get(&self, r#ref: TraceDataRef<D>) -> &D::Data::<T> {
63+
&self.vec[r#ref.index as usize].value
64+
}
65+
66+
pub fn add(&mut self, value: D::Data::<T>) -> TraceDataRef<D> {
67+
if let Some(r#ref) = self.table.get(&value) {
68+
self.vec[r#ref.index as usize].rc += 1;
69+
return *r#ref;
70+
}
71+
let index = self.vec.len() as u32;
72+
self.table.insert(value.clone(), TraceDataRef::new(index));
73+
self.vec.push(StaticDataValue {
74+
value,
75+
rc: 1,
76+
});
77+
TraceDataRef::new(index)
78+
}
79+
80+
pub fn update(&mut self, r#ref: &mut TraceDataRef<D>, value: D::Data::<T>) {
81+
let entry = &mut self.vec[r#ref.index as usize];
82+
if entry.rc == 1 {
83+
self.table.remove(&entry.value);
84+
self.table.insert(value.clone(), *r#ref);
85+
entry.value = value;
86+
} else {
87+
entry.rc -= 1;
88+
*r#ref = self.add(entry.value);
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)