Skip to content

Commit 75cc57b

Browse files
columnar query engine support basic filtering of spans and metrics (open-telemetry#1560)
part of open-telemetry#1508 Adds support to the columnar query engine to filter metrics and traces. The thing that needed to be implemented to support this was just to traverse down the tree of payload types and remove rows whose parent was filtered out. Because attributes w/ 32 bit parent IDs can have this column dictionary encoded, I decided to expose the arrays::MaybeDictionaryAccessor from the OTAP Dataflow Pdata crate. I wasn't ready to make everything in this module public, so many methods and types are changed to `pub(crate)` in this module. Note - only basic filtering support is added currently. There's no capability to say, filter metrics by datapoint attributes, filter datapoint attributes themselves, and likewise no ability to filter span's spanevents/span links. All this can be added in future
1 parent 539eefe commit 75cc57b

7 files changed

Lines changed: 1033 additions & 140 deletions

File tree

rust/experimental/query_engine/engine-columnar/src/pipeline.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,12 @@ mod test {
257257
use datafusion::logical_expr::{col, lit};
258258
use otap_df_pdata::proto::OtlpProtoMessage;
259259
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
260+
use otap_df_pdata::proto::opentelemetry::metrics::v1::{
261+
Metric, MetricsData, ResourceMetrics, ScopeMetrics,
262+
};
263+
use otap_df_pdata::proto::opentelemetry::trace::v1::{
264+
ResourceSpans, ScopeSpans, Span, TracesData,
265+
};
260266

261267
use otap_df_pdata::proto::opentelemetry::logs::v1::{
262268
LogRecord, LogsData, ResourceLogs, ScopeLogs,
@@ -278,19 +284,55 @@ mod test {
278284
}
279285
}
280286

287+
/// helper function for converting [`Metric`]s to [`MetricsData`]
288+
pub fn to_metrics_data(metrics: Vec<Metric>) -> MetricsData {
289+
MetricsData {
290+
resource_metrics: vec![ResourceMetrics {
291+
scope_metrics: vec![ScopeMetrics {
292+
metrics,
293+
..Default::default()
294+
}],
295+
..Default::default()
296+
}],
297+
}
298+
}
299+
300+
/// helper function for converting [`Span`]s to [`TracesData`]
301+
pub fn to_traces_data(spans: Vec<Span>) -> TracesData {
302+
TracesData {
303+
resource_spans: vec![ResourceSpans {
304+
scope_spans: vec![ScopeSpans {
305+
spans,
306+
..Default::default()
307+
}],
308+
..Default::default()
309+
}],
310+
}
311+
}
312+
281313
/// helper function for converting OTLP logs to OTAP batch
282-
pub fn to_otap(log_records: Vec<LogRecord>) -> OtapArrowRecords {
314+
pub fn to_otap_logs(log_records: Vec<LogRecord>) -> OtapArrowRecords {
283315
otlp_to_otap(&OtlpProtoMessage::Logs(to_logs_data(log_records)))
284316
}
285317

318+
/// helper function for converting OTLP spans to OTAP batch
319+
pub fn to_otap_traces(spans: Vec<Span>) -> OtapArrowRecords {
320+
otlp_to_otap(&OtlpProtoMessage::Traces(to_traces_data(spans)))
321+
}
322+
323+
/// helper function for converting OTLP metrics to OTAP batch
324+
pub fn to_otap_metrics(metrics: Vec<Metric>) -> OtapArrowRecords {
325+
otlp_to_otap(&OtlpProtoMessage::Metrics(to_metrics_data(metrics)))
326+
}
327+
286328
#[tokio::test]
287329
async fn test_pipeline_execute_multi_batch() {
288330
// TODO eventually we might want to drive this test from a pipeline expression, which we
289331
// can do once we have the query planning implemented. For now, we are manually creating
290332
// the `PlannedPipeline` and its `PipelineStage`s and any additional datafusion context
291333
// they need
292334

293-
let otap_batch1 = to_otap(vec![
335+
let otap_batch1 = to_otap_logs(vec![
294336
LogRecord {
295337
severity_text: "ERROR".into(),
296338
event_name: "1".into(),
@@ -308,7 +350,7 @@ mod test {
308350
},
309351
]);
310352

311-
let otap_batch2 = to_otap(vec![
353+
let otap_batch2 = to_otap_logs(vec![
312354
LogRecord {
313355
severity_text: "DEBUG".into(),
314356
event_name: "4".into(),

0 commit comments

Comments
 (0)