Skip to content

Commit 4585f33

Browse files
committed
Return Transformed::no when possible
Tests ported from apache#21964
1 parent 4f727f4 commit 4585f33

1 file changed

Lines changed: 60 additions & 19 deletions

File tree

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -448,16 +448,13 @@ fn push_down_all_join(
448448

449449
let mut on_filter_join_conditions = vec![];
450450
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type);
451-
452-
if !on_filter.is_empty() {
453-
for on in on_filter {
454-
if on_left_preserved && checker.is_left_only(&on) {
455-
left_push.push(on)
456-
} else if on_right_preserved && checker.is_right_only(&on) {
457-
right_push.push(on)
458-
} else {
459-
on_filter_join_conditions.push(on)
460-
}
451+
for on in on_filter {
452+
if on_left_preserved && checker.is_left_only(&on) {
453+
left_push.push(on)
454+
} else if on_right_preserved && checker.is_right_only(&on) {
455+
right_push.push(on)
456+
} else {
457+
on_filter_join_conditions.push(on)
461458
}
462459
}
463460

@@ -473,6 +470,7 @@ fn push_down_all_join(
473470
&left_schema_columns,
474471
));
475472
}
473+
476474
if right_preserved {
477475
right_push.extend(extract_or_clauses_for_join(
478476
&keep_predicates,
@@ -492,13 +490,27 @@ fn push_down_all_join(
492490
&left_schema_columns,
493491
));
494492
}
493+
495494
if on_right_preserved {
496495
right_push.extend(extract_or_clauses_for_join(
497496
&on_filter_join_conditions,
498497
&right_schema_columns,
499498
));
500499
}
501500

501+
// Add any new join conditions as the non join predicates
502+
let join_conditions_empty = join_conditions.is_empty();
503+
join_conditions.extend(on_filter_join_conditions);
504+
join.filter = conjunction(join_conditions);
505+
506+
if join_conditions_empty && left_push.is_empty() && right_push.is_empty() {
507+
// wrap the join on the filter whose predicates must be kept, if any
508+
return Ok(Transformed::no(with_filters(
509+
keep_predicates,
510+
LogicalPlan::Join(join),
511+
)));
512+
}
513+
502514
if let Some(predicate) = conjunction(left_push) {
503515
join.left = Arc::new(LogicalPlan::Filter(Filter::new_unchecked(
504516
predicate, join.left,
@@ -511,10 +523,6 @@ fn push_down_all_join(
511523
)));
512524
}
513525

514-
// Add any new join conditions as the non join predicates
515-
join_conditions.extend(on_filter_join_conditions);
516-
join.filter = conjunction(join_conditions);
517-
518526
// wrap the join on the filter whose predicates must be kept, if any
519527
Ok(Transformed::yes(with_filters(
520528
keep_predicates,
@@ -1140,14 +1148,25 @@ impl OptimizerRule for PushDownFilter {
11401148
.map(|(&pred, _)| pred);
11411149

11421150
// Add new scan filters
1143-
scan.filters = scan
1151+
let new_scan_filters = scan
11441152
.filters
11451153
.iter()
11461154
.chain(new_scan_filters)
11471155
.unique()
11481156
.cloned()
11491157
.collect();
11501158

1159+
if supported_filters
1160+
.iter()
1161+
.all(|res| res == &TableProviderFilterPushDown::Inexact)
1162+
&& scan.filters == new_scan_filters
1163+
{
1164+
filter.input = Arc::new(LogicalPlan::TableScan(scan));
1165+
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
1166+
} else {
1167+
scan.filters = new_scan_filters;
1168+
}
1169+
11511170
// Compose predicates to be of `Unsupported` or `Inexact` pushdown type,
11521171
// and also include volatile and subquery-containing filters
11531172
let new_predicate: Vec<Expr> = zip
@@ -1605,6 +1624,10 @@ mod tests {
16051624
.aggregate(vec![col("a")], vec![sum(col("b")).alias("b")])?
16061625
.filter(col("b").gt(lit(10i64)))?
16071626
.build()?;
1627+
let transformed = PushDownFilter::new()
1628+
.rewrite(plan.clone(), &OptimizerContext::new())
1629+
.expect("failed to optimize plan");
1630+
assert!(!transformed.transformed);
16081631
// filter of aggregate is after aggregation since they are non-commutative
16091632
assert_optimized_plan_equal!(
16101633
plan,
@@ -1797,6 +1820,10 @@ mod tests {
17971820
.window(vec![window])?
17981821
.filter(col("c").gt(lit(10i64)))?
17991822
.build()?;
1823+
let transformed = PushDownFilter::new()
1824+
.rewrite(plan.clone(), &OptimizerContext::new())
1825+
.expect("failed to optimize plan");
1826+
assert!(!transformed.transformed);
18001827

18011828
assert_optimized_plan_equal!(
18021829
plan,
@@ -3022,6 +3049,10 @@ mod tests {
30223049
Some(filter),
30233050
)?
30243051
.build()?;
3052+
let transformed = PushDownFilter::new()
3053+
.rewrite(plan.clone(), &OptimizerContext::new())
3054+
.expect("failed to optimize plan");
3055+
assert!(!transformed.transformed);
30253056

30263057
// not part of the test, just good to know:
30273058
assert_snapshot!(plan,
@@ -3128,15 +3159,20 @@ mod tests {
31283159
let plan =
31293160
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Inexact)?;
31303161

3131-
let optimized_plan = PushDownFilter::new()
3162+
let optimized = PushDownFilter::new()
31323163
.rewrite(plan, &OptimizerContext::new())
3133-
.expect("failed to optimize plan")
3134-
.data;
3164+
.expect("failed to optimize plan");
3165+
assert!(optimized.transformed);
3166+
3167+
let optimized_again = PushDownFilter::new()
3168+
.rewrite(optimized.data.clone(), &OptimizerContext::new())
3169+
.expect("failed to optimize plan");
3170+
assert!(!optimized_again.transformed);
31353171

31363172
// Optimizing the same plan multiple times should produce the same plan
31373173
// each time.
31383174
assert_optimized_plan_equal!(
3139-
optimized_plan,
3175+
optimized_again.data,
31403176
@r"
31413177
Filter: a = Int64(1)
31423178
TableScan: test, partial_filters=[a = Int64(1)]
@@ -3149,6 +3185,11 @@ mod tests {
31493185
let plan =
31503186
table_scan_with_pushdown_provider(TableProviderFilterPushDown::Unsupported)?;
31513187

3188+
let transformed = PushDownFilter::new()
3189+
.rewrite(plan.clone(), &OptimizerContext::new())
3190+
.expect("failed to optimize plan");
3191+
assert!(!transformed.transformed);
3192+
31523193
assert_optimized_plan_equal!(
31533194
plan,
31543195
@r"

0 commit comments

Comments
 (0)