feat: distributed EXPLAIN / EXPLAIN ANALYZE via logical extension codec#31
feat: distributed EXPLAIN / EXPLAIN ANALYZE via logical extension codec#31lukekim wants to merge 4 commits into
Conversation
18bee66 to
6bb2bd4
Compare
There was a problem hiding this comment.
Pull request overview
Adds end-to-end distributed support for EXPLAIN FORMAT <fmt> and EXPLAIN ANALYZE in Ballista by preserving lost logical-plan fields across datafusion-proto serialization and returning scheduler-rendered analyze output to the client.
Changes:
- Introduces Ballista logical extension nodes + codec wrapper to round-trip
ExplainFormat/verboseandAnalyzeverbose. - Scheduler unwraps extensions back into native DataFusion
LogicalPlan::{Explain,Analyze}, distributesEXPLAIN, and implements distributedEXPLAIN ANALYZEby returninganalyzed_plan_text. - Client detects
analyzed_plan_textand synthesizes the expectedEXPLAIN ANALYZERecordBatchlocally; adds integration tests for TREE format and ANALYZE(+VERBOSE).
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| ballista/scheduler/src/state/task_manager.rs | Threads optional analyze metadata into the created ExecutionGraph. |
| ballista/scheduler/src/state/mod.rs | Unwraps Ballista logical extensions, implements distributed EXPLAIN formatting handling, and strips top-level Analyze for distributed execution while tracking analyze info. |
| ballista/scheduler/src/state/execution_graph.rs | Tracks analyze jobs and renders per-stage “plan with metrics” text into SuccessfulJob.analyzed_plan_text. |
| ballista/scheduler/src/state/distributed_explain.rs | Adapts distributed explain output rows based on ExplainFormat (Tree vs non-Tree). |
| ballista/core/src/serde/mod.rs | Adds Ballista logical-extension codec wrapper encode/decode and unit tests for round-trips. |
| ballista/core/src/serde/logical_plan_ext.rs | New Ballista UserDefinedLogicalNodeCore wrappers for Explain/Analyze and format-string mapping helpers. |
| ballista/core/src/serde/generated/ballista.rs | Updates generated protobuf types for new logical-extension wrapper and analyzed_plan_text. |
| ballista/core/src/planner.rs | Wraps LogicalPlan::{Explain,Analyze} before sending to scheduler. |
| ballista/core/src/execution_plans/distributed_query.rs | Returns boxed streams and synthesizes EXPLAIN ANALYZE output when analyzed_plan_text is present. |
| ballista/core/proto/datafusion.proto | Adds ExplainFormat + ExplainNode.format to local proto definition. |
| ballista/core/proto/ballista.proto | Adds logical-extension wrapper protos and SuccessfulJob.analyzed_plan_text. |
| ballista/client/tests/context_checks.rs | Adds integration tests for EXPLAIN FORMAT TREE and EXPLAIN ANALYZE (incl. VERBOSE). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Splits the two features into complementary mechanisms:
1. EXPLAIN (including FORMAT TREE)
- New BallistaExplainNode UserDefinedLogicalNode preserves
LogicalPlan::Explain fields (verbose, explain_format) across
the datafusion-proto round trip from client to scheduler.
- BallistaLogicalExtensionCodec encodes/decodes it.
- Scheduler unwraps it back into LogicalPlan::Explain before
physical planning so DataFusion renders the requested format
(Indent, TreeRender, or PostgresJSON) natively.
2. EXPLAIN ANALYZE (based on upstream apache#1567)
- Client planner strips LogicalPlan::Analyze and wraps the inner
plan in DistributedQueryExec, then wraps that in the new
DistributedExplainAnalyzeExec.
- DistributedQueryExec records its job_id after submission so
the parent exec can retrieve it.
- After the child query drains, the wrapper calls a new
SchedulerGrpc.GetJobMetrics RPC that returns structured
per-stage / per-operator metrics.
- Metrics are formatted client-side into the familiar
'Plan with Metrics' single-row RecordBatch.
Tests:
- ballista-core: codec round-trip for BallistaExplainNode and
unit test for format_metrics_as_record_batch.
- ballista client integration: EXPLAIN, EXPLAIN FORMAT TREE,
and sanitized EXPLAIN ANALYZE end-to-end assertions.
6bb2bd4 to
7a80162
Compare
…llista into lukim/explain-analyze-codec
- Remove unused ExplainFormat enum and ExplainNode.format from ballista/core/proto/datafusion.proto. The Rust types at runtime come from datafusion_proto via extern_path, so these additions were dead schema that could mislead external consumers. - BallistaLogicalExtensionCodec::try_decode now falls through to the default codec when BallistaExplainNode::format_from_str cannot parse explain.explain_format, avoiding spurious errors when an unrelated extension payload decodes permissively into the wrapper shape. - Fix pre-existing clippy 'iterating on a map's values' in execution_graph::running_tasks and ExecutorManager terminating heartbeat filter; switch to .values() iterators. - cargo fmt.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 15 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- Remove unused verbose flag plumbing from DistributedExplainAnalyzeExec: the flag was never honored by format_metrics_as_record_batch so EXPLAIN ANALYZE VERBOSE rendered identically to plain EXPLAIN ANALYZE. Drop the field/constructor arg, call-site (planner), and unused job_id parameter; pass only the schema. - Fix misleading inline comment in distributed_explain.rs that claimed the FinalLogicalPlan fallback was for 'non-indent formats' when the branch also handles Indent. - Update PR description to match the final implementation (no more BallistaAnalyzeNode / analyzed_plan_text; client-side Analyze stripping via DistributedExplainAnalyzeExec + GetJobMetrics RPC).
|
|
||
| rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {} | ||
|
|
||
| rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {} |
There was a problem hiding this comment.
This seems unrelated to the explain changes?
phillipleblanc
left a comment
There was a problem hiding this comment.
This was implemented on the wrong branch, I will re-implement this on spiceai-52.5
…rebased onto spiceai-52.5) (#34) * feat: distributed EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE Reimplements PR #31 on top of spiceai-52.5 (DataFusion 52). EXPLAIN Round-trip the ExplainFormat through the client -> scheduler boundary by wrapping the LogicalPlan::Explain in a BallistaExplainNode logical extension before serialization. The scheduler unwraps it back to a native LogicalPlan::Explain so its existing physical-planning intercept can substitute a distributed-aware ExplainExec replacement. EXPLAIN FORMAT TREE Honored end-to-end by threading the ExplainFormat through extract_logical_and_physical_plans and construct_distributed_explain_exec in scheduler/state/distributed_explain.rs (Tree format omits the logical_plan row to match DataFusion's native behavior). EXPLAIN ANALYZE - Client (BallistaQueryPlanner): strips the LogicalPlan::Analyze and runs the inner plan via DistributedQueryExec, wrapped in a new DistributedExplainAnalyzeExec. After the child stream drains, the wrapper publishes the job_id (added Arc<Mutex<Option<String>>> handle on DistributedQueryExec) and calls the scheduler's GetJobMetrics RPC. - Scheduler: new GetJobMetrics RPC walks the execution graph in the same pre-order DFS order as ballista_core::utils::collect_plan_metrics so per-operator metrics line up with the rendered plan text. Falls back from the active-job cache to the saved completed-job graph so the call still succeeds after succeed_job moves the graph out of active_job_cache. Includes ballista/client tests covering all three forms in both standalone and remote modes. * Support for Tree formatting + tests * Fortmatting * More formatting * Add insta snapshots for FORMAT TREE integration tests * Lint * Improve * Lint --------- Co-authored-by: Sergei Grebnov <sergei.grebnov@gmail.com>
Summary
Adds native Ballista support for
EXPLAIN FORMAT <fmt>andEXPLAIN ANALYZEso they execute through the distributed scheduler instead of being quietly lost bydatafusion-proto's defaultExplainNode(which dropsexplain_format) or blocked byAnalyzeExechaving no distributed plan.Approach
Logical extension codec
Two
UserDefinedLogicalNodeCores inballista/core/src/serde/logical_plan_ext.rs:BallistaExplainNode— carriesverbose+ExplainFormat(Indent/Tree/PostgresJSON/Graphviz) + inner plan.BallistaAnalyzeNode— carriesverbose+ inner plan.BallistaLogicalExtensionCodec::try_encode/try_decodeserialize them through a newBallistaLogicalExtensionNode { oneof node { explain, analyze } }proto wrapper, falling back to the default codec for anything else.The client planner wraps
LogicalPlan::Explain/LogicalPlan::Analyzein the matching extension node before buildingDistributedQueryExec. The scheduler unwraps back to the native node insubmit_jobbeforeoptimize()/create_physical_plan. The previous "run FORMAT TREE locally" workaround inplanner.rsis removed.Distributed `EXPLAIN ANALYZE`
For analyze jobs the scheduler:
The client's `DistributedQueryExec`, on seeing `analyzed_plan_text = Some(..)`, skips partition fetching and synthesizes a single-row 2-column `RecordBatch` (`"Plan with Metrics"`, rendered text) matching the `Analyze` output schema.
Test Coverage
Unit (ballista-core `serde::test`):
Integration (ballista-client `context_checks`, standalone + remote):
Results: 54 client integration tests, 51 scheduler tests, 51 core lib tests — all green.