Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions datafusion-federation/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,13 @@ impl FederationAnalyzerRule {
return Ok((None, ScanResult::Distinct(provider)));
}

let Some(analyzer) = provider.analyzer() else {
// No analyzer provided
return Ok((None, ScanResult::None));
};
if let Some(analyzer) = provider.analyzer(&plan) {
// If this is the root plan node; federate the entire plan
let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?;
return Ok((Some(optimized), ScanResult::None));
}

// If this is the root plan node; federate the entire plan
let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?;
return Ok((Some(optimized), ScanResult::None));
sole_provider = ScanResult::Ambiguous;
}

// The plan is ambiguous; any input that is not yet optimized and has a
Expand Down Expand Up @@ -279,7 +278,7 @@ impl FederationAnalyzerRule {
return Ok(original_input);
};

let Some(analyzer) = provider.analyzer() else {
let Some(analyzer) = provider.analyzer(&original_input) else {
// No analyzer for this input; use the original input.
return Ok(original_input);
};
Expand Down Expand Up @@ -421,7 +420,7 @@ impl FederationProvider for NopFederationProvider {
None
}

fn analyzer(&self) -> Option<Arc<datafusion::optimizer::Analyzer>> {
fn analyzer(&self, _plan: &LogicalPlan) -> Option<Arc<datafusion::optimizer::Analyzer>> {
None
}
}
Expand Down Expand Up @@ -470,7 +469,7 @@ fn contains_federated_table(plan: &LogicalPlan) -> Result<bool> {
let federated_table_exists = plan.exists(|x| {
if let (Some(provider), _) = get_leaf_provider(x)? {
// federated table provider should have an analyzer
return Ok(provider.analyzer().is_some());
return Ok(provider.analyzer(plan).is_some());
}
Ok(false)
})?;
Expand Down
6 changes: 4 additions & 2 deletions datafusion-federation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{

use datafusion::{
execution::session_state::{SessionState, SessionStateBuilder},
logical_expr::LogicalPlan,
optimizer::{
analyzer::{
resolve_grouping_function::ResolveGroupingFunction, type_coercion::TypeCoercion,
Expand Down Expand Up @@ -60,8 +61,9 @@ pub trait FederationProvider: Send + Sync + std::fmt::Debug {
fn compute_context(&self) -> Option<String>;

// Returns an analyzer that can cut out part of the plan
// to federate it.
fn analyzer(&self) -> Option<Arc<Analyzer>>;
// to federate it. The plan is provided to allow the federation provider
// to check if it can federate the plan before returning an analyzer.
fn analyzer(&self, plan: &LogicalPlan) -> Option<Arc<Analyzer>>;
}

impl fmt::Display for dyn FederationProvider {
Expand Down
8 changes: 8 additions & 0 deletions datafusion-federation/src/sql/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ pub trait SQLExecutor: Sync + Send {
/// The specific SQL dialect (currently supports 'sqlite', 'postgres', 'flight')
fn dialect(&self) -> Arc<dyn Dialect>;

/// Returns if this executor can execute the query that would be produced from this logical plan.
///
/// This is used to indicate to the federation logic that part of this plan cannot be federated,
/// i.e. if there are UDFs that only DataFusion can execute.
fn can_execute_plan(&self, _logical_plan: &LogicalPlan) -> bool {
true
}

/// Returns the analyzer rule specific for this engine to modify the logical plan before execution
fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
None
Expand Down
12 changes: 10 additions & 2 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ impl FederationProvider for SQLFederationProvider {
self.executor.compute_context()
}

fn analyzer(&self) -> Option<Arc<Analyzer>> {
Some(Arc::clone(&self.analyzer))
fn analyzer(&self, plan: &LogicalPlan) -> Option<Arc<Analyzer>> {
if self.executor.can_execute_plan(plan) {
Some(Arc::clone(&self.analyzer))
} else {
None
}
}
}

Expand All @@ -103,6 +107,10 @@ impl AnalyzerRule for SQLFederationAnalyzerRule {
}
}

if !self.planner.executor.can_execute_plan(&plan) {
return Ok(plan);
}

let fed_plan = FederatedPlanNode::new(plan.clone(), self.planner.clone());
let ext_node = Extension {
node: Arc::new(fed_plan),
Expand Down
Loading