Skip to content

Commit d691dba

Browse files
authored
fix: handle EXISTS/NOT EXISTS subqueries in federation analyzer (#68)
1 parent 3578ed4 commit d691dba

1 file changed

Lines changed: 45 additions & 1 deletion

File tree

  • datafusion-federation/src/analyzer

datafusion-federation/src/analyzer/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod scan_result;
22

33
use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef};
44
use crate::{FederationAnalyzerForLogicalPlan, FederationProvider};
5-
use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder};
5+
use datafusion::logical_expr::{col, expr::Exists, expr::InSubquery, LogicalPlanBuilder};
66
use datafusion::optimizer::optimize_unions::OptimizeUnions;
77
use datafusion::optimizer::push_down_filter::PushDownFilter;
88
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
@@ -154,6 +154,13 @@ impl FederationAnalyzerRule {
154154
sole_provider.merge(plan_result);
155155
Ok(sole_provider.check_recursion())
156156
}
157+
Expr::Exists(ref exists) => {
158+
let plan_result =
159+
self.scan_plan_recursively(&exists.subquery.subquery, providers)?;
160+
161+
sole_provider.merge(plan_result);
162+
Ok(sole_provider.check_recursion())
163+
}
157164
Expr::OuterReferenceColumn(_, ref col) => {
158165
if let Some(table) = &col.relation {
159166
if let Some(plan_result) = providers.get(table) {
@@ -414,6 +421,43 @@ impl FederationAnalyzerRule {
414421
in_subquery.negated,
415422
))))
416423
}
424+
Expr::Exists(ref exists) => {
425+
let (new_subquery, _) = self.analyze_plan_recursively(
426+
&exists.subquery.subquery,
427+
true,
428+
_config,
429+
providers,
430+
)?;
431+
let Some(new_subquery) = new_subquery else {
432+
return Ok(Transformed::no(expr));
433+
};
434+
435+
// DecorrelatePredicateSubquery optimizer rule doesn't support federated node
436+
// (LogicalPlan::Extension(_)) as subquery.
437+
// Wrap a no-op Projection outside the federated node to facilitate optimization.
438+
if matches!(new_subquery, LogicalPlan::Extension(_)) {
439+
let all_columns = new_subquery
440+
.schema()
441+
.fields()
442+
.iter()
443+
.map(|field| col(field.name()))
444+
.collect::<Vec<_>>();
445+
446+
let projection_plan = LogicalPlanBuilder::from(new_subquery)
447+
.project(all_columns)?
448+
.build()?;
449+
450+
return Ok(Transformed::yes(Expr::Exists(Exists {
451+
subquery: exists.subquery.with_plan(projection_plan.into()),
452+
negated: exists.negated,
453+
})));
454+
}
455+
456+
Ok(Transformed::yes(Expr::Exists(Exists {
457+
subquery: exists.subquery.with_plan(new_subquery.into()),
458+
negated: exists.negated,
459+
})))
460+
}
417461
_ => Ok(Transformed::no(expr)),
418462
}
419463
}

0 commit comments

Comments
 (0)