Skip to content

Commit 6b6bfb0

Browse files
authored
fix: handle EXISTS/NOT EXISTS subqueries in federation analyzer (#74)
1 parent a4fce79 commit 6b6bfb0

9 files changed

Lines changed: 616 additions & 1 deletion

datafusion-federation/src/analyzer/mod.rs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ mod scan_result;
22

33
use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef};
44
use crate::{FederationAnalyzerForLogicalPlan, FederationProvider};
5-
use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder};
5+
use datafusion::logical_expr::{col, expr::Exists, expr::InSubquery, LogicalPlanBuilder};
66
use datafusion::optimizer::optimize_unions::OptimizeUnions;
77
use datafusion::optimizer::push_down_filter::PushDownFilter;
88
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
@@ -163,6 +163,13 @@ impl FederationAnalyzerRule {
163163
sole_provider.merge(plan_result);
164164
Ok(sole_provider.check_recursion())
165165
}
166+
Expr::Exists(ref exists) => {
167+
let plan_result =
168+
self.scan_plan_recursively(&exists.subquery.subquery, providers)?;
169+
170+
sole_provider.merge(plan_result);
171+
Ok(sole_provider.check_recursion())
172+
}
166173
Expr::OuterReferenceColumn(_, ref col) => {
167174
if let Some(table) = &col.relation {
168175
if let Some(plan_result) = providers.get(table) {
@@ -423,6 +430,43 @@ impl FederationAnalyzerRule {
423430
in_subquery.negated,
424431
))))
425432
}
433+
Expr::Exists(ref exists) => {
434+
let (new_subquery, _) = self.analyze_plan_recursively(
435+
&exists.subquery.subquery,
436+
true,
437+
_config,
438+
providers,
439+
)?;
440+
let Some(new_subquery) = new_subquery else {
441+
return Ok(Transformed::no(expr));
442+
};
443+
444+
// The DecorrelatePredicateSubquery optimizer rule doesn't support a federated node
445+
// (LogicalPlan::Extension(_)) as a subquery.
446+
// Wrap the federated node in a no-op Projection so that the optimizer rule can handle it.
447+
if matches!(new_subquery, LogicalPlan::Extension(_)) {
448+
let all_columns = new_subquery
449+
.schema()
450+
.fields()
451+
.iter()
452+
.map(|field| col(field.name()))
453+
.collect::<Vec<_>>();
454+
455+
let projection_plan = LogicalPlanBuilder::from(new_subquery)
456+
.project(all_columns)?
457+
.build()?;
458+
459+
return Ok(Transformed::yes(Expr::Exists(Exists {
460+
subquery: exists.subquery.with_plan(projection_plan.into()),
461+
negated: exists.negated,
462+
})));
463+
}
464+
465+
Ok(Transformed::yes(Expr::Exists(Exists {
466+
subquery: exists.subquery.with_plan(new_subquery.into()),
467+
negated: exists.negated,
468+
})))
469+
}
426470
_ => Ok(Transformed::no(expr)),
427471
}
428472
}
@@ -455,6 +499,25 @@ fn get_plan_provider_recursively(
455499
let mut providers: HashMap<TableReference, Arc<dyn FederationProvider>> = HashMap::new();
456500

457501
plan.apply_with_subqueries(&mut |p: &LogicalPlan| -> Result<TreeNodeRecursion> {
502+
// Register SubqueryAlias names (e.g. `lineitem l1`) so that an OuterReferenceColumn resolved
503+
// against the alias (e.g. `l1.l_orderkey`) can find the correct provider. Without this,
504+
// correlated subqueries that reference an aliased outer table mark the scan as Ambiguous,
505+
// breaking same-provider federation.
506+
if let LogicalPlan::SubqueryAlias(subquery_alias) = p {
507+
let alias_ref = TableReference::bare(subquery_alias.alias.table().to_string());
508+
subquery_alias
509+
.input
510+
.apply(&mut |child| -> Result<TreeNodeRecursion> {
511+
if let (Some(provider), Some(table_reference)) = get_leaf_provider(child)? {
512+
providers.insert(alias_ref.clone(), Arc::clone(&provider));
513+
providers.insert(table_reference, provider);
514+
return Ok(TreeNodeRecursion::Stop);
515+
}
516+
Ok(TreeNodeRecursion::Continue)
517+
})?;
518+
return Ok(TreeNodeRecursion::Continue);
519+
}
520+
458521
if let (Some(federation_provider), Some(table_reference)) = get_leaf_provider(p)? {
459522
providers.insert(table_reference, federation_provider);
460523
}

0 commit comments

Comments
 (0)