Skip to content

Commit 89d92af

Browse files
authored
[otap-dataflow] fix exclude filter returning empty batch open-telemetry#1483 (open-telemetry#1504)
Fixes open-telemetry#1483 Adds check for fields where an empty vec provided so nothing should be excluded
1 parent be7c211 commit 89d92af

4 files changed

Lines changed: 167 additions & 23 deletions

File tree

rust/otap-dataflow/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ quiver = { package = "otap-df-quiver", path = "crates/quiver" }
4949

5050
ahash = "0.8.11"
5151
arrayvec = "0.7.6"
52-
arrow = "57.0"
52+
arrow = { version = "57.0", features=["prettyprint"] }
5353
arrow-ipc = { version = "57.0", features=["zstd"] }
5454
arrow-schema = { version = "57.0" }
5555
arrow-array = { version = "57.0" }

rust/otap-dataflow/crates/otap/src/filter_processor.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,6 @@ impl local::Processor<OtapPdata> for FilterProcessor {
181181
mod tests {
182182
use crate::filter_processor::{FILTER_PROCESSOR_URN, FilterProcessor, config::Config};
183183
use crate::pdata::OtapPdata;
184-
use bytes::BytesMut;
185184
use otap_df_config::node::NodeUserConfig;
186185
use otap_df_engine::context::ControllerContext;
187186
use otap_df_engine::message::Message;
@@ -623,12 +622,11 @@ mod tests {
623622
move |mut ctx| {
624623
Box::pin(async move {
625624
//convert logsdata to otappdata
626-
let mut bytes = BytesMut::new();
625+
let mut bytes = vec![];
627626
sent.encode(&mut bytes)
628627
.expect("failed to encode log data into bytes");
629-
let bytes = bytes.freeze();
630628
let otlp_logs_bytes =
631-
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes).into());
629+
OtapPdata::new_default(OtlpProtoBytes::ExportLogsRequest(bytes.into()).into());
632630
ctx.process(Message::PData(otlp_logs_bytes))
633631
.await
634632
.expect("failed to process");
@@ -658,13 +656,13 @@ mod tests {
658656
Box::pin(async move {
659657
//convert tracesdata to otappdata
660658
let traces_data = build_traces();
661-
let mut bytes = BytesMut::new();
659+
let mut bytes = vec![];
662660
traces_data
663661
.encode(&mut bytes)
664662
.expect("failed to encode trace data into bytes");
665-
let bytes = bytes.freeze();
666-
let otlp_traces_bytes =
667-
OtapPdata::new_default(OtlpProtoBytes::ExportTracesRequest(bytes).into());
663+
let otlp_traces_bytes = OtapPdata::new_default(
664+
OtlpProtoBytes::ExportTracesRequest(bytes.into()).into(),
665+
);
668666
ctx.process(Message::PData(otlp_traces_bytes))
669667
.await
670668
.expect("failed to process");

rust/otap-dataflow/crates/pdata/src/otap/filter/logs.rs

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -338,12 +338,30 @@ impl LogMatchProperties {
338338

339339
// invert flag depending on whether we are excluding or including
340340
if invert {
341-
resource_attr_filter =
342-
arrow::compute::not(&resource_attr_filter).expect("not doesn't fail");
341+
// default filter is all true
343342

344-
log_record_filter = arrow::compute::not(&log_record_filter).expect("not doesn't fail");
343+
// if no resource_attributes to filter on are defined then we can ignore them
344+
// that is we will resort to the default filter otherwise we can invert if the flag is set
345+
if !self.resource_attributes.is_empty() {
346+
resource_attr_filter =
347+
arrow::compute::not(&resource_attr_filter).expect("not doesn't fail");
348+
}
349+
350+
// if no log records fields to filter on are defined then we can ignore them
351+
// that is we will resort to the default filter otherwise we can invert if the flag is set
352+
if !self.bodies.is_empty()
353+
|| !self.severity_texts.is_empty()
354+
|| self.severity_number.is_some()
355+
{
356+
log_record_filter =
357+
arrow::compute::not(&log_record_filter).expect("not doesn't fail");
358+
}
345359

346-
log_attr_filter = arrow::compute::not(&log_attr_filter).expect("not doesn't fail");
360+
// if no record_attributes to filter on are defined then we can ignore them
361+
// that is we will resort to the default filter otherwise we can invert if the flag is set
362+
if !self.record_attributes.is_empty() {
363+
log_attr_filter = arrow::compute::not(&log_attr_filter).expect("not doesn't fail");
364+
}
347365
}
348366

349367
Ok((resource_attr_filter, log_record_filter, log_attr_filter))
@@ -583,4 +601,54 @@ mod test {
583601

584602
assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]);
585603
}
604+
605+
#[test]
606+
fn test_filter_exclude_no_attributes() {
607+
// Filter only for WARN logs
608+
let exclude = LogMatchProperties::new(
609+
MatchType::Strict,
610+
Vec::new(),
611+
Vec::new(),
612+
vec!["WARN".into()],
613+
None,
614+
Vec::new(),
615+
);
616+
617+
let filter = LogFilter::new(None, Some(exclude), Vec::new());
618+
619+
let log_records = vec![
620+
LogRecord::build().severity_text("WARN").finish(),
621+
LogRecord::build().severity_text("WARN").finish(),
622+
LogRecord::build().severity_text("INFO").finish(),
623+
LogRecord::build().severity_text("INFO").finish(),
624+
];
625+
626+
let logs_data = LogsData {
627+
resource_logs: vec![ResourceLogs {
628+
scope_logs: vec![ScopeLogs {
629+
log_records: log_records.clone(),
630+
..Default::default()
631+
}],
632+
..Default::default()
633+
}],
634+
};
635+
636+
let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data));
637+
638+
let (result, logs_consumed, logs_filtered) = filter.filter(input).unwrap();
639+
assert_eq!(logs_consumed, 4);
640+
assert_eq!(logs_filtered, 2);
641+
642+
let expected = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData {
643+
resource_logs: vec![ResourceLogs {
644+
scope_logs: vec![ScopeLogs {
645+
log_records: log_records[2..4].to_vec(),
646+
..Default::default()
647+
}],
648+
..Default::default()
649+
}],
650+
}));
651+
652+
assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]);
653+
}
586654
}

rust/otap-dataflow/crates/pdata/src/otap/filter/traces.rs

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -562,20 +562,48 @@ impl TraceMatchProperties {
562562

563563
// invert flag depending on whether we are excluding or including
564564
if invert {
565-
resource_attr_filter =
566-
arrow::compute::not(&resource_attr_filter).expect("not doesn't fail");
565+
// default filter is all true
567566

568-
span_filter = arrow::compute::not(&span_filter).expect("not doesn't fail");
567+
// if no resource_attributes to filter on are defined then we can ignore them
568+
// that is we will resort to the default filter otherwise we can invert if the flag is set
569+
if !self.resource_attributes.is_empty() {
570+
resource_attr_filter =
571+
arrow::compute::not(&resource_attr_filter).expect("not doesn't fail");
572+
}
569573

570-
span_attr_filter = arrow::compute::not(&span_attr_filter).expect("not doesn't fail");
574+
// if no span_names to filter on are defined then we can ignore them
575+
// that is we will resort to the default filter otherwise we can invert if the flag is set
576+
if !self.span_names.is_empty() {
577+
span_filter = arrow::compute::not(&span_filter).expect("not doesn't fail");
578+
}
571579

572-
span_event_filter = arrow::compute::not(&span_event_filter).expect("not doesn't fail");
580+
// if no span_attributes to filter on are defined then we can ignore them
581+
// that is we will resort to the default filter otherwise we can invert if the flag is set
582+
if !self.span_attributes.is_empty() {
583+
span_attr_filter =
584+
arrow::compute::not(&span_attr_filter).expect("not doesn't fail");
585+
}
573586

574-
span_event_attr_filter =
575-
arrow::compute::not(&span_event_attr_filter).expect("not doesn't fail");
587+
// if no event_names to filter on are defined then we can ignore them
588+
// that is we will resort to the default filter otherwise we can invert if the flag is set
589+
if !self.event_names.is_empty() {
590+
span_event_filter =
591+
arrow::compute::not(&span_event_filter).expect("not doesn't fail");
592+
}
576593

577-
span_link_attr_filter =
578-
arrow::compute::not(&span_link_attr_filter).expect("not doesn't fail");
594+
// if no event_attributes to filter on are defined then we can ignore them
595+
// that is we will resort to the default filter otherwise we can invert if the flag is set
596+
if !self.event_attributes.is_empty() {
597+
span_event_attr_filter =
598+
arrow::compute::not(&span_event_attr_filter).expect("not doesn't fail");
599+
}
600+
601+
// if no link_attributes to filter on are defined then we can ignore them
602+
// that is we will resort to the default filter otherwise we can invert if the flag is set
603+
if !self.link_attributes.is_empty() {
604+
span_link_attr_filter =
605+
arrow::compute::not(&span_link_attr_filter).expect("not doesn't fail");
606+
}
579607
}
580608

581609
Ok((
@@ -689,7 +717,7 @@ mod test {
689717
use crate::testing::round_trip::{otap_to_otlp, otlp_to_otap};
690718

691719
#[test]
692-
fn test_filter_no_attributes() {
720+
fn test_filter_include_no_attributes() {
693721
let include = TraceMatchProperties::new(
694722
MatchType::Strict,
695723
Vec::new(),
@@ -737,4 +765,54 @@ mod test {
737765

738766
assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]);
739767
}
768+
769+
#[test]
770+
fn test_filter_exclude_no_attributes() {
771+
let exclude = TraceMatchProperties::new(
772+
MatchType::Strict,
773+
Vec::new(),
774+
Vec::new(),
775+
vec!["span_name_1".into()],
776+
Vec::new(),
777+
Vec::new(),
778+
Vec::new(),
779+
);
780+
781+
let filter = TraceFilter::new(None, Some(exclude));
782+
783+
let spans = vec![
784+
Span::build().name("span_name_1").finish(),
785+
Span::build().name("span_name_1").finish(),
786+
Span::build().name("span_name_2").finish(),
787+
Span::build().name("span_name_2").finish(),
788+
];
789+
790+
let traces_data = TracesData {
791+
resource_spans: vec![ResourceSpans {
792+
scope_spans: vec![ScopeSpans {
793+
spans: spans.clone(),
794+
..Default::default()
795+
}],
796+
..Default::default()
797+
}],
798+
};
799+
800+
let input = otlp_to_otap(&OtlpProtoMessage::Traces(traces_data));
801+
let (result, spans_consumed, spans_filtered) = filter.filter(input).unwrap();
802+
803+
assert_eq!(spans_consumed, 4);
804+
assert_eq!(spans_filtered, 2);
805+
806+
let expected = otlp_to_otap(&OtlpProtoMessage::Traces(TracesData {
807+
resource_spans: vec![ResourceSpans {
808+
scope_spans: vec![ScopeSpans {
809+
spans: spans[2..4].to_vec(),
810+
..Default::default()
811+
}],
812+
..Default::default()
813+
}],
814+
}));
815+
816+
assert_equivalent(&[otap_to_otlp(&result)], &[otap_to_otlp(&expected)]);
817+
}
740818
}

0 commit comments

Comments
 (0)