Skip to content

Commit f3080ac

Browse files
authored
Wrap resulting PipelineExpression in ParserResult (open-telemetry#1521)
Superficial change for futureproofing. Doing some initial exploration about tracking `ParserMapSchema` changes during parser executions - if it is possible to return modified output schema to the caller IN ADDITION TO `PipelineExpression`, this abstraction makes it easier to add.
1 parent 7b23c81 commit f3080ac

7 files changed

Lines changed: 72 additions & 50 deletions

File tree

rust/experimental/query_engine/engine-columnar/benches/filter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ fn bench_log_pipeline(
3030
let _ = group.bench_with_input(benchmark_id, &batch_size, |b, batch_size| {
3131
b.iter_custom(|iters| {
3232
let batch = generate_logs_batch(**batch_size);
33-
let query = KqlParser::parse(bench_pipeline_kql).expect("can parse pipeline");
34-
let mut pipeline = Pipeline::new(query);
33+
let parser_result =
34+
KqlParser::parse(bench_pipeline_kql).expect("can parse pipeline");
35+
let mut pipeline = Pipeline::new(parser_result.pipeline);
3536
rt.block_on(async move {
3637
// execute the query once to initiate planning
3738
pipeline.execute(batch.clone()).await.unwrap();

rust/experimental/query_engine/engine-columnar/src/pipeline/filter.rs

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,8 +1177,8 @@ mod test {
11771177

11781178
pub async fn exec_logs_pipeline(kql_expr: &str, logs_data: LogsData) -> LogsData {
11791179
let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data));
1180-
let pipeline_expr = KqlParser::parse(kql_expr).unwrap();
1181-
let mut pipeline = Pipeline::new(pipeline_expr);
1180+
let parser_result = KqlParser::parse(kql_expr).unwrap();
1181+
let mut pipeline = Pipeline::new(parser_result.pipeline);
11821182
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
11831183
otap_to_logs_data(result)
11841184
}
@@ -1200,8 +1200,8 @@ mod test {
12001200
.finish(),
12011201
]);
12021202

1203-
let pipeline_expr = KqlParser::parse("logs | where severity_text == \"ERROR\"").unwrap();
1204-
let mut pipeline = Pipeline::new(pipeline_expr);
1203+
let parser_result = KqlParser::parse("logs | where severity_text == \"ERROR\"").unwrap();
1204+
let mut pipeline = Pipeline::new(parser_result.pipeline);
12051205
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
12061206
let expected = to_otap(vec![
12071207
LogRecord::build()
@@ -1212,8 +1212,8 @@ mod test {
12121212
assert_eq!(result, expected);
12131213

12141214
// test same filter where the literal is on the left and column name on the right
1215-
let pipeline_expr = KqlParser::parse("logs | where \"ERROR\" == severity_text").unwrap();
1216-
let mut pipeline = Pipeline::new(pipeline_expr);
1215+
let parser_result = KqlParser::parse("logs | where \"ERROR\" == severity_text").unwrap();
1216+
let mut pipeline = Pipeline::new(parser_result.pipeline);
12171217
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
12181218
assert_eq!(result, expected);
12191219
}
@@ -1242,8 +1242,8 @@ mod test {
12421242
.finish(),
12431243
];
12441244

1245-
let pipeline_expr = KqlParser::parse("logs | where attributes[\"x\"] == \"b\"").unwrap();
1246-
let mut pipeline = Pipeline::new(pipeline_expr);
1245+
let parser_result = KqlParser::parse("logs | where attributes[\"x\"] == \"b\"").unwrap();
1246+
let mut pipeline = Pipeline::new(parser_result.pipeline);
12471247
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
12481248
let result_otlp = otap_to_logs_data(result);
12491249
pretty_assertions::assert_eq!(
@@ -1252,8 +1252,8 @@ mod test {
12521252
);
12531253

12541254
// test same filter where the literal is on the left and the attribute is on the right
1255-
let pipeline_expr = KqlParser::parse("logs | where \"b\" == attributes[\"x\"]").unwrap();
1256-
let mut pipeline = Pipeline::new(pipeline_expr);
1255+
let parser_result = KqlParser::parse("logs | where \"b\" == attributes[\"x\"]").unwrap();
1256+
let mut pipeline = Pipeline::new(parser_result.pipeline);
12571257
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
12581258
let result_otlp = otap_to_logs_data(result);
12591259
pretty_assertions::assert_eq!(
@@ -1449,10 +1449,10 @@ mod test {
14491449
let otap_batch = to_otap(log_records.clone());
14501450

14511451
// check simple filter "and" properties
1452-
let pipeline_expr =
1452+
let parser_result =
14531453
KqlParser::parse("logs | where severity_text == \"ERROR\" and event_name == \"2\"")
14541454
.unwrap();
1455-
let mut pipeline = Pipeline::new(pipeline_expr);
1455+
let mut pipeline = Pipeline::new(parser_result.pipeline);
14561456
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
14571457
let result_otlp = otap_to_logs_data(result);
14581458
pretty_assertions::assert_eq!(
@@ -1461,11 +1461,11 @@ mod test {
14611461
);
14621462

14631463
// check simple filter "and" with attributes
1464-
let pipeline_expr = KqlParser::parse(
1464+
let parser_result = KqlParser::parse(
14651465
"logs | where severity_text == \"ERROR\" and attributes[\"x\"] == \"c\"",
14661466
)
14671467
.unwrap();
1468-
let mut pipeline = Pipeline::new(pipeline_expr);
1468+
let mut pipeline = Pipeline::new(parser_result.pipeline);
14691469
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
14701470
let result_otlp = otap_to_logs_data(result);
14711471
pretty_assertions::assert_eq!(
@@ -1474,11 +1474,11 @@ mod test {
14741474
);
14751475

14761476
// check simple filter "and" two attributes
1477-
let pipeline_expr = KqlParser::parse(
1477+
let parser_result = KqlParser::parse(
14781478
"logs | where attributes[\"y\"] == \"d\" and attributes[\"x\"] == \"a\"",
14791479
)
14801480
.unwrap();
1481-
let mut pipeline = Pipeline::new(pipeline_expr);
1481+
let mut pipeline = Pipeline::new(parser_result.pipeline);
14821482
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
14831483
let result_otlp = otap_to_logs_data(result);
14841484
pretty_assertions::assert_eq!(
@@ -1518,11 +1518,11 @@ mod test {
15181518
let otap_batch = to_otap(log_records.clone());
15191519

15201520
// check simple filter "or" with properties predicates
1521-
let pipeline_expr = KqlParser::parse(
1521+
let parser_result = KqlParser::parse(
15221522
"logs | where severity_text == \"INFO\" or severity_text == \"ERROR\"",
15231523
)
15241524
.unwrap();
1525-
let mut pipeline = Pipeline::new(pipeline_expr);
1525+
let mut pipeline = Pipeline::new(parser_result.pipeline);
15261526
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
15271527
let result_otlp = otap_to_logs_data(result);
15281528
pretty_assertions::assert_eq!(
@@ -1531,11 +1531,11 @@ mod test {
15311531
);
15321532

15331533
// check simple filter "or" with mixed attributes/properties predicates
1534-
let pipeline_expr = KqlParser::parse(
1534+
let parser_result = KqlParser::parse(
15351535
"logs | where severity_text == \"ERROR\" or attributes[\"x\"] == \"c\"",
15361536
)
15371537
.unwrap();
1538-
let mut pipeline = Pipeline::new(pipeline_expr);
1538+
let mut pipeline = Pipeline::new(parser_result.pipeline);
15391539
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
15401540
let result_otlp = otap_to_logs_data(result);
15411541
pretty_assertions::assert_eq!(
@@ -1544,11 +1544,11 @@ mod test {
15441544
);
15451545

15461546
// check simple filter "or" two attributes predicates
1547-
let pipeline_expr = KqlParser::parse(
1547+
let parser_result = KqlParser::parse(
15481548
"logs | where attributes[\"x\"] == \"a\" or attributes[\"y\"] == \"e\"",
15491549
)
15501550
.unwrap();
1551-
let mut pipeline = Pipeline::new(pipeline_expr);
1551+
let mut pipeline = Pipeline::new(parser_result.pipeline);
15521552
let result = pipeline.execute(otap_batch.clone()).await.unwrap();
15531553
let result_otlp = otap_to_logs_data(result);
15541554
pretty_assertions::assert_eq!(
@@ -1835,8 +1835,8 @@ mod test {
18351835
.finish(),
18361836
];
18371837

1838-
let pipeline_expr = KqlParser::parse("logs | where event_name == \"5\"").unwrap();
1839-
let mut pipeline = Pipeline::new(pipeline_expr);
1838+
let parser_result = KqlParser::parse("logs | where event_name == \"5\"").unwrap();
1839+
let mut pipeline = Pipeline::new(parser_result.pipeline);
18401840
let result = pipeline
18411841
.execute(to_otap(log_records.clone()))
18421842
.await
@@ -1845,8 +1845,8 @@ mod test {
18451845
assert_eq!(result, OtapArrowRecords::Logs(Logs::default()));
18461846

18471847
// assert we have the correct behaviour when filtering by attributes as well
1848-
let pipeline_expr = KqlParser::parse("logs | where attributes[\"a\"] == \"1234\"").unwrap();
1849-
let mut pipeline = Pipeline::new(pipeline_expr);
1848+
let parser_result = KqlParser::parse("logs | where attributes[\"a\"] == \"1234\"").unwrap();
1849+
let mut pipeline = Pipeline::new(parser_result.pipeline);
18501850
let result = pipeline
18511851
.execute(to_otap(log_records.clone()))
18521852
.await
@@ -1857,8 +1857,8 @@ mod test {
18571857
#[tokio::test]
18581858
async fn test_empty_batch() {
18591859
let input = OtapArrowRecords::Logs(Logs::default());
1860-
let pipeline_expr = KqlParser::parse("logs | where event_name == \"5\"").unwrap();
1861-
let mut pipeline = Pipeline::new(pipeline_expr);
1860+
let parser_result = KqlParser::parse("logs | where event_name == \"5\"").unwrap();
1861+
let mut pipeline = Pipeline::new(parser_result.pipeline);
18621862
let result = pipeline.execute(input.clone()).await.unwrap();
18631863
assert_eq!(result, input);
18641864
}
@@ -1881,29 +1881,29 @@ mod test {
18811881
];
18821882

18831883
// check that if there are no attributes to filter by then, we get the empty batch
1884-
let pipeline_expr = KqlParser::parse("logs | where attributes[\"a\"] == \"1234\"").unwrap();
1885-
let mut pipeline = Pipeline::new(pipeline_expr);
1884+
let parser_result = KqlParser::parse("logs | where attributes[\"a\"] == \"1234\"").unwrap();
1885+
let mut pipeline = Pipeline::new(parser_result.pipeline);
18861886
let result = pipeline
18871887
.execute(to_otap(log_records.clone()))
18881888
.await
18891889
.unwrap();
18901890
assert_eq!(result, OtapArrowRecords::Logs(Logs::default()));
18911891

18921892
// check that the same result happens when filtering by resource and scope attrs
1893-
let pipeline_expr =
1893+
let parser_result =
18941894
KqlParser::parse("logs | where resource.attributes[\"a\"] == \"1234\"").unwrap();
1895-
let mut pipeline = Pipeline::new(pipeline_expr);
1895+
let mut pipeline = Pipeline::new(parser_result.pipeline);
18961896
let result = pipeline
18971897
.execute(to_otap(log_records.clone()))
18981898
.await
18991899
.unwrap();
19001900
assert_eq!(result, OtapArrowRecords::Logs(Logs::default()));
19011901

19021902
// check that the same result happens when filtering by resource and scope attrs
1903-
let pipeline_expr =
1903+
let parser_result =
19041904
KqlParser::parse("logs | where instrumentation_scope.attributes[\"a\"] == \"1234\"")
19051905
.unwrap();
1906-
let mut pipeline = Pipeline::new(pipeline_expr);
1906+
let mut pipeline = Pipeline::new(parser_result.pipeline);
19071907
let result = pipeline
19081908
.execute(to_otap(log_records.clone()))
19091909
.await
@@ -1916,8 +1916,8 @@ mod test {
19161916
"logs | where not(resource.attributes[\"a\"] == \"1234\")",
19171917
"logs | where not(instrumentation_scope.attributes[\"a\"] == \"1234\")",
19181918
] {
1919-
let pipeline_expr = KqlParser::parse(inverted_attrs_filter).unwrap();
1920-
let mut pipeline = Pipeline::new(pipeline_expr);
1919+
let parser_result = KqlParser::parse(inverted_attrs_filter).unwrap();
1920+
let mut pipeline = Pipeline::new(parser_result.pipeline);
19211921
let input = to_otap(log_records.clone());
19221922
let result = pipeline.execute(input.clone()).await.unwrap();
19231923
assert_eq!(result, input);
@@ -1930,8 +1930,8 @@ mod test {
19301930
// next, then present in the next, etc.
19311931

19321932
let query = "logs | where attributes[\"a\"] == \"1234\"";
1933-
let pipeline_expr = KqlParser::parse(query).unwrap();
1934-
let mut pipeline = Pipeline::new(pipeline_expr.clone());
1933+
let parser_result = KqlParser::parse(query).unwrap();
1934+
let mut pipeline = Pipeline::new(parser_result.pipeline);
19351935

19361936
// no attrs to start
19371937
let batch1 = to_otap(vec![LogRecord::build().event_name("a").finish()]);
@@ -1987,8 +1987,8 @@ mod test {
19871987

19881988
// assert the behaviour is correct when nothing is filtered out
19891989
let otap_input = to_otap(log_records);
1990-
let pipeline_expr = KqlParser::parse("logs | where severity_text == \"INFO\"").unwrap();
1991-
let mut pipeline = Pipeline::new(pipeline_expr);
1990+
let parser_result = KqlParser::parse("logs | where severity_text == \"INFO\"").unwrap();
1991+
let mut pipeline = Pipeline::new(parser_result.pipeline);
19921992
let result = pipeline.execute(otap_input.clone()).await.unwrap();
19931993

19941994
assert_eq!(result, otap_input)

rust/experimental/query_engine/engine-recordset-otlp-bridge/src/bridge.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ pub fn parse_kql_query_into_pipeline(
1919
query: &str,
2020
options: Option<BridgeOptions>,
2121
) -> Result<PipelineExpression, Vec<ParserError>> {
22-
KqlParser::parse_with_options(query, build_parser_options(options).map_err(|e| vec![e])?)
22+
let result =
23+
KqlParser::parse_with_options(query, build_parser_options(options).map_err(|e| vec![e])?)?;
24+
Ok(result.pipeline)
2325
}
2426

2527
pub fn register_pipeline_for_kql_query(
@@ -28,7 +30,8 @@ pub fn register_pipeline_for_kql_query(
2830
) -> Result<usize, Vec<ParserError>> {
2931
let options = build_parser_options(options).map_err(|e| vec![e])?;
3032

31-
let pipeline = KqlParser::parse_with_options(query, options.clone())?;
33+
let result = KqlParser::parse_with_options(query, options.clone())?;
34+
let pipeline = result.pipeline;
3235

3336
let mut expressions = EXPRESSIONS.write().unwrap();
3437
expressions.push((options, pipeline));
@@ -730,7 +733,8 @@ mod tests {
730733
"source | where gettype(TimeGenerated) == 'datetime'",
731734
options.clone(),
732735
)
733-
.unwrap();
736+
.unwrap()
737+
.pipeline;
734738

735739
let (included_records, dropped_records) =
736740
process_export_logs_service_request_using_pipeline(

rust/experimental/query_engine/kql-parser/src/kql_parser.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use data_engine_expressions::*;
54
use data_engine_parser_abstractions::*;
65
use pest_derive::Parser;
76

@@ -17,8 +16,9 @@ impl Parser for KqlParser {
1716
fn parse_with_options(
1817
query: &str,
1918
options: ParserOptions,
20-
) -> Result<PipelineExpression, Vec<ParserError>> {
21-
parse_query(query, options)
19+
) -> Result<ParserResult, Vec<ParserError>> {
20+
let pipeline = parse_query(query, options)?;
21+
Ok(ParserResult::new(pipeline))
2222
}
2323
}
2424

rust/experimental/query_engine/kql-parser/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ pub use data_engine_parser_abstractions::ParserError;
2828
pub use data_engine_parser_abstractions::ParserMapKeySchema;
2929
pub use data_engine_parser_abstractions::ParserMapSchema;
3030
pub use data_engine_parser_abstractions::ParserOptions;
31+
pub use data_engine_parser_abstractions::ParserResult;

rust/experimental/query_engine/ottl-parser/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ pub use data_engine_parser_abstractions::ParserError;
1616
pub use data_engine_parser_abstractions::ParserMapKeySchema;
1717
pub use data_engine_parser_abstractions::ParserMapSchema;
1818
pub use data_engine_parser_abstractions::ParserOptions;
19+
pub use data_engine_parser_abstractions::ParserResult;

rust/experimental/query_engine/parser-abstractions/src/parser.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use data_engine_expressions::*;
88
use crate::*;
99

1010
pub trait Parser {
11-
fn parse(query: &str) -> Result<PipelineExpression, Vec<ParserError>> {
11+
fn parse(query: &str) -> Result<ParserResult, Vec<ParserError>> {
1212
Self::parse_with_options(query, ParserOptions::new())
1313
}
1414

1515
fn parse_with_options(
1616
query: &str,
1717
options: ParserOptions,
18-
) -> Result<PipelineExpression, Vec<ParserError>>;
18+
) -> Result<ParserResult, Vec<ParserError>>;
1919
}
2020

2121
type ParserFunctionDefinition = (
@@ -318,3 +318,18 @@ impl TryFrom<&str> for ParserMapKeySchema {
318318
}
319319
}
320320
}
321+
322+
/// Result returned by parsers, containing the parsed pipeline expression
323+
/// and any additional metadata that may be useful for consumers.
324+
#[derive(Debug, Clone, PartialEq)]
325+
pub struct ParserResult {
326+
/// The parsed pipeline expression
327+
pub pipeline: PipelineExpression,
328+
}
329+
330+
impl ParserResult {
331+
/// Create a new ParserResult with the given pipeline expression
332+
pub fn new(pipeline: PipelineExpression) -> Self {
333+
Self { pipeline }
334+
}
335+
}

0 commit comments

Comments
 (0)