Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions datafusion-federation/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,22 @@ impl FederationOptimizerRule {
return Ok((None, ScanResult::Distinct(provider)));
}

let Some(optimizer) = provider.optimizer() else {
// No optimizer provided
return Ok((None, ScanResult::None));
};
// Analyze plans (EXPLAIN ANALYZE) cannot be converted to SQL by
// the Unparser, so they must not be federated as a whole. Only the
// inner query should be federated; DataFusion's AnalyzeExec will
// handle executing it and collecting metrics.
if matches!(plan, LogicalPlan::Analyze(_)) {
// Fall through to federate children instead.
} else {
let Some(optimizer) = provider.optimizer() else {
// No optimizer provided
return Ok((None, ScanResult::None));
};

// If this is the root plan node; federate the entire plan
let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?;
return Ok((Some(optimized), ScanResult::None));
// If this is the root plan node; federate the entire plan
let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?;
return Ok((Some(optimized), ScanResult::None));
}
}

// The plan is ambiguous; any input that is not yet optimized and has a
Expand Down
51 changes: 51 additions & 0 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,57 @@ mod tests {
Ok(())
}

/// EXPLAIN ANALYZE must not federate the Analyze wrapper — only the inner
/// query should be federated. Otherwise the SQL Unparser fails because it
/// cannot convert Analyze to SQL.
#[tokio::test]
async fn explain_analyze_not_federated() -> Result<(), DataFusionError> {
let executor = TestExecutor {
compute_context: "a".into(),
};

let table_ref = "test_table".to_string();
let table = get_test_table_provider(table_ref.clone(), executor);

let state = crate::default_session_state();
let ctx = SessionContext::new_with_state(state);
ctx.register_table(table_ref, table).unwrap();

let plan = ctx
.sql("EXPLAIN ANALYZE SELECT * FROM test_table")
.await?
.into_optimized_plan()?;

// The top-level node must be Analyze, not Federated.
assert!(
matches!(plan, LogicalPlan::Analyze(_)),
"Expected Analyze at root, got: {}",
plan.display_indent()
);

// The inner plan should contain a Federated extension node.
let mut found_federated = false;
plan.apply(|node| {
if let LogicalPlan::Extension(ext) = node {
if ext.node.name() == "Federated" {
found_federated = true;
return Ok(TreeNodeRecursion::Stop);
}
}
Ok(TreeNodeRecursion::Continue)
})?;
assert!(
found_federated,
"Expected a Federated node inside the Analyze plan"
);

// Physical planning should succeed (this is where it used to fail).
let physical_plan = ctx.state().create_physical_plan(&plan).await?;
assert_eq!(physical_plan.name(), "AnalyzeExec");

Ok(())
}

#[tokio::test]
async fn sql_query_rewriter_hook_invoked_and_rewrites_sql() -> Result<(), DataFusionError> {
let executor = TestExecutor {
Expand Down
Loading