Skip to content

Commit 28c72d7

Browse files
committed
fix: handle EXISTS/NOT EXISTS subqueries in federation analyzer (#68)
1 parent a4fce79 commit 28c72d7

1 file changed

Lines changed: 45 additions & 1 deletion

File tree

  • datafusion-federation/src/analyzer

datafusion-federation/src/analyzer/mod.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod scan_result;
22

33
use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef};
44
use crate::{FederationAnalyzerForLogicalPlan, FederationProvider};
5-
use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder};
5+
use datafusion::logical_expr::{col, expr::Exists, expr::InSubquery, LogicalPlanBuilder};
66
use datafusion::optimizer::optimize_unions::OptimizeUnions;
77
use datafusion::optimizer::push_down_filter::PushDownFilter;
88
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
@@ -163,6 +163,13 @@ impl FederationAnalyzerRule {
163163
sole_provider.merge(plan_result);
164164
Ok(sole_provider.check_recursion())
165165
}
166+
Expr::Exists(ref exists) => {
167+
let plan_result =
168+
self.scan_plan_recursively(&exists.subquery.subquery, providers)?;
169+
170+
sole_provider.merge(plan_result);
171+
Ok(sole_provider.check_recursion())
172+
}
166173
Expr::OuterReferenceColumn(_, ref col) => {
167174
if let Some(table) = &col.relation {
168175
if let Some(plan_result) = providers.get(table) {
@@ -423,6 +430,43 @@ impl FederationAnalyzerRule {
423430
in_subquery.negated,
424431
))))
425432
}
433+
Expr::Exists(ref exists) => {
434+
let (new_subquery, _) = self.analyze_plan_recursively(
435+
&exists.subquery.subquery,
436+
true,
437+
_config,
438+
providers,
439+
)?;
440+
let Some(new_subquery) = new_subquery else {
441+
return Ok(Transformed::no(expr));
442+
};
443+
444+
// DecorrelatePredicateSubquery optimizer rule doesn't support federated node
445+
// (LogicalPlan::Extension(_)) as subquery.
446+
// Wrap a no-op Projection outside the federated node to facilitate optimization.
447+
if matches!(new_subquery, LogicalPlan::Extension(_)) {
448+
let all_columns = new_subquery
449+
.schema()
450+
.fields()
451+
.iter()
452+
.map(|field| col(field.name()))
453+
.collect::<Vec<_>>();
454+
455+
let projection_plan = LogicalPlanBuilder::from(new_subquery)
456+
.project(all_columns)?
457+
.build()?;
458+
459+
return Ok(Transformed::yes(Expr::Exists(Exists {
460+
subquery: exists.subquery.with_plan(projection_plan.into()),
461+
negated: exists.negated,
462+
})));
463+
}
464+
465+
Ok(Transformed::yes(Expr::Exists(Exists {
466+
subquery: exists.subquery.with_plan(new_subquery.into()),
467+
negated: exists.negated,
468+
})))
469+
}
426470
_ => Ok(Transformed::no(expr)),
427471
}
428472
}

0 commit comments

Comments
 (0)