Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion datafusion-federation/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,16 @@ impl FederationAnalyzerRule {

// If all sources are federated to the same provider
if let ScanResult::Distinct(provider) = sole_provider {
match (is_root, provider.analyzer(plan)) {
// Analyze plans (EXPLAIN ANALYZE) cannot be converted to SQL by the
// Unparser, so they must not be federated as a whole. Only the inner
// query should be federated; DataFusion's AnalyzeExec will handle
// executing it and collecting metrics.
let provider_analyzer = if matches!(plan, LogicalPlan::Analyze(_)) {
None
} else {
provider.analyzer(plan)
};
match (is_root, provider_analyzer) {
(false, Some(_)) => {
// The largest sub-plan is higher up.
return Ok((None, ScanResult::Distinct(provider)));
Expand Down
54 changes: 54 additions & 0 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,60 @@ mod tests {
Ok(())
}

/// EXPLAIN ANALYZE must not federate the Analyze wrapper — only the inner
/// query should be federated. Otherwise the SQL Unparser fails because it
/// cannot convert Analyze to SQL.
#[tokio::test]
async fn explain_analyze_not_federated() -> Result<(), DataFusionError> {
let executor = TestExecutor {
compute_context: "a".into(),
cannot_federate: None,
};

let table_ref = "test_table".to_string();
let table = get_test_table_provider(table_ref.clone(), executor);

let state = crate::default_session_state();
let ctx = SessionContext::new_with_state(state);
ctx.register_table(table_ref, table).unwrap();

// EXPLAIN ANALYZE wraps the query in LogicalPlan::Analyze.
// The federation analyzer must NOT wrap the Analyze node itself.
let plan = ctx
.sql("EXPLAIN ANALYZE SELECT * FROM test_table")
.await?
.into_optimized_plan()?;

// The top-level node must be Analyze, not Federated.
assert!(
matches!(plan, LogicalPlan::Analyze(_)),
"Expected Analyze at root, got: {}",
plan.display_indent()
);

// The inner plan should contain a Federated extension node.
let mut found_federated = false;
plan.apply(|node| {
if let LogicalPlan::Extension(ext) = node {
if ext.node.name() == "Federated" {
found_federated = true;
return Ok(TreeNodeRecursion::Stop);
}
}
Ok(TreeNodeRecursion::Continue)
})?;
assert!(
found_federated,
"Expected a Federated node inside the Analyze plan"
);

// Physical planning should succeed (this is where it used to fail).
let physical_plan = ctx.state().create_physical_plan(&plan).await?;
assert_eq!(physical_plan.name(), "AnalyzeExec");

Ok(())
}

#[tokio::test]
async fn sql_query_rewriter_hook_invoked_and_rewrites_sql() -> Result<(), DataFusionError> {
let executor = TestExecutor {
Expand Down
Loading