Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,43 @@ fn test_sort_pushdown_through_projection_with_alias() {
);
}

#[test]
fn test_no_sort_pushdown_for_projection_name_index_mismatch() {
// Regression: if a sort column's index points into a projection output but
// its name does not match the projected alias, do not rewrite by index only.
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT a, b
let projection = simple_projection_exec(source, vec![0, 1]);

// Mismatched column metadata: name "_score" at index 0.
// Even though index 0 maps to projected column `a`, this must not push down.
let mismatched = sort_expr_named("_score", 0);
let ordering = LexOrdering::new(vec![mismatched.reverse()]).unwrap();
let plan = sort_exec(ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[_score@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[_score@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
"
);
}

#[test]
fn test_no_sort_pushdown_through_computed_projection() {
use datafusion_expr::Operator;
Expand Down
10 changes: 9 additions & 1 deletion datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,15 @@ impl ExecutionPlan for ProjectionExec {

let proj_expr = &self.expr()[col.index()];

// Check if projection expression is a simple column
// Guard against stale/mismatched column metadata (e.g. same index
// but different field name). In that case we must not rewrite by
// index only, as it can push down an order on a different column.
if col.name() != proj_expr.alias {
can_pushdown = false;
return Ok(Transformed::no(expr));
}

// Check if projection expression is a simple column.
// We cannot push down order by clauses that depend on
// projected computations as they would have nothing to reference.
if let Some(child_col) =
Expand Down