Skip to content

Commit fd05313

Browse files
feat: distributed EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE
Reimplements PR #31 on top of spiceai-52.5 (DataFusion 52). EXPLAIN Round-trip the ExplainFormat through the client -> scheduler boundary by wrapping the LogicalPlan::Explain in a BallistaExplainNode logical extension before serialization. The scheduler unwraps it back to a native LogicalPlan::Explain so its existing physical-planning intercept can substitute a distributed-aware ExplainExec replacement. EXPLAIN FORMAT TREE Honored end-to-end by threading the ExplainFormat through extract_logical_and_physical_plans and construct_distributed_explain_exec in scheduler/state/distributed_explain.rs (Tree format omits the logical_plan row to match DataFusion's native behavior). EXPLAIN ANALYZE - Client (BallistaQueryPlanner): strips the LogicalPlan::Analyze and runs the inner plan via DistributedQueryExec, wrapped in a new DistributedExplainAnalyzeExec. After the child stream drains, the wrapper publishes the job_id (added Arc<Mutex<Option<String>>> handle on DistributedQueryExec) and calls the scheduler's GetJobMetrics RPC. - Scheduler: new GetJobMetrics RPC walks the execution graph in the same pre-order DFS order as ballista_core::utils::collect_plan_metrics so per-operator metrics line up with the rendered plan text. Falls back from the active-job cache to the saved completed-job graph so the call still succeeds after succeed_job moves the graph out of active_job_cache. Includes ballista/client tests covering all three forms in both standalone and remote modes.
1 parent 8bc4d75 commit fd05313

14 files changed

Lines changed: 1363 additions & 34 deletions

File tree

ballista/client/tests/context_checks.rs

Lines changed: 158 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ mod supported {
2424
standalone_context_with_state,
2525
};
2626
use ballista_core::config::BallistaConfig;
27-
27+
use datafusion::arrow::array::StringArray;
28+
use datafusion::arrow::array::Array;
29+
use datafusion::arrow::record_batch::RecordBatch;
2830
use datafusion::physical_plan::collect;
2931
use datafusion::prelude::*;
3032
use datafusion::{assert_batches_eq, prelude::SessionContext};
3133
use rstest::*;
34+
use std::sync::Arc;
3235
use std::path::PathBuf;
3336

3437
#[rstest::fixture]
@@ -1105,4 +1108,158 @@ mod supported {
11051108

11061109
Ok(())
11071110
}
1111+
1112+
#[rstest]
1113+
#[case::standalone(standalone_context())]
1114+
#[case::remote(remote_context())]
1115+
#[tokio::test]
1116+
async fn should_execute_explain_format_tree_query_correctly(
1117+
#[future(awt)]
1118+
#[case]
1119+
ctx: SessionContext,
1120+
) {
1121+
let result = ctx
1122+
.sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
1123+
.await
1124+
.unwrap()
1125+
.collect()
1126+
.await
1127+
.unwrap();
1128+
1129+
// With the Ballista logical extension codec, FORMAT TREE round-trips
1130+
// through the distributed scheduler. The result contains both the
1131+
// tree-rendered physical_plan and the Ballista distributed_plan.
1132+
assert_eq!(result.len(), 1);
1133+
let batch = &result[0];
1134+
assert_eq!(batch.num_columns(), 2);
1135+
// Tree format: physical_plan and distributed_plan (no logical_plan)
1136+
assert_eq!(batch.column(0).len(), 2);
1137+
1138+
let plan_type_arr = batch
1139+
.column(0)
1140+
.as_any()
1141+
.downcast_ref::<StringArray>()
1142+
.unwrap();
1143+
assert_eq!(plan_type_arr.value(0), "physical_plan");
1144+
assert_eq!(plan_type_arr.value(1), "distributed_plan");
1145+
1146+
let plan_arr = batch
1147+
.column(1)
1148+
.as_any()
1149+
.downcast_ref::<StringArray>()
1150+
.unwrap();
1151+
1152+
let physical_plan_txt = plan_arr.value(0);
1153+
// Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘.
1154+
assert!(
1155+
physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'),
1156+
"Expected tree format with box characters in physical_plan, got: {physical_plan_txt}"
1157+
);
1158+
1159+
let distributed_plan_txt = plan_arr.value(1);
1160+
assert!(
1161+
!distributed_plan_txt.is_empty(),
1162+
"Expected non-empty distributed_plan"
1163+
);
1164+
}
1165+
1166+
#[rstest]
1167+
#[case::standalone(standalone_context())]
1168+
#[case::remote(remote_context())]
1169+
#[tokio::test]
1170+
async fn should_execute_explain_analyze_query(
1171+
#[future(awt)]
1172+
#[case]
1173+
ctx: SessionContext,
1174+
) -> datafusion::error::Result<()> {
1175+
let result = ctx
1176+
.sql(
1177+
"EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id",
1178+
)
1179+
.await?
1180+
.collect()
1181+
.await?;
1182+
1183+
// Replace metric values with "..." so the assertion isn't sensitive to
1184+
// varying timings/byte counts.
1185+
let sanitized_plan_text = result[0]
1186+
.column(1)
1187+
.as_any()
1188+
.downcast_ref::<StringArray>()
1189+
.unwrap()
1190+
.value(0)
1191+
.lines()
1192+
.map(|line| {
1193+
if let Some(index) = line.find("metrics=[") {
1194+
let prefix = &line[..index];
1195+
let metrics = &line[index + "metrics=[".len()..];
1196+
let sanitized_metrics = metrics.strip_suffix(']').map_or_else(
1197+
|| "...".to_string(),
1198+
|body| {
1199+
body.split(", ")
1200+
.map(|metric| {
1201+
metric.split_once('=').map_or_else(
1202+
|| "...".to_string(),
1203+
|(name, _)| format!("{name}=..."),
1204+
)
1205+
})
1206+
.collect::<Vec<_>>()
1207+
.join(", ")
1208+
},
1209+
);
1210+
format!("{prefix}metrics=[{sanitized_metrics}]")
1211+
} else {
1212+
line.to_string()
1213+
}
1214+
})
1215+
.collect::<Vec<_>>()
1216+
.join("\n");
1217+
1218+
let sanitized = RecordBatch::try_new(
1219+
result[0].schema(),
1220+
vec![
1221+
result[0].column(0).clone(),
1222+
Arc::new(StringArray::from(vec![sanitized_plan_text])),
1223+
],
1224+
)?;
1225+
1226+
// Loose assertions rather than a frozen golden snapshot, since the
1227+
// exact metric set per operator can change between DataFusion releases.
1228+
let plan_type_arr = sanitized
1229+
.column(0)
1230+
.as_any()
1231+
.downcast_ref::<StringArray>()
1232+
.unwrap();
1233+
assert_eq!(plan_type_arr.len(), 1);
1234+
assert_eq!(plan_type_arr.value(0), "Plan with Metrics");
1235+
1236+
let plan_arr = sanitized
1237+
.column(1)
1238+
.as_any()
1239+
.downcast_ref::<StringArray>()
1240+
.unwrap();
1241+
let plan_text = plan_arr.value(0);
1242+
assert!(
1243+
plan_text.contains("SuccessfulStage[stage_id=1"),
1244+
"expected stage 1 in plan, got:\n{plan_text}"
1245+
);
1246+
assert!(
1247+
plan_text.contains("SuccessfulStage[stage_id=2"),
1248+
"expected stage 2 in plan, got:\n{plan_text}"
1249+
);
1250+
assert!(
1251+
plan_text.contains("ShuffleWriterExec"),
1252+
"expected ShuffleWriterExec in plan, got:\n{plan_text}"
1253+
);
1254+
assert!(
1255+
plan_text.contains("AggregateExec"),
1256+
"expected AggregateExec in plan, got:\n{plan_text}"
1257+
);
1258+
assert!(
1259+
plan_text.contains("metrics=["),
1260+
"expected per-operator metrics in plan, got:\n{plan_text}"
1261+
);
1262+
1263+
Ok(())
1264+
}
11081265
}

ballista/core/proto/ballista.proto

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import "datafusion_common.proto";
3333
message BallistaLogicalPlanNode {
3434
oneof LogicalPlanType {
3535
LogicalPlanCacheNode cache_node = 1;
36+
LogicalPlanExplainNode explain_node = 2;
3637
}
3738
}
3839

@@ -41,6 +42,16 @@ message LogicalPlanCacheNode {
4142
string session_id = 2;
4243
}
4344

45+
// Ballista wrapper around datafusion's Explain logical plan node.
46+
// Used to preserve `explain_format` across the client -> scheduler boundary,
47+
// because `datafusion-proto`'s `ExplainNode` does not encode that field.
48+
message LogicalPlanExplainNode {
49+
bool verbose = 1;
50+
// One of: "indent", "tree", "pgjson", "graphviz". See `BallistaExplainNode`
51+
// in `core/src/extension.rs` for the canonical mapping.
52+
string explain_format = 2;
53+
}
54+
4455
///////////////////////////////////////////////////////////////////////////////////////////////////
4556
// Ballista Physical Plan
4657
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -631,6 +642,31 @@ message ExecuteQueryFailureResult {
631642
}
632643
}
633644

645+
message GetJobMetricsParams {
646+
string job_id = 1;
647+
}
648+
649+
message JobStageMetrics {
650+
uint32 stage_id = 1;
651+
uint32 partitions = 2;
652+
repeated OperatorWithMetrics operators = 3;
653+
}
654+
655+
message OperatorWithMetrics {
656+
// Pre-order DFS depth in the stage's plan tree (root = 0).
657+
uint32 depth = 1;
658+
// ExecutionPlan::name(), e.g. "FilterExec".
659+
string operator_type = 2;
660+
// Single-line operator description, equivalent to
661+
// `DisplayableExecutionPlan::indent` for that node only.
662+
string operator_desc = 3;
663+
repeated OperatorMetric metrics = 4;
664+
}
665+
666+
message GetJobMetricsResult {
667+
repeated JobStageMetrics stages = 1;
668+
}
669+
634670
message GetJobStatusParams {
635671
string job_id = 1;
636672
}
@@ -834,6 +870,11 @@ service SchedulerGrpc {
834870

835871
rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}
836872

873+
// Returns per-stage / per-operator metrics for a successfully-finished job.
874+
// Used by the client `DistributedExplainAnalyzeExec` to render an
875+
// `EXPLAIN ANALYZE` result without changing the wire format of normal jobs.
876+
rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {}
877+
837878
// Used by Executor to tell Scheduler it is stopped.
838879
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
839880

0 commit comments

Comments
 (0)