Skip to content

Commit b5f0814

Browse files
OPL/Columnar Query Engine Support some TextExpression variants (open-telemetry#2586)
# Change Summary <!-- Replace with a brief summary of the change in this PR --> OPL / Columnar Query Engine support the `Concat`, `Join` and `Replace` variants of the [`TextExpression`](https://github.com/open-telemetry/otel-arrow/blob/72fba8d2a94cd5e20403875e9214756a86cd405f/rust/experimental/query_engine/expressions/src/scalars/text_scalar_expression.rs#L7-L23). In all these cases, we parse from specially named functions: ```js logs | set attributes["x"] = concat("the", " attribute value ", "is: ", attributes["x"]) logs | set event_name = join(" ", "event happened:", event_name) logs | set event_name = replace(event_name, "otel", "otap") ``` In each of these cases, we use the equivalent datafusion scalar function `concat`, `concat_ws` (for join) and `replace`. Note: `concat_ws` is also used as an alias for `join`. I was thinking this'd be helpful for folks coming from a datafusion/SQL background. So it's equally possibly to write an expression like: ```js logs | set event_name = concat_ws(" ", "event happened:", event_name) ``` In `planner.rs`, I refactored the planning of function arguments into a reusable helper function. ## What issue does this PR close? <!-- We highly recommend correlation of every PR to an issue --> * Related to open-telemetry#2578 ## How are these changes tested? Unit ## Are there any user-facing changes? <!-- If yes, provide further info below --> These new expression types are now supported via the transform processor. ## Future work Will add support for the `TextExpression::Capture` variant of this expression in future PR.
1 parent 5c4f5f2 commit b5f0814

4 files changed

Lines changed: 769 additions & 67 deletions

File tree

rust/otap-dataflow/crates/opl/src/opl.pest

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ attribute_selection_expression = {
5555
identifier_expression ~ ("." ~ member_expression)+
5656
}
5757

58-
argument_list = { expression ~ ("," ~ expression)* }
58+
argument_list = { expression? ~ ("," ~ expression)* }
5959

6060
function_call = { identifier_expression ~ "(" ~ argument_list ~ ")" }
6161

rust/otap-dataflow/crates/opl/src/parser/expression.rs

Lines changed: 245 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@ use std::sync::LazyLock;
55

66
use data_engine_expressions::{
77
AndLogicalExpression, BinaryMathematicalScalarExpression, BooleanScalarExpression,
8-
ContainsLogicalExpression, DoubleScalarExpression, DoubleValue, EqualToLogicalExpression,
9-
Expression, GreaterThanLogicalExpression, GreaterThanOrEqualToLogicalExpression,
10-
IntegerScalarExpression, IntegerValue, InvokeFunctionArgument, InvokeFunctionScalarExpression,
11-
LogicalExpression, MatchesLogicalExpression, MathScalarExpression, NotLogicalExpression,
12-
NullScalarExpression, OrLogicalExpression, QueryLocation, ScalarExpression,
13-
SliceScalarExpression, SourceScalarExpression, StaticScalarExpression, StringScalarExpression,
14-
ValueAccessor,
8+
CollectionScalarExpression, CombineScalarExpression, ContainsLogicalExpression,
9+
DoubleScalarExpression, DoubleValue, EqualToLogicalExpression, Expression,
10+
GreaterThanLogicalExpression, GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression,
11+
IntegerValue, InvokeFunctionArgument, InvokeFunctionScalarExpression, JoinTextScalarExpression,
12+
ListScalarExpression, LogicalExpression, MatchesLogicalExpression, MathScalarExpression,
13+
NotLogicalExpression, NullScalarExpression, OrLogicalExpression, QueryLocation,
14+
ReplaceTextScalarExpression, ScalarExpression, SliceScalarExpression, SourceScalarExpression,
15+
StaticScalarExpression, StringScalarExpression, TextScalarExpression, ValueAccessor,
1516
};
1617
use data_engine_parser_abstractions::{
1718
ParserError, parse_standard_double_literal, parse_standard_integer_literal,
@@ -749,6 +750,37 @@ fn parse_function_call(
749750
))
750751
.into())
751752
}
753+
"concat" => Ok(ScalarExpression::Text(TextScalarExpression::Concat(
754+
CombineScalarExpression::new(
755+
query_location.clone(),
756+
ScalarExpression::Collection(CollectionScalarExpression::List(
757+
ListScalarExpression::new(query_location, args),
758+
)),
759+
),
760+
))
761+
.into()),
762+
"join" | "concat_ws" => {
763+
if args.is_empty() {
764+
return Err(ParserError::SyntaxError(
765+
query_location,
766+
format!(
767+
"Function '{fn_name}' expects at least 1 argument, got {}",
768+
args.len()
769+
),
770+
));
771+
}
772+
let delimiter = args.remove(0);
773+
Ok(
774+
ScalarExpression::Text(TextScalarExpression::Join(JoinTextScalarExpression::new(
775+
query_location.clone(),
776+
delimiter,
777+
ScalarExpression::Collection(CollectionScalarExpression::List(
778+
ListScalarExpression::new(query_location, args),
779+
)),
780+
)))
781+
.into(),
782+
)
783+
}
752784
"matches" => {
753785
if args.len() != 2 {
754786
return Err(ParserError::SyntaxError(
@@ -768,6 +800,31 @@ fn parse_function_call(
768800
))
769801
.into())
770802
}
803+
"replace" => {
804+
if args.len() != 3 {
805+
return Err(ParserError::SyntaxError(
806+
query_location,
807+
format!(
808+
"Function '{fn_name}' expects 3 arguments, got {}",
809+
args.len()
810+
),
811+
));
812+
}
813+
814+
let source = args.remove(0);
815+
let substr = args.remove(0);
816+
let replacement = args.remove(0);
817+
Ok(ScalarExpression::Text(TextScalarExpression::Replace(
818+
ReplaceTextScalarExpression::new(
819+
query_location,
820+
source,
821+
substr,
822+
replacement,
823+
false, // case_insensitive = set to false for OPL
824+
),
825+
))
826+
.into())
827+
}
771828
"substring" => {
772829
if args.len() < 2 || args.len() > 3 {
773830
return Err(ParserError::SyntaxError(
@@ -832,15 +889,17 @@ mod test {
832889

833890
use data_engine_expressions::{
834891
AndLogicalExpression, BinaryMathematicalScalarExpression, BooleanScalarExpression,
835-
ContainsLogicalExpression, DateTimeScalarExpression, DoubleScalarExpression,
836-
EqualToLogicalExpression, GreaterThanLogicalExpression,
837-
GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression, LogicalExpression,
892+
CollectionScalarExpression, CombineScalarExpression, ContainsLogicalExpression,
893+
DateTimeScalarExpression, DoubleScalarExpression, EqualToLogicalExpression,
894+
GreaterThanLogicalExpression, GreaterThanOrEqualToLogicalExpression,
895+
IntegerScalarExpression, JoinTextScalarExpression, ListScalarExpression, LogicalExpression,
838896
MatchesLogicalExpression, MathScalarExpression, NotLogicalExpression, NullScalarExpression,
839897
OrLogicalExpression, PipelineFunction, PipelineFunctionParameter,
840-
PipelineFunctionParameterType, QueryLocation, ScalarExpression, SourceScalarExpression,
841-
StaticScalarExpression, StringScalarExpression, ValueAccessor,
898+
PipelineFunctionParameterType, QueryLocation, ReplaceTextScalarExpression,
899+
ScalarExpression, SourceScalarExpression, StaticScalarExpression, StringScalarExpression,
900+
TextScalarExpression, ValueAccessor,
842901
};
843-
use data_engine_parser_abstractions::{ParserFunction, ParserState};
902+
use data_engine_parser_abstractions::{ParserError, ParserFunction, ParserState};
844903
use pest::Parser;
845904
use pretty_assertions::assert_eq;
846905

@@ -1904,4 +1963,177 @@ mod test {
19041963
.contains("Function 'myfunc' expects 2 arguments, got 1")
19051964
)
19061965
}
1966+
1967+
#[test]
1968+
fn test_parse_concat_function_call() {
1969+
let input = "concat(\"event happened: \", event_name)";
1970+
let mut rules = OplPestParser::parse(Rule::member_expression, input).unwrap();
1971+
assert_eq!(rules.len(), 1);
1972+
1973+
let result: ScalarExpression =
1974+
parse_member_expression(rules.next().unwrap(), default_pipeline_builder().as_ref())
1975+
.unwrap()
1976+
.into();
1977+
1978+
let expected =
1979+
ScalarExpression::Text(TextScalarExpression::Concat(CombineScalarExpression::new(
1980+
QueryLocation::new_fake(),
1981+
ScalarExpression::Collection(CollectionScalarExpression::List(
1982+
ListScalarExpression::new(
1983+
QueryLocation::new_fake(),
1984+
vec![
1985+
ScalarExpression::Static(StaticScalarExpression::String(
1986+
StringScalarExpression::new(
1987+
QueryLocation::new_fake(),
1988+
"event happened: ",
1989+
),
1990+
)),
1991+
ScalarExpression::Source(SourceScalarExpression::new(
1992+
QueryLocation::new_fake(),
1993+
ValueAccessor::new_with_selectors(vec![ScalarExpression::Static(
1994+
StaticScalarExpression::String(StringScalarExpression::new(
1995+
QueryLocation::new_fake(),
1996+
"event_name",
1997+
)),
1998+
)]),
1999+
)),
2000+
],
2001+
),
2002+
)),
2003+
)));
2004+
2005+
assert_eq!(result, expected);
2006+
}
2007+
2008+
#[test]
2009+
fn test_parse_concat_with_delimiter_function_call() {
2010+
// "join" is alias for "concat_ws"
2011+
for fn_name in ["concat_ws", "join"] {
2012+
let input = format!("{fn_name}(\" \", severity_text, \"event happened:\", event_name)");
2013+
let mut rules = OplPestParser::parse(Rule::member_expression, &input).unwrap();
2014+
assert_eq!(rules.len(), 1);
2015+
2016+
let result: ScalarExpression =
2017+
parse_member_expression(rules.next().unwrap(), default_pipeline_builder().as_ref())
2018+
.unwrap()
2019+
.into();
2020+
2021+
let expected =
2022+
ScalarExpression::Text(TextScalarExpression::Join(JoinTextScalarExpression::new(
2023+
QueryLocation::new_fake(),
2024+
ScalarExpression::Static(StaticScalarExpression::String(
2025+
StringScalarExpression::new(QueryLocation::new_fake(), " "),
2026+
)),
2027+
ScalarExpression::Collection(CollectionScalarExpression::List(
2028+
ListScalarExpression::new(
2029+
QueryLocation::new_fake(),
2030+
vec![
2031+
ScalarExpression::Source(SourceScalarExpression::new(
2032+
QueryLocation::new_fake(),
2033+
ValueAccessor::new_with_selectors(vec![
2034+
ScalarExpression::Static(StaticScalarExpression::String(
2035+
StringScalarExpression::new(
2036+
QueryLocation::new_fake(),
2037+
"severity_text",
2038+
),
2039+
)),
2040+
]),
2041+
)),
2042+
ScalarExpression::Static(StaticScalarExpression::String(
2043+
StringScalarExpression::new(
2044+
QueryLocation::new_fake(),
2045+
"event happened:",
2046+
),
2047+
)),
2048+
ScalarExpression::Source(SourceScalarExpression::new(
2049+
QueryLocation::new_fake(),
2050+
ValueAccessor::new_with_selectors(vec![
2051+
ScalarExpression::Static(StaticScalarExpression::String(
2052+
StringScalarExpression::new(
2053+
QueryLocation::new_fake(),
2054+
"event_name",
2055+
),
2056+
)),
2057+
]),
2058+
)),
2059+
],
2060+
),
2061+
)),
2062+
)));
2063+
2064+
assert_eq!(result, expected);
2065+
}
2066+
}
2067+
2068+
#[test]
2069+
fn test_parse_replace_with_delimiter_function_call() {
2070+
let input = "replace(severity_text, \"N\", \"M\")";
2071+
let mut rules = OplPestParser::parse(Rule::member_expression, input).unwrap();
2072+
assert_eq!(rules.len(), 1);
2073+
2074+
let result: ScalarExpression =
2075+
parse_member_expression(rules.next().unwrap(), default_pipeline_builder().as_ref())
2076+
.unwrap()
2077+
.into();
2078+
2079+
let expected = ScalarExpression::Text(TextScalarExpression::Replace(
2080+
ReplaceTextScalarExpression::new(
2081+
QueryLocation::new_fake(),
2082+
ScalarExpression::Source(SourceScalarExpression::new(
2083+
QueryLocation::new_fake(),
2084+
ValueAccessor::new_with_selectors(vec![ScalarExpression::Static(
2085+
StaticScalarExpression::String(StringScalarExpression::new(
2086+
QueryLocation::new_fake(),
2087+
"severity_text",
2088+
)),
2089+
)]),
2090+
)),
2091+
ScalarExpression::Static(StaticScalarExpression::String(
2092+
StringScalarExpression::new(QueryLocation::new_fake(), "N"),
2093+
)),
2094+
ScalarExpression::Static(StaticScalarExpression::String(
2095+
StringScalarExpression::new(QueryLocation::new_fake(), "M"),
2096+
)),
2097+
false,
2098+
),
2099+
));
2100+
2101+
assert_eq!(result, expected);
2102+
}
2103+
2104+
fn parse_known_func_with_args(
2105+
fn_name: &str,
2106+
args: &[&str],
2107+
) -> Result<LogicalOrScalarExpr, ParserError> {
2108+
let input = format!("{}({})", fn_name, args.join(", "));
2109+
let mut parser_state = ParserState::new("");
2110+
let pipeline_builder = RootPipelineBuilder::new(&mut parser_state);
2111+
let mut rules = OplPestParser::parse(Rule::member_expression, &input).unwrap();
2112+
parse_member_expression(rules.next().unwrap(), &pipeline_builder)
2113+
}
2114+
2115+
#[test]
2116+
fn parse_replace_function_call_with_wrong_arity() {
2117+
for args in [
2118+
vec![],
2119+
vec!["one"],
2120+
vec!["one", "two"],
2121+
vec!["one", "two", "three", "four"],
2122+
] {
2123+
let err = parse_known_func_with_args("replace", &args).unwrap_err();
2124+
assert_eq!(
2125+
err.to_string(),
2126+
format!("Function 'replace' expects 3 arguments, got {}", args.len())
2127+
)
2128+
}
2129+
}
2130+
2131+
#[test]
2132+
fn parse_contains_ws_function_call_with_wrong_arity() {
2133+
let err = parse_known_func_with_args("concat_ws", &[]).unwrap_err();
2134+
assert_eq!(
2135+
err.to_string(),
2136+
"Function 'concat_ws' expects at least 1 argument, got 0".to_string(),
2137+
);
2138+
}
19072139
}

0 commit comments

Comments
 (0)