feat: distributed EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE (rebased onto spiceai-52.5)#34
Conversation
Reimplements PR #31 on top of spiceai-52.5 (DataFusion 52). EXPLAIN Round-trip the ExplainFormat through the client -> scheduler boundary by wrapping the LogicalPlan::Explain in a BallistaExplainNode logical extension before serialization. The scheduler unwraps it back to a native LogicalPlan::Explain so its existing physical-planning intercept can substitute a distributed-aware ExplainExec replacement. EXPLAIN FORMAT TREE Honored end-to-end by threading the ExplainFormat through extract_logical_and_physical_plans and construct_distributed_explain_exec in scheduler/state/distributed_explain.rs (Tree format omits the logical_plan row to match DataFusion's native behavior). EXPLAIN ANALYZE - Client (BallistaQueryPlanner): strips the LogicalPlan::Analyze and runs the inner plan via DistributedQueryExec, wrapped in a new DistributedExplainAnalyzeExec. After the child stream drains, the wrapper publishes the job_id (added Arc<Mutex<Option<String>>> handle on DistributedQueryExec) and calls the scheduler's GetJobMetrics RPC. - Scheduler: new GetJobMetrics RPC walks the execution graph in the same pre-order DFS order as ballista_core::utils::collect_plan_metrics so per-operator metrics line up with the rendered plan text. Falls back from the active-job cache to the saved completed-job graph so the call still succeeds after succeed_job moves the graph out of active_job_cache. Includes ballista/client tests covering all three forms in both standalone and remote modes.
There was a problem hiding this comment.
Pull request overview
Adds end-to-end distributed support for EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE in Ballista (DataFusion 52), ensuring explain-format survives the client→scheduler proto round-trip and enabling scheduler-sourced per-stage/operator metrics for analyze output.
Changes:
- Introduces
BallistaExplainNodelogical extension + codec/proto support to preserveExplainFormatacrossdatafusion-proto, and unwraps it on the scheduler back into nativeLogicalPlan::Explain. - Threads
ExplainFormatthrough distributed explain rendering, matching DataFusion behavior forFORMAT TREE(omitlogical_planrow; tree-renderedphysical_plan). - Adds
GetJobMetricsscheduler gRPC RPC and a client-sideDistributedExplainAnalyzeExecthat fetches and renders per-stage/operator metrics asPlan with Metrics.
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| ballista/scheduler/src/state/task_manager.rs | Makes execution-graph lookup available for gRPC metrics retrieval. |
| ballista/scheduler/src/state/mod.rs | Unwraps BallistaExplainNode, captures ExplainFormat, and threads it into distributed explain exec construction. |
| ballista/scheduler/src/state/distributed_explain.rs | Formats explain output according to ExplainFormat and matches DataFusion’s FORMAT TREE row behavior. |
| ballista/scheduler/src/scheduler_server/grpc.rs | Adds GetJobMetrics RPC and stage/operator metrics serialization using plan traversal. |
| ballista/executor/src/execution_loop.rs | Updates test SchedulerGrpc mock to include the new RPC. |
| ballista/core/src/serde/mod.rs | Extends Ballista logical extension codec to encode/decode BallistaExplainNode via Ballista protobuf. |
| ballista/core/src/serde/generated/ballista.rs | Regenerates protobuf bindings for ExplainNode and GetJobMetrics messages/RPC. |
| ballista/core/src/planner.rs | Wraps EXPLAIN for distribution and implements distributed EXPLAIN ANALYZE via DistributedExplainAnalyzeExec. |
| ballista/core/src/extension.rs | Defines BallistaExplainNode and stable string mapping for explain formats. |
| ballista/core/src/execution_plans/mod.rs | Registers the new distributed_explain_analyze execution plan module. |
| ballista/core/src/execution_plans/distributed_query.rs | Adds a shared job_id handle so parents can query scheduler metrics post-execution. |
| ballista/core/src/execution_plans/distributed_explain_analyze.rs | New exec that runs a distributed job, then fetches and renders metrics as a single-row EXPLAIN ANALYZE result. |
| ballista/core/proto/ballista.proto | Adds LogicalPlanExplainNode and GetJobMetrics RPC + message types. |
| ballista/client/tests/context_checks.rs | Adds integration coverage for EXPLAIN FORMAT TREE and EXPLAIN ANALYZE in standalone + remote modes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
f7eed42 to
a140df6
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 19 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 21 out of 23 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Reimplements #31 on top of
spiceai-52.5(DataFusion 52). Supersedes #31.What
End-to-end support for the three EXPLAIN forms over distributed Ballista:
EXPLAIN ...— adds adistributed_planrow alongsidelogical_planandphysical_plan.EXPLAIN FORMAT TREE ...— physical plan rendered with box-drawing tree characters;logical_planis omitted to match DataFusion's native behavior.EXPLAIN ANALYZE ...— a singlePlan with Metricsrow containing the per-stage, per-operator metrics gathered from the scheduler after the job completes.How
EXPLAIN/FORMAT TREE—ExplainFormatdoesn't survive adatafusion-protoround-trip, so the client wrapsLogicalPlan::Explainin a newBallistaExplainNodelogical extension before sending to the scheduler. The scheduler unwraps it back into a nativeLogicalPlan::Explain(unwrap_ballista_explaininstate/mod.rs) and then its existing physical-planning intercept replacesExplainExecwith a distributed-aware variant.extract_logical_and_physical_plansandconstruct_distributed_explain_execare threaded with&ExplainFormatso Tree/Indent/PgJson/Graphviz are honored.EXPLAIN ANALYZE—BallistaQueryPlanner): stripsLogicalPlan::Analyze, runs the inner plan viaDistributedQueryExec, and wraps it in a newDistributedExplainAnalyzeExec.DistributedQueryExecgains anArc<Mutex<Option<String>>>job_idhandle that is populated when the job is submitted; the parent reads it after the child stream drains and calls the newGetJobMetricsRPC.GetJobMetricsRPC (proto:GetJobMetricsParams/Result,JobStageMetrics,OperatorWithMetrics,OperatorMetric) walks the execution graph in the same pre-order DFS order asballista_core::utils::collect_plan_metricsso the per-operator metrics line up with the rendered plan text. Falls back from the active-job cache to the saved completed-job graph (get_job_execution_graph) so the call still succeeds aftersucceed_jobmoves the graph out ofactive_job_cache.Tests
ballista/client/tests/context_checks.rs:should_execute_explain_query_correctly(was already there)should_execute_explain_format_tree_query_correctly— checks two-row result (nological_plan) and box-drawing characters in physical_plan.should_execute_explain_analyze_query— sanitizes metric values to...and checks both stages, bothShuffleWriterExecandAggregateExec, and that per-operatormetrics=[...]appear.All three cases run in both
standaloneandremotemodes (theremotemode goes over the gRPC scheduler, exercising the codec round-trip and theGetJobMetricsRPC):Also manually validated against a local distributed cluster (scheduler + executor with mTLS) by issuing the queries through Spice's HTTP API:
EXPLAINreturns the expected three rows.EXPLAIN FORMAT TREEreturns two rows with tree-formatted physical plan.Notes / follow-ups
BallistaQueryPlanner) path. Spice's/v1/queriesasync path submits the rawLogicalPlan::Analyzedirectly viascheduler.submit_job, bypassingBallistaQueryPlanner, and the scheduler currently only interceptsExplain, notAnalyze. A follow-up is needed in either Spice or in this schedulersubmit_jobpath to getEXPLAIN ANALYZEworking over/v1/queries.