Skip to content

Commit f46666c

Browse files
committed
fix: Expression handling improvements for DataFusion integration
Ported from spiceai-51 patches: - Skip unsupported expressions in make_vortex_predicate instead of erroring (#8) - Handle empty IN lists: x IN () returns false, x NOT IN () returns true (#8) - Allow DynamicFilterPhysicalExpr pushdown (#8) - Remove is_dynamic_physical_expr check that blocked all dynamic filters - Add Debug impl for BtrBlocksCompressor to satisfy CompressorPlugin bounds
1 parent a9ef29d commit f46666c

2 files changed

Lines changed: 28 additions & 11 deletions

File tree

vortex-btrblocks/src/canonical_compressor.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,16 @@ pub struct BtrBlocksCompressor {
101101
pub string_schemes: Vec<&'static dyn StringScheme>,
102102
}
103103

104+
impl std::fmt::Debug for BtrBlocksCompressor {
105+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106+
f.debug_struct("BtrBlocksCompressor")
107+
.field("int_schemes", &self.int_schemes.len())
108+
.field("float_schemes", &self.float_schemes.len())
109+
.field("string_schemes", &self.string_schemes.len())
110+
.finish()
111+
}
112+
}
113+
104114
impl Default for BtrBlocksCompressor {
105115
fn default() -> Self {
106116
BtrBlocksCompressorBuilder::default().build()

vortex-datafusion/src/convert/exprs.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use datafusion_physical_expr::ScalarFunctionExpr;
1616
use datafusion_physical_expr::projection::ProjectionExpr;
1717
use datafusion_physical_expr::projection::ProjectionExprs;
1818
use datafusion_physical_expr::utils::collect_columns;
19-
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
2019
use datafusion_physical_plan::expressions as df_expr;
2120
use itertools::Itertools;
2221
use vortex::dtype::DType;
@@ -52,10 +51,13 @@ pub(crate) fn make_vortex_predicate(
5251
expr_convertor: &dyn ExpressionConvertor,
5352
predicate: &[Arc<dyn PhysicalExpr>],
5453
) -> DFResult<Option<Expression>> {
55-
let exprs = predicate
54+
let exprs: Vec<_> = predicate
5655
.iter()
57-
.map(|e| expr_convertor.convert(e.as_ref()))
58-
.collect::<DFResult<Vec<_>>>()?;
56+
.filter_map(|e| {
57+
// If conversion fails, skip this expression (equivalent to lit(true) in AND conjunction)
58+
expr_convertor.convert(e.as_ref()).ok()
59+
})
60+
.collect();
5961

6062
Ok(and_collect(exprs))
6163
}
@@ -221,8 +223,13 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
221223
})
222224
.try_collect()?;
223225

226+
// Handle empty IN list: `x IN ()` is always false, `x NOT IN ()` is always true
227+
let Some(first_element) = list_elements.first() else {
228+
return Ok(lit(in_list.negated()));
229+
};
230+
224231
let list = Scalar::list(
225-
list_elements[0].dtype().clone(),
232+
first_element.dtype().clone(),
226233
list_elements,
227234
Nullability::Nullable,
228235
);
@@ -361,12 +368,6 @@ fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
361368
}
362369

363370
fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
364-
// We currently do not support pushdown of dynamic expressions in DF.
365-
// See issue: https://github.com/vortex-data/vortex/issues/4034
366-
if is_dynamic_physical_expr(df_expr) {
367-
return false;
368-
}
369-
370371
let expr = df_expr.as_any();
371372
if let Some(binary) = expr.downcast_ref::<df_expr::BinaryExpr>() {
372373
can_binary_be_pushed_down(binary, schema)
@@ -396,6 +397,12 @@ fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) ->
396397
.all(|e| can_be_pushed_down_impl(e, schema))
397398
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
398399
can_scalar_fn_be_pushed_down(scalar_fn)
400+
} else if expr
401+
.downcast_ref::<df_expr::DynamicFilterPhysicalExpr>()
402+
.is_some()
403+
{
404+
// Assume dynamic filters can be pushed down - the child won't be specified until execution time
405+
true
399406
} else {
400407
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
401408
false

0 commit comments

Comments
 (0)