diff --git a/datafusion-federation/src/analyzer/mod.rs b/datafusion-federation/src/analyzer/mod.rs index 3c39684..b8f8a61 100644 --- a/datafusion-federation/src/analyzer/mod.rs +++ b/datafusion-federation/src/analyzer/mod.rs @@ -242,14 +242,13 @@ impl FederationAnalyzerRule { return Ok((None, ScanResult::Distinct(provider))); } - let Some(analyzer) = provider.analyzer() else { - // No analyzer provided - return Ok((None, ScanResult::None)); - }; + if let Some(analyzer) = provider.analyzer(&plan) { + // If this is the root plan node; federate the entire plan + let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?; + return Ok((Some(optimized), ScanResult::None)); + } - // If this is the root plan node; federate the entire plan - let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?; - return Ok((Some(optimized), ScanResult::None)); + sole_provider = ScanResult::Ambiguous; } // The plan is ambiguous; any input that is not yet optimized and has a @@ -279,7 +278,7 @@ impl FederationAnalyzerRule { return Ok(original_input); }; - let Some(analyzer) = provider.analyzer() else { + let Some(analyzer) = provider.analyzer(&original_input) else { // No analyzer for this input; use the original input. return Ok(original_input); }; @@ -421,7 +420,7 @@ impl FederationProvider for NopFederationProvider { None } - fn analyzer(&self) -> Option> { + fn analyzer(&self, _plan: &LogicalPlan) -> Option> { None } } @@ -470,7 +469,7 @@ fn contains_federated_table(plan: &LogicalPlan) -> Result { let federated_table_exists = plan.exists(|x| { if let (Some(provider), _) = get_leaf_provider(x)? 
{ // federated table provider should have an analyzer - return Ok(provider.analyzer().is_some()); + return Ok(provider.analyzer(plan).is_some()); } Ok(false) })?; diff --git a/datafusion-federation/src/lib.rs b/datafusion-federation/src/lib.rs index af098ce..6dbb79d 100644 --- a/datafusion-federation/src/lib.rs +++ b/datafusion-federation/src/lib.rs @@ -13,6 +13,7 @@ use std::{ use datafusion::{ execution::session_state::{SessionState, SessionStateBuilder}, + logical_expr::LogicalPlan, optimizer::{ analyzer::{ resolve_grouping_function::ResolveGroupingFunction, type_coercion::TypeCoercion, @@ -60,8 +61,9 @@ pub trait FederationProvider: Send + Sync + std::fmt::Debug { fn compute_context(&self) -> Option; // Returns an analyzer that can cut out part of the plan - // to federate it. - fn analyzer(&self) -> Option>; + // to federate it. The plan is provided to allow the federation provider + // to check if it can federate the plan before returning an analyzer. + fn analyzer(&self, plan: &LogicalPlan) -> Option>; } impl fmt::Display for dyn FederationProvider { diff --git a/datafusion-federation/src/sql/executor.rs b/datafusion-federation/src/sql/executor.rs index e51c5ff..5016d75 100644 --- a/datafusion-federation/src/sql/executor.rs +++ b/datafusion-federation/src/sql/executor.rs @@ -29,6 +29,14 @@ pub trait SQLExecutor: Sync + Send { /// The specific SQL dialect (currently supports 'sqlite', 'postgres', 'flight') fn dialect(&self) -> Arc; + /// Returns whether this executor can execute the query that would be produced from this logical plan. + /// + /// This is used to indicate to the federation logic that part of this plan cannot be federated, + /// e.g. if there are UDFs that only DataFusion can execute.
+ fn can_execute_plan(&self, _logical_plan: &LogicalPlan) -> bool { + true + } + /// Returns the analyzer rule specific for this engine to modify the logical plan before execution fn logical_optimizer(&self) -> Option { None diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index be46c71..ceab70c 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -75,8 +75,12 @@ impl FederationProvider for SQLFederationProvider { self.executor.compute_context() } - fn analyzer(&self) -> Option> { - Some(Arc::clone(&self.analyzer)) + fn analyzer(&self, plan: &LogicalPlan) -> Option> { + if self.executor.can_execute_plan(plan) { + Some(Arc::clone(&self.analyzer)) + } else { + None + } } } @@ -103,6 +107,10 @@ impl AnalyzerRule for SQLFederationAnalyzerRule { } } + if !self.planner.executor.can_execute_plan(&plan) { + return Ok(plan); + } + let fed_plan = FederatedPlanNode::new(plan.clone(), self.planner.clone()); let ext_node = Extension { node: Arc::new(fed_plan),