Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 121 additions & 8 deletions vortex-datafusion/src/convert/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,14 @@ pub(crate) fn make_vortex_predicate(
return None;
}

let expr = Expression::try_from_df(e.as_ref());

Some(expr)
// If conversion fails, skip this expression (equivalent to lit(true) in AND conjunction)
Expression::try_from_df(e.as_ref()).ok()
})
.collect::<VortexResult<Vec<_>>>()?;
.collect::<Vec<_>>();

Ok(exprs.into_iter().reduce(and))
}

// TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
// for that node, up to any `and` or `or` node.
impl TryFromDataFusion<dyn PhysicalExpr> for Expression {
fn try_from_df(df: &dyn PhysicalExpr) -> VortexResult<Self> {
if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
Expand Down Expand Up @@ -112,8 +109,13 @@ impl TryFromDataFusion<dyn PhysicalExpr> for Expression {
})
.try_collect()?;

// Handle empty IN list: `x IN ()` is always false, `x NOT IN ()` is always true
let Some(first_element) = list_elements.first() else {
return Ok(lit(in_list.negated()));
};

let list = Scalar::list(
list_elements[0].dtype().clone(),
first_element.dtype().clone(),
list_elements,
Nullability::Nullable,
);
Expand Down Expand Up @@ -294,7 +296,7 @@ fn supported_data_types(dt: &DataType) -> bool {
);

if !is_supported {
log::debug!("DataFusion data type {dt:?} is not supported");
tracing::debug!(data_type = ?dt, "DataFusion data type is not supported");
}

is_supported
Expand Down Expand Up @@ -502,6 +504,28 @@ mod tests {
);
}

#[test]
fn test_expr_from_df_in_list_empty_returns_false() {
// `x IN ()` should return lit(false)
let value = Arc::new(df_expr::Column::new("col", 0)) as Arc<dyn PhysicalExpr>;
let in_list = df_expr::InListExpr::new(value, vec![], false, None);

let result = Expression::try_from_df(&in_list).unwrap();

assert_snapshot!(result.display_tree().to_string(), @"vortex.literal false");
}

#[test]
fn test_expr_from_df_in_list_empty_negated_returns_true() {
// `x NOT IN ()` should return lit(true)
let value = Arc::new(df_expr::Column::new("col", 0)) as Arc<dyn PhysicalExpr>;
let in_list = df_expr::InListExpr::new(value, vec![], true, None);

let result = Expression::try_from_df(&in_list).unwrap();

assert_snapshot!(result.display_tree().to_string(), @"vortex.literal true");
}

#[rstest]
// Supported types
#[case::null(DataType::Null, true)]
Expand Down Expand Up @@ -705,4 +729,93 @@ mod tests {

assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected);
}

/// Create an unsupported scalar function expression (simulating functions like to_timestamp)
fn create_unsupported_scalar_fn() -> Arc<dyn PhysicalExpr> {
use datafusion_functions::datetime::to_timestamp::ToTimestampFunc;

let arg = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
"2024-01-01".to_string(),
)))) as Arc<dyn PhysicalExpr>;

Arc::new(ScalarFunctionExpr::new(
"to_timestamp",
Arc::new(ScalarUDF::from(ToTimestampFunc::new())),
vec![arg],
Arc::new(Field::new(
"result",
DataType::Timestamp(ArrowTimeUnit::Nanosecond, None),
true,
)),
Arc::new(ConfigOptions::new()),
))
}

#[test]
fn test_make_vortex_predicate_skips_unsupported_scalar_function() {
// Unsupported scalar function like to_timestamp should be skipped, not error
let unsupported_fn = create_unsupported_scalar_fn();
let result = make_vortex_predicate(&[&unsupported_fn]);

// Should succeed (not error) and return None since the only expression was skipped
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}

#[test]
fn test_make_vortex_predicate_combines_supported_and_skips_unsupported() {
// Mix of supported column expression and unsupported scalar function
let supported_col = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
let unsupported_fn = create_unsupported_scalar_fn();

let result = make_vortex_predicate(&[&supported_col, &unsupported_fn]);

// Should succeed and return the supported expression only
assert!(result.is_ok());
let predicate = result.unwrap();
assert!(predicate.is_some());

// The result should just be the column expression since the unsupported one was skipped
assert_snapshot!(predicate.unwrap().display_tree().to_string(), @r#"
vortex.get_item "test"
└── input: vortex.root
"#);
}

#[test]
fn test_make_vortex_predicate_multiple_supported_with_unsupported() {
// Two supported columns and one unsupported function
let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
let unsupported_fn = create_unsupported_scalar_fn();

let result = make_vortex_predicate(&[&col1, &unsupported_fn, &col2]);

// Should succeed and return AND of the two supported expressions
assert!(result.is_ok());
let predicate = result.unwrap();
assert!(predicate.is_some());

// The result should be an AND of col1 and col2
assert_snapshot!(predicate.unwrap().display_tree().to_string(), @r#"
vortex.binary and
├── lhs: vortex.get_item "col1"
│ └── input: vortex.root
└── rhs: vortex.get_item "col2"
└── input: vortex.root
"#);
}

#[test]
fn test_make_vortex_predicate_all_unsupported_returns_none() {
// When all expressions are unsupported, should return None (no filter)
let unsupported_fn1 = create_unsupported_scalar_fn();
let unsupported_fn2 = create_unsupported_scalar_fn();

let result = make_vortex_predicate(&[&unsupported_fn1, &unsupported_fn2]);

// Should succeed and return None
assert!(result.is_ok());
assert!(result.unwrap().is_none());
}
}
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use vortex::error::{VortexExpect, VortexResult, vortex_err};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
use vortex::stats::{ArrayStats, Stat, StatsSet};
use vortex::stats::{Stat, StatsSet};
use vortex::{VortexSessionDefault, stats};

use super::cache::VortexFileCache;
Expand Down
Loading