Skip to content

Commit 6bb2bd4

Browse files
committed
feat: distributed EXPLAIN / EXPLAIN ANALYZE via logical extension codec
Add Ballista-specific logical extension nodes (BallistaExplainNode, BallistaAnalyzeNode) so that EXPLAIN / EXPLAIN ANALYZE survive the round-trip through the scheduler.

The client planner wraps LogicalPlan::Explain / LogicalPlan::Analyze in the matching extension node before sending the plan to the scheduler. The scheduler unwraps it back to the native node for optimization and physical planning. This preserves ExplainFormat (Indent/Tree/PostgresJSON/Graphviz) and the analyze verbose flag, both of which were lost by the default datafusion-proto ExplainNode.

For EXPLAIN ANALYZE, the scheduler strips the Analyze wrapper from the logical plan, runs the inner plan as a regular distributed job, and on completion renders per-stage metrics via DisplayableBallistaExecutionPlan into a new SuccessfulJob.analyzed_plan_text proto field. The client synthesizes the Analyze output locally from that string instead of fetching partitions.

Tests:
- 3 codec round-trip unit tests in ballista-core (all ExplainFormat variants, verbose flags, stable format-string encoding).
- 4 integration tests in ballista-client (EXPLAIN, EXPLAIN FORMAT TREE, EXPLAIN ANALYZE, EXPLAIN ANALYZE VERBOSE) in standalone + remote modes, asserting that `metrics=[` and `output_rows=` appear in the analyze output.
1 parent 62c381a commit 6bb2bd4

12 files changed

Lines changed: 1061 additions & 32 deletions

File tree

ballista/client/tests/context_checks.rs

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,4 +1077,169 @@ mod supported {
10771077
];
10781078
assert_batches_eq!(expected, &result);
10791079
}
1080+
1081+
#[rstest]
1082+
#[case::standalone(standalone_context())]
1083+
#[case::remote(remote_context())]
1084+
#[tokio::test]
1085+
async fn should_execute_explain_format_tree_query_correctly(
1086+
#[future(awt)]
1087+
#[case]
1088+
ctx: SessionContext,
1089+
) {
1090+
let result = ctx
1091+
.sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
1092+
.await
1093+
.unwrap()
1094+
.collect()
1095+
.await
1096+
.unwrap();
1097+
1098+
// With the Ballista logical extension codec, FORMAT TREE round-trips
1099+
// through the distributed scheduler. The result contains both the
1100+
// tree-rendered physical_plan and the Ballista distributed_plan.
1101+
assert_eq!(result.len(), 1);
1102+
let batch = &result[0];
1103+
1104+
// Verify we have 2 columns: plan_type and plan
1105+
assert_eq!(batch.num_columns(), 2);
1106+
// Tree format: physical_plan and distributed_plan (no logical_plan)
1107+
assert_eq!(batch.column(0).len(), 2);
1108+
1109+
// Verify the plan_type column contains the expected values
1110+
let plan_type_col = batch.column(0);
1111+
let plan_type_arr = plan_type_col
1112+
.as_any()
1113+
.downcast_ref::<datafusion::arrow::array::StringArray>()
1114+
.unwrap();
1115+
1116+
assert_eq!(plan_type_arr.value(0), "physical_plan");
1117+
assert_eq!(plan_type_arr.value(1), "distributed_plan");
1118+
1119+
// Verify physical_plan is in tree format (contains box characters)
1120+
let plan_col = batch.column(1);
1121+
let plan_arr = plan_col
1122+
.as_any()
1123+
.downcast_ref::<datafusion::arrow::array::StringArray>()
1124+
.unwrap();
1125+
1126+
let physical_plan_txt = plan_arr.value(0);
1127+
// Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘, ┬, ┴, etc.
1128+
assert!(
1129+
physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'),
1130+
"Expected tree format with box characters in physical_plan, got: {}",
1131+
physical_plan_txt
1132+
);
1133+
1134+
// Verify distributed_plan is present and non-empty
1135+
let distributed_plan_txt = plan_arr.value(1);
1136+
assert!(
1137+
!distributed_plan_txt.is_empty(),
1138+
"Expected non-empty distributed_plan"
1139+
);
1140+
}
1141+
1142+
#[rstest]
1143+
#[case::standalone(standalone_context())]
1144+
#[case::remote(remote_context())]
1145+
#[tokio::test]
1146+
async fn should_execute_explain_analyze_query_correctly(
1147+
#[future(awt)]
1148+
#[case]
1149+
ctx: SessionContext,
1150+
) {
1151+
let result = ctx
1152+
.sql("EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
1153+
.await
1154+
.unwrap()
1155+
.collect()
1156+
.await
1157+
.unwrap();
1158+
1159+
assert_eq!(result.len(), 1);
1160+
let batch = &result[0];
1161+
1162+
// Two columns: plan_type and plan; single row with the rendered
1163+
// annotated plan text produced by the scheduler.
1164+
assert_eq!(batch.num_columns(), 2);
1165+
assert_eq!(batch.column(0).len(), 1);
1166+
1167+
let plan_type_col = batch
1168+
.column(0)
1169+
.as_any()
1170+
.downcast_ref::<datafusion::arrow::array::StringArray>()
1171+
.unwrap();
1172+
let plan_col = batch
1173+
.column(1)
1174+
.as_any()
1175+
.downcast_ref::<datafusion::arrow::array::StringArray>()
1176+
.unwrap();
1177+
1178+
assert_eq!(plan_type_col.value(0), "Plan with Metrics");
1179+
1180+
let plan_txt = plan_col.value(0);
1181+
assert!(
1182+
!plan_txt.is_empty(),
1183+
"Expected non-empty Plan with Metrics output"
1184+
);
1185+
assert!(
1186+
plan_txt.contains("Stage[stage_id="),
1187+
"Expected stage header in EXPLAIN ANALYZE output, got: {}",
1188+
plan_txt
1189+
);
1190+
// The per-stage rendering uses the Ballista indent visitor which
1191+
// always emits a `metrics=[...]` suffix on each operator line.
1192+
assert!(
1193+
plan_txt.contains("metrics=["),
1194+
"Expected metrics=[ in EXPLAIN ANALYZE output, got: {}",
1195+
plan_txt
1196+
);
1197+
// Aggregation in the query should produce a real output_rows metric.
1198+
assert!(
1199+
plan_txt.contains("output_rows="),
1200+
"Expected output_rows= in EXPLAIN ANALYZE output, got: {}",
1201+
plan_txt
1202+
);
1203+
}
1204+
1205+
#[rstest]
1206+
#[case::standalone(standalone_context())]
1207+
#[case::remote(remote_context())]
1208+
#[tokio::test]
1209+
async fn should_execute_explain_analyze_verbose_query_correctly(
1210+
#[future(awt)]
1211+
#[case]
1212+
ctx: SessionContext,
1213+
) {
1214+
// VERBOSE must be accepted and propagate end-to-end. The rendered
1215+
// output shape is the same as plain EXPLAIN ANALYZE.
1216+
let result = ctx
1217+
.sql("EXPLAIN ANALYZE VERBOSE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
1218+
.await
1219+
.unwrap()
1220+
.collect()
1221+
.await
1222+
.unwrap();
1223+
1224+
assert_eq!(result.len(), 1);
1225+
let batch = &result[0];
1226+
assert_eq!(batch.num_columns(), 2);
1227+
assert_eq!(batch.column(0).len(), 1);
1228+
1229+
let plan_type_col = batch
1230+
.column(0)
1231+
.as_any()
1232+
.downcast_ref::<datafusion::arrow::array::StringArray>()
1233+
.unwrap();
1234+
let plan_col = batch
1235+
.column(1)
1236+
.as_any()
1237+
.downcast_ref::<datafusion::arrow::array::StringArray>()
1238+
.unwrap();
1239+
1240+
assert_eq!(plan_type_col.value(0), "Plan with Metrics");
1241+
let plan_txt = plan_col.value(0);
1242+
assert!(plan_txt.contains("Stage[stage_id="));
1243+
assert!(plan_txt.contains("metrics=["));
1244+
}
10801245
}

ballista/core/proto/ballista.proto

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,35 @@ message BallistaPhysicalPlanNode {
3838
}
3939
}
4040

41+
///////////////////////////////////////////////////////////////////////////////////////////////////
42+
// Ballista Logical Plan extensions
43+
///////////////////////////////////////////////////////////////////////////////////////////////////
44+
45+
// Ballista wrapper around the DataFusion Explain logical plan node.
// Used to preserve `explain_format` across the client -> scheduler boundary
// because the datafusion-proto `ExplainNode` does not encode that field.
//
// NOTE(review): the vendored datafusion.proto in this change also gains an
// `ExplainFormat format = 3` field on `ExplainNode` — confirm whether both
// encodings are needed.
message BallistaExplainNode {
  // Verbose flag of the wrapped EXPLAIN statement.
  bool verbose = 1;
  // Output format name. One of: "indent", "tree", "pgjson", "graphviz"
  string explain_format = 2;
}
53+
54+
// Ballista wrapper around the DataFusion Analyze logical plan node.
// Reserved for carrying distributed-analyze specific flags forward.
message BallistaAnalyzeNode {
  // Verbose flag of the wrapped EXPLAIN ANALYZE statement.
  bool verbose = 1;
}
59+
60+
// Discriminating wrapper encoded as the payload of a Ballista logical
// extension node. Allows a single codec entry point to route to the
// appropriate Ballista logical extension type.
message BallistaLogicalExtensionNode {
  // Exactly one variant is expected to be set per encoded extension node.
  oneof node {
    // Payload for a wrapped EXPLAIN statement.
    BallistaExplainNode explain = 1;
    // Payload for a wrapped EXPLAIN ANALYZE statement.
    BallistaAnalyzeNode analyze = 2;
  }
}
69+
4170
message ShuffleWriterExecNode {
4271
//TODO it seems redundant to provide job and stage id here since we also have them
4372
// in the TaskDefinition that wraps this plan
@@ -611,6 +640,12 @@ message SuccessfulJob {
611640
uint64 queued_at = 2;
612641
uint64 started_at = 3;
613642
uint64 ended_at = 4;
643+
// Set when the original query was `EXPLAIN ANALYZE`. Contains the
644+
// rendered annotated plan text produced by the scheduler from the
645+
// per-stage metrics collected during distributed execution. When
646+
// present, the client synthesizes a 2-column (plan_type, plan) output
647+
// batch from this text instead of fetching partition data.
648+
optional string analyzed_plan_text = 5;
614649
}
615650

616651
message QueuedJob {

ballista/core/proto/datafusion.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,19 @@ message AnalyzeNode {
223223
bool verbose = 2;
224224
}
225225

226+
// Format options for EXPLAIN output.
enum ExplainFormat {
  // Zero value, used when no format was set on the wire.
  // NOTE(review): presumably decoders map this to the default format — confirm.
  EXPLAIN_FORMAT_UNSPECIFIED = 0;
  EXPLAIN_FORMAT_INDENT = 1;
  EXPLAIN_FORMAT_TREE = 2;
  EXPLAIN_FORMAT_POSTGRES_JSON = 3;
  EXPLAIN_FORMAT_GRAPHVIZ = 4;
}
234+
226235
// Serialized form of an EXPLAIN logical plan node.
message ExplainNode {
  // The plan being explained.
  LogicalPlanNode input = 1;
  // Whether EXPLAIN VERBOSE was requested.
  bool verbose = 2;
  // Requested output format for the explanation.
  ExplainFormat format = 3;
}
230240

231241
message AggregateNode {

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use datafusion::physical_plan::{
4848
use datafusion_proto::logical_plan::{
4949
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
5050
};
51-
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
51+
use futures::{StreamExt, TryFutureExt, TryStreamExt};
5252
use log::{debug, error, info};
5353
use std::any::Any;
5454
use std::fmt::Debug;
@@ -271,6 +271,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
271271
customize_endpoint,
272272
use_tls,
273273
result_fetch_callback,
274+
self.schema(),
274275
)
275276
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
276277
)
@@ -314,7 +315,8 @@ async fn execute_query(
314315
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,
315316
use_tls: bool,
316317
result_fetch_callback: Option<Arc<dyn ResultFetchMetricsCallback>>,
317-
) -> Result<impl Stream<Item = Result<RecordBatch>> + Send> {
318+
output_schema: SchemaRef,
319+
) -> Result<futures::stream::BoxStream<'static, Result<RecordBatch>>> {
318320
// Capture query submission time for total_query_time_ms
319321
let query_start_time = std::time::Instant::now();
320322

@@ -409,7 +411,7 @@ async fn execute_query(
409411
started_at,
410412
ended_at,
411413
partition_location,
412-
..
414+
analyzed_plan_text,
413415
})) => {
414416
// Calculate job execution time (server-side execution)
415417
let job_execution_ms = ended_at.saturating_sub(started_at);
@@ -442,6 +444,14 @@ async fn execute_query(
442444
// happens lazily when the stream is consumed, not during execute_query.
443445
// This could be added in a future enhancement by wrapping the stream.
444446

447+
// If the server reports an EXPLAIN ANALYZE result, synthesize the
448+
// output locally using the scheduler-rendered plan text. We skip
449+
// partition fetching entirely in this case.
450+
if let Some(text) = analyzed_plan_text {
451+
let batch = build_analyze_record_batch(&output_schema, text)?;
452+
break Ok(futures::stream::iter(vec![Ok(batch)]).boxed());
453+
}
454+
445455
let streams = partition_location.into_iter().map(move |partition| {
446456
let callback = result_fetch_callback.clone();
447457
let f = fetch_partition(
@@ -457,12 +467,34 @@ async fn execute_query(
457467
futures::stream::once(f).try_flatten()
458468
});
459469

460-
break Ok(futures::stream::iter(streams).flatten());
470+
break Ok(futures::stream::iter(streams).flatten().boxed());
461471
}
462472
};
463473
}
464474
}
465475

476+
/// Construct the single-row RecordBatch returned by a distributed
477+
/// `EXPLAIN ANALYZE` statement. The schema is expected to match the
478+
/// `LogicalPlan::Analyze` output schema (`plan_type`, `plan`).
479+
fn build_analyze_record_batch(
480+
schema: &SchemaRef,
481+
text: String,
482+
) -> Result<RecordBatch, DataFusionError> {
483+
use datafusion::arrow::array::StringArray;
484+
485+
if schema.fields().len() != 2 {
486+
return Err(DataFusionError::Internal(format!(
487+
"expected EXPLAIN ANALYZE schema to have 2 columns, got {}",
488+
schema.fields().len()
489+
)));
490+
}
491+
492+
let plan_type = Arc::new(StringArray::from(vec!["Plan with Metrics"]));
493+
let plan = Arc::new(StringArray::from(vec![text]));
494+
RecordBatch::try_new(schema.clone(), vec![plan_type, plan])
495+
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
496+
}
497+
466498
async fn fetch_partition(
467499
location: PartitionLocation,
468500
max_message_size: usize,

ballista/core/src/planner.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
use crate::config::BallistaConfig;
1919
use crate::execution_plans::DistributedQueryExec;
2020
use crate::serde::BallistaLogicalExtensionCodec;
21+
use crate::serde::logical_plan_ext::{BallistaAnalyzeNode, BallistaExplainNode};
2122

2223
use async_trait::async_trait;
2324
use datafusion::arrow::datatypes::Schema;
2425
use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
2526
use datafusion::error::DataFusionError;
2627
use datafusion::execution::context::{QueryPlanner, SessionState};
27-
use datafusion::logical_expr::{LogicalPlan, TableScan};
28+
use datafusion::logical_expr::{Extension, LogicalPlan, TableScan};
2829
use datafusion::physical_plan::ExecutionPlan;
2930
use datafusion::physical_plan::empty::EmptyExec;
3031
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
@@ -131,10 +132,17 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
131132
_ => {
132133
log::debug!("create_physical_plan - handling general statement");
133134

135+
// For EXPLAIN / EXPLAIN ANALYZE, wrap the plan in a Ballista
136+
// logical extension so fields (like `explain_format`) survive
137+
// the client -> scheduler serialization round-trip. The
138+
// scheduler unwraps these before physical planning.
139+
let plan_to_send =
140+
wrap_explain_analyze_for_distribution(logical_plan);
141+
134142
Ok(Arc::new(DistributedQueryExec::<T>::with_extension(
135143
self.scheduler_url.clone(),
136144
self.config.clone(),
137-
logical_plan.clone(),
145+
plan_to_send,
138146
self.extension_codec.clone(),
139147
session_state.session_id().to_string(),
140148
)))
@@ -144,6 +152,37 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
144152
}
145153
}
146154

155+
/// Wrap `LogicalPlan::Explain` or `LogicalPlan::Analyze` in a Ballista
156+
/// logical extension node so that fields such as `explain_format` survive
157+
/// serialization to the scheduler via `datafusion-proto`. Other plans are
158+
/// returned unchanged.
159+
fn wrap_explain_analyze_for_distribution(plan: &LogicalPlan) -> LogicalPlan {
160+
match plan {
161+
LogicalPlan::Explain(explain) => {
162+
let node = BallistaExplainNode {
163+
verbose: explain.verbose,
164+
explain_format: explain.explain_format.clone(),
165+
plan: explain.plan.clone(),
166+
schema: explain.schema.clone(),
167+
};
168+
LogicalPlan::Extension(Extension {
169+
node: Arc::new(node),
170+
})
171+
}
172+
LogicalPlan::Analyze(analyze) => {
173+
let node = BallistaAnalyzeNode {
174+
verbose: analyze.verbose,
175+
input: analyze.input.clone(),
176+
schema: analyze.schema.clone(),
177+
};
178+
LogicalPlan::Extension(Extension {
179+
node: Arc::new(node),
180+
})
181+
}
182+
_ => plan.clone(),
183+
}
184+
}
185+
147186
/// A Visitor which detect if query is using local tables,
148187
/// such as tables located in `information_schema` and returns true
149188
/// only if all scans are in from local tables

0 commit comments

Comments
 (0)