Skip to content

Commit 6599d31

Browse files
committed
feat: Add EXPLAIN and EXPLAIN ANALYZE support to federated queries
When federating EXPLAIN or EXPLAIN ANALYZE queries, the SQL generator now correctly prefixes the unparsed federated query with EXPLAIN or EXPLAIN ANALYZE respectively. This ensures that remote databases receive the full query intent including the EXPLAIN directive. Changes: - Detect LogicalPlan::Explain and LogicalPlan::Analyze variants in VirtualExecutionPlan::final_sql() - Extract the inner plan and apply all rewrites and optimizations - Prefix the final SQL with 'EXPLAIN ' or 'EXPLAIN ANALYZE ' - Extract common rewrite logic into rewrite_plan_to_sql() helper - Add test cases for both EXPLAIN and EXPLAIN ANALYZE queries - Remove unused sources/sql/src/lib.rs (not wired into workspace)
1 parent d653c52 commit 6599d31

2 files changed

Lines changed: 96 additions & 1005 deletions

File tree

datafusion-federation/src/sql/mod.rs

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,21 @@ impl VirtualExecutionPlan {
205205
}
206206

207207
fn final_sql(&self) -> Result<String> {
208-
let plan = self.plan.clone();
208+
// Check if this is an EXPLAIN or EXPLAIN ANALYZE query
209+
if let LogicalPlan::Explain(explain) = &self.plan {
210+
let plan = explain.plan.as_ref().clone();
211+
let sql = self.rewrite_plan_to_sql(plan)?;
212+
Ok(format!("EXPLAIN {sql}"))
213+
} else if let LogicalPlan::Analyze(analyze) = &self.plan {
214+
let plan = analyze.input.as_ref().clone();
215+
let sql = self.rewrite_plan_to_sql(plan)?;
216+
Ok(format!("EXPLAIN ANALYZE {sql}"))
217+
} else {
218+
self.rewrite_plan_to_sql(self.plan.clone())
219+
}
220+
}
221+
222+
fn rewrite_plan_to_sql(&self, plan: LogicalPlan) -> Result<String> {
209223
let plan = RewriteTableScanAnalyzer::rewrite(plan)?;
210224
let (logical_optimizers, ast_analyzers, sql_query_rewriters) = gather_analyzers(&plan)?;
211225
let plan = apply_logical_optimizers(plan, logical_optimizers)?;
@@ -456,6 +470,7 @@ mod tests {
456470
};
457471
use crate::FederatedTableProviderAdaptor;
458472
use async_trait::async_trait;
473+
use datafusion::arrow::array::Array;
459474
use datafusion::arrow::datatypes::{Schema, SchemaRef};
460475
use datafusion::common::tree_node::TreeNodeRecursion;
461476
use datafusion::execution::SendableRecordBatchStream;
@@ -816,4 +831,84 @@ mod tests {
816831

817832
Ok(())
818833
}
834+
835+
#[tokio::test]
836+
async fn explain_federation_test() -> Result<(), DataFusionError> {
837+
// Plain EXPLAIN in DataFusion produces string descriptions via
838+
// ExplainExec rather than executing the physical plan. Verify
839+
// that the EXPLAIN output shows the VirtualExecutionPlan and
840+
// its SQL in the stringified plan.
841+
let test_executor = TestExecutor {
842+
compute_context: "test".into(),
843+
};
844+
845+
let table_ref = "test_table".to_string();
846+
let table = get_test_table_provider(table_ref.clone(), test_executor);
847+
848+
let state = crate::default_session_state();
849+
let ctx = SessionContext::new_with_state(state);
850+
ctx.register_table(table_ref, table).unwrap();
851+
852+
let query = "EXPLAIN SELECT * FROM test_table";
853+
let batches = ctx.sql(query).await?.collect().await?;
854+
let explain_output: Vec<String> = batches
855+
.iter()
856+
.flat_map(|b| {
857+
let col = b
858+
.column(1)
859+
.as_any()
860+
.downcast_ref::<datafusion::arrow::array::StringArray>()
861+
.unwrap();
862+
(0..col.len()).map(move |i| col.value(i).to_string())
863+
})
864+
.collect();
865+
866+
let combined = explain_output.join("\n");
867+
assert!(
868+
combined.contains("VirtualExecutionPlan"),
869+
"Expected VirtualExecutionPlan in EXPLAIN output, got:\n{combined}",
870+
);
871+
872+
Ok(())
873+
}
874+
875+
#[tokio::test]
876+
async fn explain_analyze_federation_test() -> Result<(), DataFusionError> {
877+
let test_executor = TestExecutor {
878+
compute_context: "test".into(),
879+
};
880+
881+
let table_ref = "test_table".to_string();
882+
let table = get_test_table_provider(table_ref.clone(), test_executor);
883+
884+
let state = crate::default_session_state();
885+
let ctx = SessionContext::new_with_state(state);
886+
ctx.register_table(table_ref, table).unwrap();
887+
888+
let query = "EXPLAIN ANALYZE SELECT * FROM test_table";
889+
let df = ctx.sql(query).await?;
890+
let logical_plan = df.into_optimized_plan()?;
891+
let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;
892+
893+
let mut final_queries = vec![];
894+
let _ = physical_plan.apply(|node| {
895+
if node.name() == "sql_federation_exec" {
896+
let vep = node
897+
.as_any()
898+
.downcast_ref::<VirtualExecutionPlan>()
899+
.unwrap();
900+
final_queries.push(vep.final_sql()?);
901+
}
902+
Ok(TreeNodeRecursion::Continue)
903+
});
904+
905+
assert_eq!(final_queries.len(), 1);
906+
assert!(
907+
final_queries[0].starts_with("EXPLAIN ANALYZE "),
908+
"Expected EXPLAIN ANALYZE prefix, got: {}",
909+
final_queries[0]
910+
);
911+
912+
Ok(())
913+
}
819914
}

0 commit comments

Comments
 (0)