Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 2 additions & 28 deletions rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ use crate::pipeline::expr::{
};
use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor};
use crate::pipeline::project::anyval::{
fill_null_type_as_empty, is_any_value_data_type, project_any_value_columns,
wrap_as_any_value_struct,
attempt_coerce_value_column_from_any_value_struct_column, fill_null_type_as_empty,
is_any_value_data_type, wrap_as_any_value_struct,
};
use crate::pipeline::project::{ProjectedSchemaColumn, Projection};
use crate::pipeline::state::ExecutionState;
Expand Down Expand Up @@ -1660,32 +1660,6 @@ fn coerce_to_any_value_struct_column(values: ArrayRef) -> Result<ArrayRef> {
}
}

/// Attempt to coerce the values array (a struct array representing an AnyValue) into the concrete
/// values type. If the passed array contains multiple types, the original array is returned
/// because coercion was not successful.
fn attempt_coerce_value_column_from_any_value_struct_column(values: &ArrayRef) -> Result<ArrayRef> {
    // wrap the column in a throwaway single-column record batch so it can be handed to the
    // type-partitioning helper
    let field = Field::new("", values.data_type().clone(), true);
    let batch = RecordBatch::try_new(
        Arc::new(Schema::new(vec![field])),
        vec![Arc::clone(values)],
    )?;

    // attempt to partition the AnyValue column by type
    let partitions = project_any_value_columns(&batch, &[0])?;

    // exactly one partition means every row shared a single concrete type, so the coerced
    // column can be returned; otherwise fall back to the original mixed-type array
    match partitions.as_slice() {
        [single] => Ok(Arc::clone(single.batch.column(0))),
        _ => Ok(Arc::clone(values)),
    }
}

/// Inserts the column into the record batch if the column does not exist, otherwise replaces the
/// existing column with the new one.
///
Expand Down
237 changes: 155 additions & 82 deletions rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ use std::sync::Arc;
use crate::consts::BODY_FIELD_NAME;
use crate::error::{Error, Result};
use crate::pipeline::PipelineStage;
use crate::pipeline::expr::types::coerce_arithmetic;
use crate::pipeline::expr::{
DataScope, ExprLogicalPlanner, ExprPhysicalPlanner, LogicalExprDataSource, ScopedLogicalExpr,
ScopedPhysicalExpr,
DataScope, ExprLogicalPlanner, ExprPhysicalPlanner, LogicalExprDataSource,
PhysicalExprEvalResult, ScopedLogicalExpr, ScopedPhysicalExpr, join::join,
};
use crate::pipeline::expr::{LEFT_COLUMN_NAME, RIGHT_COLUMN_NAME};
use crate::pipeline::filter::compare::compare;
use crate::pipeline::functions::expr_fn::contains;
use crate::pipeline::planner::{
AttributesIdentifier, BinaryArg, ColumnAccessor, try_attrs_value_filter_from_literal,
try_static_scalar_to_any_val_column, try_static_scalar_to_attr_literal,
try_static_scalar_to_literal_for_column,
};
use crate::pipeline::project::Projection;
use crate::pipeline::project::anyval::{
attempt_coerce_value_column_from_any_value_struct_column, is_any_value_data_type,
};
use crate::pipeline::state::ExecutionState;
use arrow::array::{Array, BooleanArray, BooleanBufferBuilder, RecordBatch, UInt16Array};
use arrow::buffer::BooleanBuffer;
Expand Down Expand Up @@ -44,6 +50,7 @@ use otap_df_pdata::otap::filter::{ChildBatchFilterIdHelper, IdBitmap, IdBitmapPo
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
use otap_df_pdata::schema::consts;

mod compare;
pub mod optimize;

/// A compositional tree structure for combining expressions with boolean operators.
Expand Down Expand Up @@ -175,7 +182,7 @@ pub struct FilterPlan {
///
/// This uses the same expression evaluation infrastructure as `set` expressions, including
/// support for joins across data scopes.
pub expr_filter: Option<ScopedLogicalExpr>,
pub expr_filter: Option<ExprFilterPlan>,
}

impl From<Expr> for FilterPlan {
Expand Down Expand Up @@ -210,7 +217,7 @@ impl From<Composite<AttributesFilterPlan>> for FilterPlan {

impl FilterPlan {
/// Create a FilterPlan that uses the general expression evaluation path.
fn from_expr(expr: ScopedLogicalExpr) -> Self {
fn from_expr(expr: ExprFilterPlan) -> Self {
Self {
source_filter: None,
attribute_filter: None,
Expand Down Expand Up @@ -470,73 +477,15 @@ impl FilterPlan {
functions: &[PipelineFunction],
) -> Result<Self> {
let planner = ExprLogicalPlanner::default();
let left = planner.plan_scalar_expr(left_expr, functions)?;
let right = planner.plan_scalar_expr(right_expr, functions)?;

// build the comparison expression using the expr system's scoping logic
let expr = Self::build_scoped_comparison_expr(left, binary_op, right)?;
Ok(FilterPlan::from_expr(expr))
}

/// Build a `ScopedLogicalExpr` that performs a boolean comparison (eq, gt, etc.) on the
/// results of two child expressions. Handles both same-scope and cross-scope cases.
///
/// Type coercion is applied to ensure both sides have compatible types for the comparison
/// (e.g., Int32 vs Int64 will have the narrower side cast to Int64).
fn build_scoped_comparison_expr(
mut left: ScopedLogicalExpr,
binary_op: Operator,
mut right: ScopedLogicalExpr,
) -> Result<ScopedLogicalExpr> {
use crate::pipeline::expr::types::{ExprLogicalType, coerce_arithmetic};
use crate::pipeline::expr::{LEFT_COLUMN_NAME, RIGHT_COLUMN_NAME};

// Apply type coercion so both sides of the comparison have compatible types.
// We reuse the arithmetic coercion rules (which handle Int32 vs Int64, AnyValue vs
// concrete types, etc.) -- the side-effect of adding cast expressions is what we need.
// We ignore the returned result type since comparisons always produce Boolean.
let _ = coerce_arithmetic(&mut left, &mut right);

// check if both sides can be evaluated in the same scope (no join needed)
let possible_combined_scope = match (&left.source, &right.source) {
(
LogicalExprDataSource::DataSource(left_scope),
LogicalExprDataSource::DataSource(right_scope),
) => left_scope
.can_combine(right_scope)
.then_some(if !left_scope.is_scalar() {
left_scope
} else {
right_scope
}),
_ => None,
};

if let Some(combined_scope) = possible_combined_scope {
let dict_downcast = left.requires_dict_downcast || right.requires_dict_downcast;
Ok(ScopedLogicalExpr {
logical_expr: Expr::BinaryExpr(BinaryExpr::new(
Box::new(left.logical_expr),
binary_op,
Box::new(right.logical_expr),
)),
source: LogicalExprDataSource::DataSource(combined_scope.clone()),
expr_type: ExprLogicalType::Boolean,
requires_dict_downcast: dict_downcast,
})
} else {
// different scopes -- need a join
Ok(ScopedLogicalExpr {
logical_expr: Expr::BinaryExpr(BinaryExpr::new(
Box::new(col(LEFT_COLUMN_NAME)),
binary_op,
Box::new(col(RIGHT_COLUMN_NAME)),
)),
source: LogicalExprDataSource::Join(Box::new(left), Box::new(right)),
expr_type: ExprLogicalType::Boolean,
requires_dict_downcast: true,
})
}
Ok(FilterPlan::from_expr(ExprFilterPlan::Binary(
ExprBinaryFilterPlan {
left: planner.plan_scalar_expr(left_expr, functions)?,
operator: binary_op,
right: planner.plan_scalar_expr(right_expr, functions)?,
},
)))
}

fn try_from_contains_expr(
Expand Down Expand Up @@ -613,7 +562,7 @@ impl FilterPlan {

// Build a contains function call as a ScopedLogicalExpr
let expr = Self::build_scoped_contains_expr(haystack, needle)?;
Ok(FilterPlan::from_expr(expr))
Ok(FilterPlan::from_expr(ExprFilterPlan::Unary(expr)))
}

/// Build a `ScopedLogicalExpr` that performs a contains check on two expressions.
Expand Down Expand Up @@ -904,7 +853,9 @@ impl Composite<FilterPlan> {
other => {
let planner = ExprLogicalPlanner::default();
let expr = planner.plan_scalar_expr(other, functions)?;
Ok(Self::from(FilterPlan::from_expr(expr)))
Ok(Self::from(FilterPlan::from_expr(ExprFilterPlan::Unary(
expr,
))))
}
},
}
Expand Down Expand Up @@ -934,9 +885,8 @@ impl ToExec for FilterPlan {
.expr_filter
.as_ref()
.map(|logical_expr| {
let planner = ExprPhysicalPlanner::default();
// clone the logical expr since into_physical consumes it
planner.plan(logical_expr.clone())
ExprFilterExec::try_from_plan(logical_expr.clone())
})
.transpose()?;

Expand Down Expand Up @@ -1043,6 +993,24 @@ impl Composite<AttributesFilterPlan> {
}
}

/// Filter plan representing a selection vector that will be created from the result of expression
/// evaluations.
#[derive(Clone, Debug, PartialEq)]
pub enum ExprFilterPlan {
    /// A unary expression evaluation that should produce a boolean value
    Unary(ScopedLogicalExpr),

    /// The selection vector will be created by evaluating both sides and comparing the results
    Binary(ExprBinaryFilterPlan),
}

/// Logical plan for a binary comparison filter: two scalar expressions joined by a comparison
/// operator whose result forms the selection vector.
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBinaryFilterPlan {
    /// Logical expression for the left-hand side of the comparison
    left: ScopedLogicalExpr,
    /// Comparison operator (eq, gt, etc.) applied between the two sides
    operator: Operator,
    /// Logical expression for the right-hand side of the comparison
    right: ScopedLogicalExpr,
}

fn to_physical_exprs(
expr: &Expr,
record_batch: &RecordBatch,
Expand All @@ -1066,7 +1034,7 @@ pub struct FilterExec {
/// General-purpose expression-based predicate. Evaluated using the expression evaluation
/// system (supporting joins across data scopes). The result is converted to a
/// `BooleanArray` aligned to the root record batch.
expr_predicate: Option<ScopedPhysicalExpr>,
expr_predicate: Option<ExprFilterExec>,

/// determines how we treat rows that where there are no attributes. if false, this cause the
/// row not to pass the filter, unless this is true which it should be set it as for filters/
Expand Down Expand Up @@ -1206,7 +1174,7 @@ impl FilterExec {
Ok(result)
}

/// Evaluates a [`ScopedPhysicalExpr`] and converts the result to a `BooleanArray` selection
/// Evaluates a [`ExprFilterExec`] and converts the result to a `BooleanArray` selection
/// vector aligned to the root record batch.
///
/// The expression may produce results from any data scope (root, attributes, join result, or
Expand All @@ -1215,18 +1183,88 @@ impl FilterExec {
/// - Scalar scope: broadcast the scalar boolean to all rows
/// - Attributes scope: use parent_id -> root join to align
fn evaluate_expr_predicate(
expr_pred: &mut ScopedPhysicalExpr,
expr_pred: &mut ExprFilterExec,
otap_batch: &OtapArrowRecords,
session_ctx: &SessionContext,
root_rb: &RecordBatch,
) -> Result<BooleanArray> {
let num_rows = root_rb.num_rows();

let eval_result = match expr_pred.execute(otap_batch, session_ctx)? {
Some(result) => result,
None => {
// expression result was null/absent -- treat as all rows failing the filter
return Ok(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None));
let eval_result = match expr_pred {
ExprFilterExec::Unary(expr_pred) => {
match expr_pred.execute(otap_batch, session_ctx)? {
Some(result) => result,
None => {
// expression result was null/absent -- treat as all rows failing the filter
return Ok(BooleanArray::new(BooleanBuffer::new_unset(num_rows), None));
}
}
}
ExprFilterExec::Binary(expr_pred) => {
let mut left_result = expr_pred
.left
.execute(otap_batch, session_ctx)?
.unwrap_or(PhysicalExprEvalResult::new_scalar(ScalarValue::Null));
let mut right_result = expr_pred
.right
.execute(otap_batch, session_ctx)?
.unwrap_or(PhysicalExprEvalResult::new_scalar(ScalarValue::Null));

// coerce the any-value structs that may have been returned from either side of
// the operation into a single column of values if the type distribution is uniform
// across all rows ...
if let ColumnarValue::Array(arr) = &left_result.values {
if is_any_value_data_type(arr.data_type()) {
let coerced_value_col =
attempt_coerce_value_column_from_any_value_struct_column(arr)?;
left_result.values = ColumnarValue::Array(coerced_value_col);
}
}
if let ColumnarValue::Array(arr) = &right_result.values {
if is_any_value_data_type(arr.data_type()) {
let coerced_value_col =
attempt_coerce_value_column_from_any_value_struct_column(arr)?;
right_result.values = ColumnarValue::Array(coerced_value_col);
}
}

// Check if the results of the left & right expressions effectively have
// equivalent row orders. If not, we need to align them by performing a join.
if !left_result.data_scope.can_combine(&right_result.data_scope) {
let (join_rb, joined_scope) = join(&left_result, &right_result, otap_batch)?;
// safety: we can expect here because `join` will always create columns with
// names "left" and "right"
let left = join_rb
.column_by_name(LEFT_COLUMN_NAME)
.expect("left column present");
let right = join_rb
.column_by_name(RIGHT_COLUMN_NAME)
.expect("right column present");
let left = ColumnarValue::Array(Arc::clone(left));
let right = ColumnarValue::Array(Arc::clone(right));
let selection_vec = compare(&left, expr_pred.operator, &right)?;
PhysicalExprEvalResult::new(
ColumnarValue::Array(Arc::new(selection_vec)),
joined_scope,
&join_rb,
)
} else {
let selection_vec = compare(
&left_result.values,
expr_pred.operator,
&right_result.values,
)?;
let result = ColumnarValue::Array(Arc::new(selection_vec));

// reuse the existing result after comparison
if left_result.data_scope.is_scalar() {
right_result.values = result;
right_result
} else {
left_result.values = result;
left_result
}
}
}
};

Expand Down Expand Up @@ -1309,7 +1347,7 @@ impl FilterExec {
/// predicate.
fn align_attrs_result_to_root(
boolean_arr: &BooleanArray,
eval_result: &crate::pipeline::expr::PhysicalExprEvalResult,
eval_result: &PhysicalExprEvalResult,
attrs_id: AttributesIdentifier,
otap_batch: &OtapArrowRecords,
root_rb: &RecordBatch,
Expand Down Expand Up @@ -1694,6 +1732,41 @@ impl Composite<AttributeFilterExec> {
}
}

/// Physical, executable counterpart of [`ExprFilterPlan`] used to build a boolean selection
/// vector from expression evaluation.
// NOTE(review): the Binary variant holds two ScopedPhysicalExpr inline, which presumably makes
// it much larger than Unary — confirm whether boxing is worthwhile before removing this allow.
#[allow(clippy::large_enum_variant)]
pub enum ExprFilterExec {
    /// A single expression expected to evaluate to a boolean result
    Unary(ScopedPhysicalExpr),
    /// A pair of expressions whose results are compared to produce booleans
    Binary(ExprBinaryFilterExec),
}

/// Physical form of a binary comparison filter: evaluates `left` and `right` independently,
/// then compares the results with `operator` to build the selection vector.
pub struct ExprBinaryFilterExec {
    /// Physical expression producing the left-hand values of the comparison
    left: ScopedPhysicalExpr,
    /// Comparison operator applied between the results of the two sides
    operator: Operator,
    /// Physical expression producing the right-hand values of the comparison
    right: ScopedPhysicalExpr,
}

impl ExprFilterExec {
    /// Convert the logical [`ExprFilterPlan`] into its physical, executable form.
    ///
    /// For binary plans, arithmetic type-coercion rules are applied to both sides before
    /// physical planning so the eventual comparison operates on compatible types.
    fn try_from_plan(logical_expr: ExprFilterPlan) -> Result<Self> {
        let planner = ExprPhysicalPlanner::default();

        // `logical_expr` is owned here, so match on it directly -- the previous `.clone()`
        // was a redundant deep copy of the whole plan.
        Ok(match logical_expr {
            ExprFilterPlan::Unary(plan) => Self::Unary(planner.plan(plan)?),
            ExprFilterPlan::Binary(mut binary_plan) => {
                // Apply type coercion so both sides of the comparison have compatible types.
                // We reuse the arithmetic coercion rules (which handle Int32 vs Int64, AnyValue
                // vs concrete types, etc.). The side-effect of adding cast expressions is what
                // we need. We ignore the returned result type since comparisons always produce
                // bool.
                _ = coerce_arithmetic(&mut binary_plan.left, &mut binary_plan.right);

                Self::Binary(ExprBinaryFilterExec {
                    left: planner.plan(binary_plan.left)?,
                    operator: binary_plan.operator,
                    right: planner.plan(binary_plan.right)?,
                })
            }
        })
    }
}

/// This is responsible for evaluating a [`PhysicalExpr`](datafusion::physical_expr::PhysicalExpr)
/// while adapting to schema changes that may be encountered between evaluations.
///
Expand Down
Loading
Loading