Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
bf7c0e7
Make payload types consistent with lookup table
JakeDern Feb 6, 2026
6be535a
c
JakeDern Feb 11, 2026
e87efd8
c
JakeDern Feb 11, 2026
8284259
c
JakeDern Feb 11, 2026
84790dc
Test scaffolding
JakeDern Feb 11, 2026
1de57bd
c
JakeDern Feb 11, 2026
58054cd
more test scaffolding
JakeDern Feb 11, 2026
a17ee68
More test scaffolding
JakeDern Feb 11, 2026
fb8b45b
tests
JakeDern Feb 11, 2026
e732898
tests
JakeDern Feb 11, 2026
2f49369
tests
JakeDern Feb 11, 2026
b2fe97d
tests
JakeDern Feb 11, 2026
864970b
tests
JakeDern Feb 11, 2026
8d8f25f
tests
JakeDern Feb 11, 2026
da423a7
comments
JakeDern Feb 11, 2026
7ab6b56
More tests
JakeDern Feb 11, 2026
c54a4d6
Clippy lints
JakeDern Feb 11, 2026
b7508f3
Lints
JakeDern Feb 11, 2026
89ec87b
More tests
JakeDern Feb 11, 2026
0020df3
Handle u16 max cases differently
JakeDern Feb 11, 2026
a4f10ab
refactor
JakeDern Feb 11, 2026
5f63ad7
commonize root payload checking
JakeDern Feb 11, 2026
d810314
tests
JakeDern Feb 11, 2026
23167e3
traces
JakeDern Feb 11, 2026
93c872e
Better dictionary handling
JakeDern Feb 11, 2026
e0129eb
Support dictionary encodings for id columns in otlp models
JakeDern Feb 11, 2026
2ded77a
formatting
JakeDern Feb 11, 2026
21bfde4
metrics
JakeDern Feb 12, 2026
d81214a
metrics
JakeDern Feb 12, 2026
2caf485
fix lints
JakeDern Feb 12, 2026
74b1444
Fix id collection
JakeDern Feb 12, 2026
58d65b3
lints
JakeDern Feb 12, 2026
e641803
Move pretty import to the print loop
JakeDern Feb 12, 2026
fce9b14
better debug output
JakeDern Feb 12, 2026
cb4e63a
Fix reindexing bug when there are referential integrity problems
JakeDern Feb 12, 2026
250de6d
c
JakeDern Feb 12, 2026
22379df
c
JakeDern Feb 12, 2026
0e04d7e
comments
JakeDern Feb 12, 2026
ffd7b59
c
JakeDern Feb 12, 2026
8f04d39
c
JakeDern Feb 12, 2026
bb758b8
c
JakeDern Feb 12, 2026
ef203f2
Add license header
JakeDern Feb 12, 2026
90e1899
Scaffold better payload relation info
JakeDern Feb 12, 2026
b6d86c3
add more info to payload relations
JakeDern Feb 12, 2026
7da252c
Reduce some allocations
JakeDern Feb 12, 2026
5cfa2e5
Comments
JakeDern Feb 12, 2026
13cbee8
don't forget to remove
JakeDern Feb 12, 2026
9fa065d
c
JakeDern Feb 12, 2026
d15a9b1
remove the dictionary encodings for id columns
JakeDern Feb 12, 2026
6bfab3c
Update tests for dict ids
JakeDern Feb 12, 2026
2843a48
Add transport optimized tests
JakeDern Feb 12, 2026
f76a4cf
c
JakeDern Feb 12, 2026
ad5d032
c
JakeDern Feb 12, 2026
3fc0f85
Remove referential integrity violations
JakeDern Feb 12, 2026
ec807f1
refactor primary id length checking
JakeDern Feb 12, 2026
b10440c
comment
JakeDern Feb 12, 2026
0c4b5ff
PR feedback
JakeDern Feb 13, 2026
81f17e1
Merge branch 'main' into jakedern/reindex
albertlockett Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions rust/otap-dataflow/crates/pdata/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,20 @@ pub enum Error {
#[error("Mixed signals")]
MixedSignals,

#[error(
"Too many items. signal: {:?}, size: {}, max: {}, message: {}",
payload_type,
count,
max,
message
)]
TooManyItems {
payload_type: ArrowPayloadType,
count: usize,
max: u64,
message: String,
},

#[error("Encoding error: {}", error)]
Encoding {
#[from]
Expand Down
49 changes: 45 additions & 4 deletions rust/otap-dataflow/crates/pdata/src/otap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,24 @@ impl OtapArrowRecords {
}
}

impl From<Logs> for OtapArrowRecords {
fn from(logs: Logs) -> Self {
Self::Logs(logs)
}
}

impl From<Metrics> for OtapArrowRecords {
fn from(metrics: Metrics) -> Self {
Self::Metrics(metrics)
}
}

impl From<Traces> for OtapArrowRecords {
fn from(traces: Traces) -> Self {
Self::Traces(traces)
}
}

/// The ArrowBatchStore helper trait is used to define a common interface for
/// storing and retrieving Arrow record batches in a type-safe manner. It is
/// implemented by various structs that represent each signal type and provides
Expand Down Expand Up @@ -187,6 +205,12 @@ pub trait OtapBatchStore: Default + Clone {
/// Return a list of the allowed payload types associated with this type of batch
fn allowed_payload_types() -> &'static [ArrowPayloadType];

/// Returns the payload type at the given index in [BatchArray]
#[must_use]
fn payload_type_at_idx(index: usize) -> ArrowPayloadType {
Self::allowed_payload_types()[index]
}

/// Decode the delta-encoded and quasi-delta encoded IDs & parent IDs on each Arrow
/// Record Batch contained in this Otap Batch
fn decode_transport_optimized_ids(otap_batch: &mut OtapArrowRecords) -> Result<()>;
Expand Down Expand Up @@ -340,9 +364,9 @@ impl OtapBatchStore for Logs {

fn allowed_payload_types() -> &'static [ArrowPayloadType] {
&[
ArrowPayloadType::Logs,
ArrowPayloadType::ResourceAttrs,
ArrowPayloadType::ScopeAttrs,
ArrowPayloadType::Logs,
ArrowPayloadType::LogAttrs,
]
}
Expand Down Expand Up @@ -414,6 +438,9 @@ const DATA_POINTS_TYPES: [ArrowPayloadType; 4] = [
];

/// Fetch the number of items as defined by the batching system
/// TODO [JD] we don't need a unifying function like this once we remove
/// all the usages in to groups.rs. Instead we can have each batch store
/// define these.
#[must_use]
fn num_items<const N: usize>(batches: &[Option<RecordBatch>; N]) -> usize {
match N {
Expand Down Expand Up @@ -482,10 +509,9 @@ impl OtapBatchStore for Metrics {

fn allowed_payload_types() -> &'static [ArrowPayloadType] {
&[
ArrowPayloadType::UnivariateMetrics,
ArrowPayloadType::MultivariateMetrics,
ArrowPayloadType::ResourceAttrs,
ArrowPayloadType::ScopeAttrs,
ArrowPayloadType::UnivariateMetrics,
ArrowPayloadType::NumberDataPoints,
ArrowPayloadType::SummaryDataPoints,
ArrowPayloadType::HistogramDataPoints,
Expand All @@ -500,6 +526,7 @@ impl OtapBatchStore for Metrics {
ArrowPayloadType::NumberDpExemplarAttrs,
ArrowPayloadType::HistogramDpExemplarAttrs,
ArrowPayloadType::ExpHistogramDpExemplarAttrs,
ArrowPayloadType::MultivariateMetrics,
ArrowPayloadType::MetricAttrs,
]
}
Expand Down Expand Up @@ -717,9 +744,9 @@ impl OtapBatchStore for Traces {

fn allowed_payload_types() -> &'static [ArrowPayloadType] {
&[
ArrowPayloadType::Spans,
ArrowPayloadType::ResourceAttrs,
ArrowPayloadType::ScopeAttrs,
ArrowPayloadType::Spans,
ArrowPayloadType::SpanAttrs,
ArrowPayloadType::SpanEvents,
ArrowPayloadType::SpanLinks,
Expand Down Expand Up @@ -830,6 +857,7 @@ impl OtapBatchStore for Traces {
}

/// Return the child payload types for the given payload type
/// TODO [JD]: This is pretty much made obsolete by payload_relations
#[must_use]
pub const fn child_payload_types(payload_type: ArrowPayloadType) -> &'static [ArrowPayloadType] {
match payload_type {
Expand Down Expand Up @@ -3374,4 +3402,17 @@ mod test {
}
}
}

#[test]
fn test_idx_to_payload_type() {
test_idx_to_payload_type_impl::<Logs>();
test_idx_to_payload_type_impl::<Metrics>();
test_idx_to_payload_type_impl::<Traces>();
}

fn test_idx_to_payload_type_impl<T: OtapBatchStore>() {
for i in 0..T::allowed_payload_types().len() {
assert_eq!(T::payload_type_at_idx(i), T::allowed_payload_types()[i]);
}
}
}
53 changes: 26 additions & 27 deletions rust/otap-dataflow/crates/pdata/src/otap/batching_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,18 @@ fn generate_metrics_batching_test_cases() -> Vec<MetricsBatchingTestCase> {
}

// All four metric types present - temporarily disabled due to schema compatibility issues
// TODO: Investigate and fix schema merging for mixed metric types
// for limit in [10, 20, 50] {
// add_case(
// &format!("all_types_small_limit_{}", limit),
// MetricsConfig::new()
// .with_gauges(vec![2, 3])
// .with_sums(vec![2])
// .with_histograms(vec![1, 2])
// .with_summaries(vec![1]),
// limit as u64,
// 3,
// );
// }
Comment thread
JakeDern marked this conversation as resolved.
for limit in [10, 20, 50] {
add_case(
&format!("all_types_small_limit_{}", limit),
MetricsConfig::new()
.with_gauges(vec![2, 3])
.with_sums(vec![2])
.with_histograms(vec![1, 2])
.with_summaries(vec![1]),
limit as u64,
3,
);
}

// Mixed sizes: some fit, some oversized
for limit in [10, 15, 20] {
Expand Down Expand Up @@ -143,19 +142,18 @@ fn generate_metrics_batching_test_cases() -> Vec<MetricsBatchingTestCase> {
}

// Complex multi-type scenarios - temporarily disabled
// TODO: Investigate and fix schema merging for mixed metric types
// for limit in [25, 50, 100] {
// add_case(
// &format!("complex_all_types_limit_{}", limit),
// MetricsConfig::new()
// .with_gauges(vec![3, 5, 7, 2])
// .with_sums(vec![4, 6, 3])
// .with_histograms(vec![2, 8])
// .with_summaries(vec![5, 3, 2]),
// limit as u64,
// 6,
// );
// }
for limit in [25, 50, 100] {
add_case(
&format!("complex_all_types_limit_{}", limit),
MetricsConfig::new()
.with_gauges(vec![3, 5, 7, 2])
.with_sums(vec![4, 6, 3])
.with_histograms(vec![2, 8])
.with_summaries(vec![5, 3, 2]),
limit as u64,
6,
);
}

// Stress test: many metrics of varying sizes - single type only
for limit in [50, 100, 200] {
Expand Down Expand Up @@ -220,7 +218,8 @@ fn generate_metrics_batching_test_cases() -> Vec<MetricsBatchingTestCase> {
}

// Test with varying attributes enabled - temporarily disabled
// TODO: Investigate parent_id column type issue with varying attributes
// TODO: Investigate parent_id column type issue with varying attributes,
// there is a problem with splitting
// for limit in [20, 50] {
// add_case(
// &format!("with_attrs_gauges_limit_{}", limit),
Expand Down
Loading
Loading