diff --git a/datafusion-federation/src/optimizer/mod.rs b/datafusion-federation/src/optimizer/mod.rs index 9c16dfc..03244f4 100644 --- a/datafusion-federation/src/optimizer/mod.rs +++ b/datafusion-federation/src/optimizer/mod.rs @@ -191,14 +191,22 @@ impl FederationOptimizerRule { return Ok((None, ScanResult::Distinct(provider))); } - let Some(optimizer) = provider.optimizer() else { - // No optimizer provided - return Ok((None, ScanResult::None)); - }; + // Analyze plans (EXPLAIN ANALYZE) cannot be converted to SQL by + // the Unparser, so they must not be federated as a whole. Only the + // inner query should be federated; DataFusion's AnalyzeExec will + // handle executing it and collecting metrics. + if matches!(plan, LogicalPlan::Analyze(_)) { + // Fall through to federate children instead. + } else { + let Some(optimizer) = provider.optimizer() else { + // No optimizer provided + return Ok((None, ScanResult::None)); + }; - // If this is the root plan node; federate the entire plan - let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?; - return Ok((Some(optimized), ScanResult::None)); + // If this is the root plan node; federate the entire plan + let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?; + return Ok((Some(optimized), ScanResult::None)); + } } // The plan is ambiguous; any input that is not yet optimized and has a diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index d5198c7..60d6143 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -766,6 +766,57 @@ mod tests { Ok(()) } + /// EXPLAIN ANALYZE must not federate the Analyze wrapper — only the inner + /// query should be federated. Otherwise the SQL Unparser fails because it + /// cannot convert Analyze to SQL. + #[tokio::test] + async fn explain_analyze_not_federated() -> Result<(), DataFusionError> { + let executor = TestExecutor { + compute_context: "a".into(), + }; + + let table_ref = "test_table".to_string(); + let table = get_test_table_provider(table_ref.clone(), executor); + + let state = crate::default_session_state(); + let ctx = SessionContext::new_with_state(state); + ctx.register_table(table_ref, table).unwrap(); + + let plan = ctx + .sql("EXPLAIN ANALYZE SELECT * FROM test_table") + .await? + .into_optimized_plan()?; + + // The top-level node must be Analyze, not Federated. + assert!( + matches!(plan, LogicalPlan::Analyze(_)), + "Expected Analyze at root, got: {}", + plan.display_indent() + ); + + // The inner plan should contain a Federated extension node. + let mut found_federated = false; + plan.apply(|node| { + if let LogicalPlan::Extension(ext) = node { + if ext.node.name() == "Federated" { + found_federated = true; + return Ok(TreeNodeRecursion::Stop); + } + } + Ok(TreeNodeRecursion::Continue) + })?; + assert!( + found_federated, + "Expected a Federated node inside the Analyze plan" + ); + + // Physical planning should succeed (this is where it used to fail). + let physical_plan = ctx.state().create_physical_plan(&plan).await?; + assert_eq!(physical_plan.name(), "AnalyzeExec"); + + Ok(()) + } + #[tokio::test] async fn sql_query_rewriter_hook_invoked_and_rewrites_sql() -> Result<(), DataFusionError> { let executor = TestExecutor {