Skip to content

Commit 1f1c6bc

Browse files
committed
parse to new 'GetRecordType' ScalarExpression
1 parent 5ad6031 commit 1f1c6bc

5 files changed

Lines changed: 70 additions & 46 deletions

File tree

rust/experimental/query_engine/engine-recordset/src/scalars/scalar_expressions.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,12 @@ where
326326
ScalarExpression::Math(m) => {
327327
return execute_math_scalar_expression(execution_context, m);
328328
}
329+
ScalarExpression::GetRecordType(r) => {
330+
return Err(ExpressionError::NotSupported(
331+
r.get_query_location().clone(),
332+
format!("{} not yet supported in RecordSet engine", r.get_name()),
333+
));
334+
}
329335
ScalarExpression::GetType(g) => {
330336
let value_type =
331337
execute_scalar_expression(execution_context, g.get_value())?.get_value_type();

rust/experimental/query_engine/expressions/src/scalars/scalar_expressions.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ pub enum ScalarExpression {
4343
/// Returns the type string for the inner expression.
4444
GetType(GetTypeScalarExpression),
4545

46+
/// Returns the type string for the record.
47+
GetRecordType(GetRecordTypeScalarExpression),
48+
4649
/// Invoke a user-defined function.
4750
InvokeFunction(InvokeFunctionScalarExpression),
4851

@@ -101,6 +104,7 @@ impl ScalarExpression {
101104
ScalarExpression::Constant(c) => c.try_resolve_value_type(scope),
102105
ScalarExpression::Collection(c) => c.try_resolve_value_type(scope),
103106
ScalarExpression::GetType(_) => Ok(Some(ValueType::String)),
107+
ScalarExpression::GetRecordType(_) => Ok(Some(ValueType::String)),
104108
ScalarExpression::Logical(_) => Ok(Some(ValueType::Boolean)),
105109
ScalarExpression::Coalesce(c) => c.try_resolve_value_type(scope),
106110
ScalarExpression::Conditional(c) => c.try_resolve_value_type(scope),
@@ -205,6 +209,7 @@ impl ScalarExpression {
205209
ScalarExpression::Case(c) => c.try_resolve_static(scope),
206210
ScalarExpression::Convert(c) => c.try_resolve_static(scope),
207211
ScalarExpression::GetType(g) => g.try_resolve_static(scope),
212+
ScalarExpression::GetRecordType(g) => g.try_resolve_static(scope),
208213
ScalarExpression::Length(l) => l.try_resolve_static(scope),
209214
ScalarExpression::Slice(s) => s.try_resolve_static(scope),
210215
ScalarExpression::Parse(p) => p.try_resolve_static(scope),
@@ -231,6 +236,7 @@ impl Expression for ScalarExpression {
231236
ScalarExpression::Constant(c) => c.get_query_location(),
232237
ScalarExpression::Collection(c) => c.get_query_location(),
233238
ScalarExpression::GetType(g) => g.get_query_location(),
239+
ScalarExpression::GetRecordType(g) => g.get_query_location(),
234240
ScalarExpression::Logical(l) => l.get_query_location(),
235241
ScalarExpression::Coalesce(c) => c.get_query_location(),
236242
ScalarExpression::Conditional(c) => c.get_query_location(),
@@ -256,6 +262,7 @@ impl Expression for ScalarExpression {
256262
ScalarExpression::Static(s) => s.get_name(),
257263
ScalarExpression::Collection(_) => "ScalarExpression(Collection)",
258264
ScalarExpression::GetType(_) => "ScalarExpression(GetType)",
265+
ScalarExpression::GetRecordType(_) => "ScalarExpression(GetRecordType)",
259266
ScalarExpression::Logical(_) => "ScalarExpression(Logical)",
260267
ScalarExpression::Coalesce(_) => "ScalarExpression(Coalesce)",
261268
ScalarExpression::Conditional(_) => "ScalarExpression(Conditional)",
@@ -284,6 +291,7 @@ impl Expression for ScalarExpression {
284291
ScalarExpression::Conditional(c) => c.fmt_with_indent(f, indent),
285292
ScalarExpression::Convert(c) => c.fmt_with_indent(f, indent),
286293
ScalarExpression::GetType(g) => g.fmt_with_indent(f, indent),
294+
ScalarExpression::GetRecordType(g) => g.fmt_with_indent(f, indent),
287295
ScalarExpression::Temporal(t) => t.fmt_with_indent(f, indent),
288296
ScalarExpression::Length(l) => l.fmt_with_indent(f, indent),
289297
ScalarExpression::Logical(l) => l.fmt_with_indent(f, indent),
@@ -1307,6 +1315,39 @@ impl Expression for GetTypeScalarExpression {
13071315
}
13081316
}
13091317

1318+
#[derive(Debug, Clone, PartialEq)]
1319+
pub struct GetRecordTypeScalarExpression {
1320+
query_location: QueryLocation,
1321+
}
1322+
1323+
impl GetRecordTypeScalarExpression {
1324+
pub fn new(query_location: QueryLocation) -> GetRecordTypeScalarExpression {
1325+
Self { query_location }
1326+
}
1327+
1328+
pub(crate) fn try_resolve_static(
1329+
&mut self,
1330+
_scope: &PipelineResolutionScope,
1331+
) -> ScalarStaticResolutionResult<'_> {
1332+
ScalarStaticResolutionResult::Ok(None)
1333+
}
1334+
}
1335+
1336+
impl Expression for GetRecordTypeScalarExpression {
1337+
fn get_query_location(&self) -> &QueryLocation {
1338+
&self.query_location
1339+
}
1340+
1341+
fn get_name(&self) -> &'static str {
1342+
"GetRecordTypeScalarExpression"
1343+
}
1344+
1345+
fn fmt_with_indent(&self, f: &mut std::fmt::Formatter<'_>, _indent: &str) -> std::fmt::Result {
1346+
writeln!(f, "GetRecordTypeScalarExpression")?;
1347+
Ok(())
1348+
}
1349+
}
1350+
13101351
#[derive(Debug, Clone, PartialEq)]
13111352
pub struct SelectScalarExpression {
13121353
query_location: QueryLocation,

rust/experimental/query_engine/kql-parser/src/scalar_expression.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,7 @@ pub(crate) fn try_resolve_identifier(
659659
ScalarExpression::Slice(_) => Ok(None),
660660
ScalarExpression::Static(_) => Ok(None),
661661
ScalarExpression::Text(_) => Ok(None),
662+
ScalarExpression::GetRecordType(_) => Ok(None),
662663
ScalarExpression::GetType(g) => {
663664
if let Some(mut i) = try_resolve_identifier(g.get_value(), scope)? {
664665
i.insert(0, "type".into());
@@ -667,6 +668,7 @@ pub(crate) fn try_resolve_identifier(
667668

668669
Ok(None)
669670
}
671+
670672
ScalarExpression::Select(s) => {
671673
if let Some(mut value) = try_resolve_identifier(s.get_value(), scope)?
672674
&& let ScalarExpression::Static(StaticScalarExpression::Array(selectors)) =

rust/otap-dataflow/crates/opl/src/parser/expression.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ use data_engine_expressions::{
77
AndLogicalExpression, BinaryMathematicalScalarExpression, BooleanScalarExpression,
88
CaptureTextScalarExpression, CollectionScalarExpression, CombineScalarExpression,
99
ContainsLogicalExpression, DateTimeScalarExpression, DoubleScalarExpression, DoubleValue,
10-
EqualToLogicalExpression, Expression, GetTypeScalarExpression, GreaterThanLogicalExpression,
11-
GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression, IntegerValue,
12-
InvokeFunctionArgument, InvokeFunctionScalarExpression, JoinTextScalarExpression,
10+
EqualToLogicalExpression, Expression, GetRecordTypeScalarExpression,
11+
GreaterThanLogicalExpression, GreaterThanOrEqualToLogicalExpression, IntegerScalarExpression,
12+
IntegerValue, InvokeFunctionArgument, InvokeFunctionScalarExpression, JoinTextScalarExpression,
1313
ListScalarExpression, LogicalExpression, MatchesLogicalExpression, MathScalarExpression,
1414
NotLogicalExpression, NullScalarExpression, OrLogicalExpression, QueryLocation,
1515
RegexScalarExpression, ReplaceTextScalarExpression, ScalarExpression, SliceScalarExpression,
@@ -215,13 +215,9 @@ pub(crate) fn parse_type_check_expression(
215215
// IF there are two rules, we have an expression like (is <Type>) meaning we're checking
216216
// that an element of the stream is some type
217217
2 => {
218-
let type_check_expr = ScalarExpression::GetType(GetTypeScalarExpression::new(
219-
type_check_rule_query_location.clone(),
220-
ScalarExpression::Source(SourceScalarExpression::new(
221-
type_check_rule_query_location.clone(),
222-
ValueAccessor::new_with_selectors(Vec::new()),
223-
)),
224-
));
218+
let type_check_expr = ScalarExpression::GetRecordType(
219+
GetRecordTypeScalarExpression::new(type_check_rule_query_location.clone()),
220+
);
225221

226222
let type_name_rule = inner_rules.nth(1).expect("two rules");
227223
let type_check_expected_type = ScalarExpression::Static(

rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs

Lines changed: 15 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -479,46 +479,25 @@ impl FilterPlan {
479479
) -> Result<Option<Self>> {
480480
match (left_expr, binary_op, right_expr) {
481481
(
482-
ScalarExpression::GetType(get_type_expr),
482+
ScalarExpression::GetRecordType(_),
483483
Operator::Eq,
484484
ScalarExpression::Static(StaticScalarExpression::String(typename_expr)),
485485
) => {
486-
let source_value_accessor = match get_type_expr.get_value() {
487-
ScalarExpression::Source(source_scalar_expr) => {
488-
source_scalar_expr.get_value_accessor()
489-
}
490-
_ => {
491-
// the source for the get type expression isn't something that we can check
492-
// determine to statically be the identifier of a type, defer to expression
493-
// evaluation for this filter
494-
return Ok(None);
495-
}
496-
};
486+
// since the source accessor has no selectors, it means we're checking the type
487+
// of elements of the stream for this pipeline. We'll try to determine if the
488+
// type name is a stream type that is handled by this query engine ...
489+
let type_name = typename_expr.get_value();
490+
let stream_element_type =
491+
StreamElementType::from_str(type_name).ok_or_else(|| {
492+
Error::InvalidPipelineError {
493+
cause: format!("Unknown stream type name {type_name}"),
494+
query_location: Some(right_expr.get_query_location().clone()),
495+
}
496+
})?;
497497

498-
if !source_value_accessor.has_selectors() {
499-
// since the source accessor has no selectors, it means we're checking the type
500-
// of elements of the stream for this pipeline. We'll try to determine if the
501-
// type name is a stream type that is handled by this query engine ...
502-
let type_name = typename_expr.get_value();
503-
let stream_element_type =
504-
StreamElementType::from_str(type_name).ok_or_else(|| {
505-
Error::InvalidPipelineError {
506-
cause: format!("Unknown stream type name {type_name}"),
507-
query_location: Some(right_expr.get_query_location().clone()),
508-
}
509-
})?;
510-
511-
Ok(Some(FilterPlan::from(ElementTypeFilter::new(
512-
stream_element_type,
513-
))))
514-
} else {
515-
// TODO - we will support this soon
516-
Err(Error::NotYetSupportedError {
517-
message:
518-
"Checking if a field of stream element is a type is not yet supported"
519-
.into(),
520-
})
521-
}
498+
Ok(Some(FilterPlan::from(ElementTypeFilter::new(
499+
stream_element_type,
500+
))))
522501
}
523502
_ => Ok(None),
524503
}

0 commit comments

Comments
 (0)