Skip to content

Commit b75222c

Browse files
Subham SinghalSubham Singhal
authored andcommitted
Eliminate outer joins with empty relations via null-padded projection
1 parent bc2b36c commit b75222c

1 file changed

Lines changed: 176 additions & 7 deletions

File tree

datafusion/optimizer/src/propagate_empty_relation.rs

Lines changed: 176 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ use std::sync::Arc;
2121

2222
use datafusion_common::JoinType;
2323
use datafusion_common::tree_node::Transformed;
24-
use datafusion_common::{Result, plan_err};
24+
use datafusion_common::{Column, DFSchemaRef, Result, ScalarValue, plan_err};
25+
use datafusion_expr::expr::Cast;
2526
use datafusion_expr::logical_plan::LogicalPlan;
26-
use datafusion_expr::{EmptyRelation, Projection, Union};
27+
use datafusion_expr::{EmptyRelation, Expr, Projection, Union};
2728

2829
use crate::optimizer::ApplyOrder;
2930
use crate::{OptimizerConfig, OptimizerRule};
@@ -73,12 +74,8 @@ impl OptimizerRule for PropagateEmptyRelation {
7374
Ok(Transformed::no(plan))
7475
}
7576
LogicalPlan::Join(ref join) => {
76-
// TODO: For Join, more join type need to be careful:
77-
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
78-
// columns + right side columns replaced with null values.
79-
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
80-
// columns + left side columns replaced with null values.
8177
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
78+
let left_field_count = join.left.schema().fields().len();
8279

8380
match join.join_type {
8481
// For Full Join, only both sides are empty, the Join result is empty.
@@ -88,6 +85,24 @@ impl OptimizerRule for PropagateEmptyRelation {
8885
schema: Arc::clone(&join.schema),
8986
}),
9087
)),
88+
// For Full Join, if one side is empty, replace with a
89+
// Projection that null-pads the empty side's columns.
90+
JoinType::Full if right_empty => {
91+
Ok(Transformed::yes(build_null_padded_projection(
92+
Arc::clone(&join.left),
93+
&join.schema,
94+
left_field_count,
95+
true,
96+
)?))
97+
}
98+
JoinType::Full if left_empty => {
99+
Ok(Transformed::yes(build_null_padded_projection(
100+
Arc::clone(&join.right),
101+
&join.schema,
102+
left_field_count,
103+
false,
104+
)?))
105+
}
91106
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
92107
LogicalPlan::EmptyRelation(EmptyRelation {
93108
produce_one_row: false,
@@ -100,12 +115,32 @@ impl OptimizerRule for PropagateEmptyRelation {
100115
schema: Arc::clone(&join.schema),
101116
}),
102117
)),
118+
// Left Join with empty right: all left rows survive
119+
// with NULLs for right columns.
120+
JoinType::Left if right_empty => {
121+
Ok(Transformed::yes(build_null_padded_projection(
122+
Arc::clone(&join.left),
123+
&join.schema,
124+
left_field_count,
125+
true,
126+
)?))
127+
}
103128
JoinType::Right if right_empty => Ok(Transformed::yes(
104129
LogicalPlan::EmptyRelation(EmptyRelation {
105130
produce_one_row: false,
106131
schema: Arc::clone(&join.schema),
107132
}),
108133
)),
134+
// Right Join with empty left: all right rows survive
135+
// with NULLs for left columns.
136+
JoinType::Right if left_empty => {
137+
Ok(Transformed::yes(build_null_padded_projection(
138+
Arc::clone(&join.right),
139+
&join.schema,
140+
left_field_count,
141+
false,
142+
)?))
143+
}
109144
JoinType::LeftSemi if left_empty || right_empty => Ok(
110145
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
111146
produce_one_row: false,
@@ -230,6 +265,60 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
230265
}
231266
}
232267

268+
/// Builds a Projection that replaces one side of an outer join with NULL literals.
269+
///
270+
/// When one side of an outer join is an `EmptyRelation`, the join can be eliminated
271+
/// by projecting the surviving side's columns as-is and replacing the empty side's
272+
/// columns with `CAST(NULL AS <type>)`.
273+
///
274+
/// The join schema is used as the projection's output schema to preserve nullability
275+
/// guarantees (important for FULL JOIN where the surviving side's columns are marked
276+
/// nullable in the join schema even if they aren't in the source schema).
277+
///
278+
/// # Example
279+
///
280+
/// For a `LEFT JOIN` where the right side is empty:
281+
/// ```text
282+
/// Left Join (orders.id = returns.order_id) Projection(orders.id, orders.amount,
283+
/// ├── TableScan: orders => CAST(NULL AS Int64) AS order_id,
284+
/// └── EmptyRelation CAST(NULL AS Utf8) AS reason)
285+
/// └── TableScan: orders
286+
/// ```
287+
fn build_null_padded_projection(
288+
surviving_plan: Arc<LogicalPlan>,
289+
join_schema: &DFSchemaRef,
290+
left_field_count: usize,
291+
empty_side_is_right: bool,
292+
) -> Result<LogicalPlan> {
293+
let exprs = join_schema
294+
.iter()
295+
.enumerate()
296+
.map(|(i, (qualifier, field))| {
297+
let on_empty_side = if empty_side_is_right {
298+
i >= left_field_count
299+
} else {
300+
i < left_field_count
301+
};
302+
303+
if on_empty_side {
304+
Expr::Cast(Cast::new(
305+
Box::new(Expr::Literal(ScalarValue::Null, None)),
306+
field.data_type().clone(),
307+
))
308+
.alias_qualified(qualifier.cloned(), field.name())
309+
} else {
310+
Expr::Column(Column::new(qualifier.cloned(), field.name()))
311+
}
312+
})
313+
.collect::<Vec<_>>();
314+
315+
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
316+
exprs,
317+
surviving_plan,
318+
Arc::clone(join_schema),
319+
)?))
320+
}
321+
233322
#[cfg(test)]
234323
mod tests {
235324

@@ -570,6 +659,86 @@ mod tests {
570659
assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
571660
}
572661

662+
#[test]
663+
fn test_left_join_right_empty_null_pad() -> Result<()> {
664+
let left =
665+
LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;
666+
let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
667+
.filter(lit(false))?
668+
.build()?;
669+
670+
let plan = LogicalPlanBuilder::from(left)
671+
.join_using(
672+
right_empty,
673+
JoinType::Left,
674+
vec![Column::from_name("a".to_string())],
675+
)?
676+
.build()?;
677+
678+
let expected = "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left";
679+
assert_together_optimized_plan(plan, expected, true)
680+
}
681+
682+
#[test]
683+
fn test_right_join_left_empty_null_pad() -> Result<()> {
684+
let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
685+
.filter(lit(false))?
686+
.build()?;
687+
let right =
688+
LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;
689+
690+
let plan = LogicalPlanBuilder::from(left_empty)
691+
.join_using(
692+
right,
693+
JoinType::Right,
694+
vec![Column::from_name("a".to_string())],
695+
)?
696+
.build()?;
697+
698+
let expected = "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right";
699+
assert_together_optimized_plan(plan, expected, true)
700+
}
701+
702+
#[test]
703+
fn test_full_join_right_empty_null_pad() -> Result<()> {
704+
let left =
705+
LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;
706+
let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
707+
.filter(lit(false))?
708+
.build()?;
709+
710+
let plan = LogicalPlanBuilder::from(left)
711+
.join_using(
712+
right_empty,
713+
JoinType::Full,
714+
vec![Column::from_name("a".to_string())],
715+
)?
716+
.build()?;
717+
718+
let expected = "Projection: left.a, left.b, left.c, CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c\n TableScan: left";
719+
assert_together_optimized_plan(plan, expected, true)
720+
}
721+
722+
#[test]
723+
fn test_full_join_left_empty_null_pad() -> Result<()> {
724+
let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
725+
.filter(lit(false))?
726+
.build()?;
727+
let right =
728+
LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;
729+
730+
let plan = LogicalPlanBuilder::from(left_empty)
731+
.join_using(
732+
right,
733+
JoinType::Full,
734+
vec![Column::from_name("a".to_string())],
735+
)?
736+
.build()?;
737+
738+
let expected = "Projection: CAST(NULL AS UInt32) AS a, CAST(NULL AS UInt32) AS b, CAST(NULL AS UInt32) AS c, right.a, right.b, right.c\n TableScan: right";
739+
assert_together_optimized_plan(plan, expected, true)
740+
}
741+
573742
#[test]
574743
fn test_empty_with_non_empty() -> Result<()> {
575744
let table_scan = test_table_scan()?;

0 commit comments

Comments
 (0)