Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 61 additions & 21 deletions datafusion-federation/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ impl AnalyzerRule for FederationAnalyzerRule {

// Find all federation providers for TableReferences that appear in the plan, to resolve OuterRefColumns
let providers = get_plan_provider_recursively(&plan)?;
let explain_context = Self::explain_context_template(&plan);

match self.analyze_plan_recursively(&plan, true, config, &providers)? {
match self.analyze_plan_recursively(&plan, true, config, &providers, explain_context)? {
(Some(optimized_plan), _) => Ok(optimized_plan),
(None, _) => Ok(plan),
}
Expand Down Expand Up @@ -85,6 +86,27 @@ impl FederationAnalyzerRule {
self
}

fn explain_context_template(plan: &LogicalPlan) -> Option<LogicalPlan> {
match plan {
LogicalPlan::Explain(_) | LogicalPlan::Analyze(_) => Some(plan.clone()),
_ => None,
}
}

fn wrap_federated_plan(
plan: LogicalPlan,
explain_context: Option<&LogicalPlan>,
) -> Result<LogicalPlan> {
if matches!(plan, LogicalPlan::Explain(_) | LogicalPlan::Analyze(_)) {
return Ok(plan);
}

match explain_context {
Some(wrapper) => wrapper.with_new_exprs(wrapper.expressions(), vec![plan]),
None => Ok(plan),
}
}

/// Scans a plan to see if it belongs to a single [`FederationProvider`].
fn scan_plan_recursively(
&self,
Expand Down Expand Up @@ -184,7 +206,9 @@ impl FederationAnalyzerRule {
is_root: bool,
config: &ConfigOptions,
providers: &HashMap<TableReference, Arc<dyn FederationProvider>>,
explain_context: Option<LogicalPlan>,
) -> Result<(Option<LogicalPlan>, ScanResult)> {
let explain_context = explain_context.or_else(|| Self::explain_context_template(plan));
let mut sole_provider: ScanResult = ScanResult::None;

if let LogicalPlan::Extension(Extension { ref node }) = plan {
Expand Down Expand Up @@ -217,7 +241,9 @@ impl FederationAnalyzerRule {
// Recursively analyze inputs
let input_results = inputs
.iter()
.map(|i| self.analyze_plan_recursively(i, false, config, providers))
.map(|i| {
self.analyze_plan_recursively(i, false, config, providers, explain_context.clone())
})
.collect::<Result<Vec<_>>>()?;

// Aggregate the input providers
Expand All @@ -238,23 +264,25 @@ impl FederationAnalyzerRule {

// If all sources are federated to the same provider
if let ScanResult::Distinct(provider) = sole_provider {
// Analyze plans (EXPLAIN ANALYZE) cannot be converted to SQL by the
// Unparser, so they must not be federated as a whole. Only the inner
// query should be federated; DataFusion's AnalyzeExec will handle
// executing it and collecting metrics.
let provider_analyzer = if matches!(plan, LogicalPlan::Analyze(_)) {
None
} else {
provider.analyzer(plan)
};
// Explain and Analyze wrappers stay in the DataFusion plan so their
// physical operators can still run. The corresponding directive is
// injected into the federated subquery instead.
let federated_plan = Self::wrap_federated_plan(plan.clone(), explain_context.as_ref())?;
let provider_analyzer =
if matches!(plan, LogicalPlan::Analyze(_) | LogicalPlan::Explain(_)) {
None
} else {
provider.analyzer(&federated_plan)
};
match (is_root, provider_analyzer) {
(false, Some(_)) => {
// The largest sub-plan is higher up.
return Ok((None, ScanResult::Distinct(provider)));
}
(true, Some(FederationAnalyzerForLogicalPlan::With(analyzer))) => {
// If this is the root plan node; federate the entire plan
let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?;
let optimized =
analyzer.execute_and_check(federated_plan, config, |_, _| {})?;
return Ok((Some(optimized), ScanResult::None));
}
(_, None | Some(FederationAnalyzerForLogicalPlan::Unable)) => {
Expand Down Expand Up @@ -291,22 +319,26 @@ impl FederationAnalyzerRule {
return Ok(original_input);
};

let federated_input = Self::wrap_federated_plan(
wrap_projection(original_input.clone())?,
explain_context.as_ref(),
)?;

let Some(FederationAnalyzerForLogicalPlan::With(analyzer)) =
provider.analyzer(&original_input)
provider.analyzer(&federated_input)
else {
// Either provider has no analyzer, or cannot federate [`LogicalPlan`].
return Ok(original_input);
};

// Replace the input with the federated counterpart
let wrapped = wrap_projection(original_input)?;
analyzer.execute_and_check(wrapped, config, |_, _| {})
analyzer.execute_and_check(federated_input, config, |_, _| {})
})
.collect::<Result<Vec<_>>>()?;

// Optimize expressions if needed
let new_expressions = if optimize_expressions {
self.analyze_plan_exprs(plan, config, providers)?
self.analyze_plan_exprs(plan, config, providers, explain_context)?
} else {
plan.expressions()
};
Expand All @@ -324,13 +356,14 @@ impl FederationAnalyzerRule {
plan: &LogicalPlan,
config: &ConfigOptions,
providers: &HashMap<TableReference, Arc<dyn FederationProvider>>,
explain_context: Option<LogicalPlan>,
) -> Result<Vec<Expr>> {
plan.expressions()
.iter()
.map(|expr| {
let transformed = expr
.clone()
.transform(&|e| self.analyze_expr_recursively(e, config, providers))?;
let transformed = expr.clone().transform(&|e| {
self.analyze_expr_recursively(e, config, providers, explain_context.clone())
})?;
Ok(transformed.data)
})
.collect::<Result<Vec<_>>>()
Expand All @@ -343,12 +376,18 @@ impl FederationAnalyzerRule {
expr: Expr,
_config: &ConfigOptions,
providers: &HashMap<TableReference, Arc<dyn FederationProvider>>,
explain_context: Option<LogicalPlan>,
) -> Result<Transformed<Expr>> {
match expr {
Expr::ScalarSubquery(ref subquery) => {
// Analyze as root to force federating the sub-query
let (new_subquery, _) =
self.analyze_plan_recursively(&subquery.subquery, true, _config, providers)?;
let (new_subquery, _) = self.analyze_plan_recursively(
&subquery.subquery,
true,
_config,
providers,
explain_context.clone(),
)?;
let Some(new_subquery) = new_subquery else {
return Ok(Transformed::no(expr));
};
Expand Down Expand Up @@ -382,6 +421,7 @@ impl FederationAnalyzerRule {
true,
_config,
providers,
explain_context,
)?;
let Some(new_subquery) = new_subquery else {
return Ok(Transformed::no(expr));
Expand Down
3 changes: 2 additions & 1 deletion datafusion-federation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use datafusion::{

pub use analyzer::{get_table_source, FederationAnalyzerRule};
pub use plan_node::{
FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederationPlanner,
FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederatedQueryType,
FederationPlanner,
};
pub use table_provider::{FederatedTableProviderAdaptor, FederatedTableSource};

Expand Down
109 changes: 103 additions & 6 deletions datafusion-federation/src/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,49 @@ use datafusion::{
common::DFSchemaRef,
error::{DataFusionError, Result},
execution::context::{QueryPlanner, SessionState},
logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore},
logical_expr::{
Expr, Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
},
physical_plan::ExecutionPlan,
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum FederatedQueryType {
Explain,
Analyze,
}

impl FederatedQueryType {
pub fn prefix(self) -> &'static str {
match self {
Self::Explain => "EXPLAIN",
Self::Analyze => "EXPLAIN ANALYZE",
}
}
}

pub struct FederatedPlanNode {
pub(crate) plan: LogicalPlan,
pub(crate) planner: Arc<dyn FederationPlanner>,
pub(crate) query_type: Option<FederatedQueryType>,
}

impl FederatedPlanNode {
pub fn new(plan: LogicalPlan, planner: Arc<dyn FederationPlanner>) -> Self {
Self { plan, planner }
Self::new_with_query_type(plan, planner, None)
}

pub fn new_with_query_type(
plan: LogicalPlan,
planner: Arc<dyn FederationPlanner>,
query_type: Option<FederatedQueryType>,
) -> Self {
Self {
plan,
planner,
query_type,
}
}

pub fn plan(&self) -> &LogicalPlan {
Expand All @@ -32,6 +62,10 @@ impl FederatedPlanNode {
pub fn planner(&self) -> &Arc<dyn FederationPlanner> {
&self.planner
}

pub fn query_type(&self) -> Option<FederatedQueryType> {
self.query_type
}
}

impl Debug for FederatedPlanNode {
Expand Down Expand Up @@ -72,6 +106,7 @@ impl UserDefinedLogicalNodeCore for FederatedPlanNode {
Ok(Self {
plan: self.plan.clone(),
planner: Arc::clone(&self.planner),
query_type: self.query_type,
})
}
}
Expand All @@ -83,6 +118,64 @@ impl FederatedQueryPlanner {
pub fn new() -> Self {
Self::default()
}

fn annotate_query_type(
plan: &LogicalPlan,
query_type: FederatedQueryType,
) -> Result<LogicalPlan> {
let new_inputs = plan
.inputs()
.into_iter()
.map(|input| Self::annotate_query_type(input, query_type))
.collect::<Result<Vec<_>>>()?;
let plan = if new_inputs.is_empty() {
plan.clone()
} else {
plan.with_new_exprs(plan.expressions(), new_inputs)?
};

if let LogicalPlan::Extension(Extension { node }) = &plan {
if let Some(federated_node) = node.as_any().downcast_ref::<FederatedPlanNode>() {
return Ok(LogicalPlan::Extension(Extension {
node: Arc::new(FederatedPlanNode::new_with_query_type(
federated_node.plan.clone(),
Arc::clone(&federated_node.planner),
Some(federated_node.query_type.unwrap_or(query_type)),
)),
}));
}
}

Ok(plan)
}

pub(crate) fn annotate_query_directives(plan: &LogicalPlan) -> Result<LogicalPlan> {
match plan {
LogicalPlan::Explain(_) => {
let inputs = plan.inputs();
let [input] = inputs.as_slice() else {
return Err(DataFusionError::Plan(
"Explain plan must have exactly one input".into(),
));
};
let annotated_input =
Self::annotate_query_type(input, FederatedQueryType::Explain)?;
plan.with_new_exprs(plan.expressions(), vec![annotated_input])
}
LogicalPlan::Analyze(_) => {
let inputs = plan.inputs();
let [input] = inputs.as_slice() else {
return Err(DataFusionError::Plan(
"Analyze plan must have exactly one input".into(),
));
};
let annotated_input =
Self::annotate_query_type(input, FederatedQueryType::Analyze)?;
plan.with_new_exprs(plan.expressions(), vec![annotated_input])
}
_ => Ok(plan.clone()),
}
}
}

#[async_trait]
Expand All @@ -92,14 +185,14 @@ impl QueryPlanner for FederatedQueryPlanner {
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
// Get provider here?
let logical_plan = Self::annotate_query_directives(logical_plan)?;

let physical_planner =
DefaultPhysicalPlanner::with_extension_planners(vec![
Arc::new(FederatedPlanner::new()),
]);
physical_planner
.create_physical_plan(logical_plan, session_state)
.create_physical_plan(&logical_plan, session_state)
.await
Comment on lines +188 to 196
Copy link

Copilot AI Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FederatedQueryPlanner::create_physical_plan now always clones the entire LogicalPlan because annotate_query_directives returns plan.clone() for the non-EXPLAIN/ANALYZE case. This adds an avoidable allocation/copy on every query. Consider only calling annotate_query_directives (and only allocating a new plan) when the root is LogicalPlan::Explain/LogicalPlan::Analyze; otherwise pass the original logical_plan through to create_physical_plan.

Copilot uses AI. Check for mistakes.
}
}
Expand All @@ -122,13 +215,16 @@ impl std::fmt::Debug for dyn FederationPlanner {
impl PartialEq<FederatedPlanNode> for FederatedPlanNode {
/// Comparing name, args and return_type
fn eq(&self, other: &FederatedPlanNode) -> bool {
self.plan == other.plan
self.plan == other.plan && self.query_type == other.query_type
}
}

impl PartialOrd<FederatedPlanNode> for FederatedPlanNode {
fn partial_cmp(&self, other: &FederatedPlanNode) -> Option<std::cmp::Ordering> {
self.plan.partial_cmp(&other.plan)
match self.plan.partial_cmp(&other.plan) {
Some(std::cmp::Ordering::Equal) => self.query_type.partial_cmp(&other.query_type),
ordering => ordering,
}
}
}

Expand All @@ -137,6 +233,7 @@ impl Eq for FederatedPlanNode {}
impl Hash for FederatedPlanNode {
fn hash<H: Hasher>(&self, state: &mut H) {
self.plan.hash(state);
self.query_type.hash(state);
}
}

Expand Down
Loading
Loading