Skip to content

Commit e6bcc66

Browse files
feat: distributed EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE (rebased onto spiceai-52.5) (#34)
* feat: distributed EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE Reimplements PR #31 on top of spiceai-52.5 (DataFusion 52). EXPLAIN Round-trip the ExplainFormat through the client -> scheduler boundary by wrapping the LogicalPlan::Explain in a BallistaExplainNode logical extension before serialization. The scheduler unwraps it back to a native LogicalPlan::Explain so its existing physical-planning intercept can substitute a distributed-aware ExplainExec replacement. EXPLAIN FORMAT TREE Honored end-to-end by threading the ExplainFormat through extract_logical_and_physical_plans and construct_distributed_explain_exec in scheduler/state/distributed_explain.rs (Tree format omits the logical_plan row to match DataFusion's native behavior). EXPLAIN ANALYZE - Client (BallistaQueryPlanner): strips the LogicalPlan::Analyze and runs the inner plan via DistributedQueryExec, wrapped in a new DistributedExplainAnalyzeExec. After the child stream drains, the wrapper publishes the job_id (added Arc<Mutex<Option<String>>> handle on DistributedQueryExec) and calls the scheduler's GetJobMetrics RPC. - Scheduler: new GetJobMetrics RPC walks the execution graph in the same pre-order DFS order as ballista_core::utils::collect_plan_metrics so per-operator metrics line up with the rendered plan text. Falls back from the active-job cache to the saved completed-job graph so the call still succeeds after succeed_job moves the graph out of active_job_cache. Includes ballista/client tests covering all three forms in both standalone and remote modes. * Support for Tree formatting + tests * Fortmatting * More formatting * Add insta snapshots for FORMAT TREE integration tests * Lint * Improve * Lint --------- Co-authored-by: Sergei Grebnov <sergei.grebnov@gmail.com>
1 parent 8bc4d75 commit e6bcc66

23 files changed

Lines changed: 1865 additions & 46 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ballista/client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ ballista-scheduler = { path = "../scheduler", version = "52.0.0" }
4646
ctor = { workspace = true }
4747
datafusion-proto = { workspace = true }
4848
env_logger = { workspace = true }
49+
insta = { workspace = true }
4950
rstest = { workspace = true }
5051
tempfile = { workspace = true }
5152
tonic = { workspace = true }

ballista/client/tests/context_checks.rs

Lines changed: 156 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@ mod supported {
2424
standalone_context_with_state,
2525
};
2626
use ballista_core::config::BallistaConfig;
27-
27+
use datafusion::arrow::array::Array;
28+
use datafusion::arrow::array::StringArray;
29+
use datafusion::arrow::record_batch::RecordBatch;
2830
use datafusion::physical_plan::collect;
2931
use datafusion::prelude::*;
3032
use datafusion::{assert_batches_eq, prelude::SessionContext};
3133
use rstest::*;
3234
use std::path::PathBuf;
35+
use std::sync::Arc;
3336

3437
#[rstest::fixture]
3538
fn test_data() -> String {
@@ -1105,4 +1108,156 @@ mod supported {
11051108

11061109
Ok(())
11071110
}
1111+
1112+
#[rstest]
1113+
#[case::standalone(standalone_context())]
1114+
#[case::remote(remote_context())]
1115+
#[tokio::test]
1116+
async fn should_execute_explain_format_tree_query_correctly(
1117+
#[future(awt)]
1118+
#[case]
1119+
ctx: SessionContext,
1120+
) {
1121+
let result = ctx
1122+
.sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
1123+
.await
1124+
.unwrap()
1125+
.collect()
1126+
.await
1127+
.unwrap();
1128+
1129+
// With the Ballista logical extension codec, FORMAT TREE round-trips
1130+
// through the distributed scheduler. The result contains both the
1131+
// tree-rendered physical_plan and the Ballista distributed_plan.
1132+
assert_eq!(result.len(), 1);
1133+
let batch = &result[0];
1134+
assert_eq!(batch.num_columns(), 2);
1135+
// Tree format: physical_plan and distributed_plan (no logical_plan)
1136+
assert_eq!(batch.column(0).len(), 2);
1137+
1138+
let plan_type_arr = batch
1139+
.column(0)
1140+
.as_any()
1141+
.downcast_ref::<StringArray>()
1142+
.unwrap();
1143+
assert_eq!(plan_type_arr.value(0), "physical_plan");
1144+
assert_eq!(plan_type_arr.value(1), "distributed_plan");
1145+
1146+
let plan_arr = batch
1147+
.column(1)
1148+
.as_any()
1149+
.downcast_ref::<StringArray>()
1150+
.unwrap();
1151+
1152+
let physical_plan_txt = plan_arr.value(0);
1153+
let distributed_plan_txt = plan_arr.value(1);
1154+
1155+
// Snapshot the tree-rendered plans. Both standalone and remote cases
1156+
// should produce identical output, validating the codec round-trip.
1157+
insta::assert_snapshot!("explain_format_tree_physical_plan", physical_plan_txt);
1158+
insta::assert_snapshot!(
1159+
"explain_format_tree_distributed_plan",
1160+
distributed_plan_txt
1161+
);
1162+
}
1163+
1164+
#[rstest]
1165+
#[case::standalone(standalone_context())]
1166+
#[case::remote(remote_context())]
1167+
#[tokio::test]
1168+
async fn should_execute_explain_analyze_query(
1169+
#[future(awt)]
1170+
#[case]
1171+
ctx: SessionContext,
1172+
) -> datafusion::error::Result<()> {
1173+
let result = ctx
1174+
.sql(
1175+
"EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id",
1176+
)
1177+
.await?
1178+
.collect()
1179+
.await?;
1180+
1181+
// Replace metric values with "..." so the assertion isn't sensitive to
1182+
// varying timings/byte counts.
1183+
let sanitized_plan_text = result[0]
1184+
.column(1)
1185+
.as_any()
1186+
.downcast_ref::<StringArray>()
1187+
.unwrap()
1188+
.value(0)
1189+
.lines()
1190+
.map(|line| {
1191+
if let Some(index) = line.find("metrics=[") {
1192+
let prefix = &line[..index];
1193+
let metrics = &line[index + "metrics=[".len()..];
1194+
let sanitized_metrics = metrics.strip_suffix(']').map_or_else(
1195+
|| "...".to_string(),
1196+
|body| {
1197+
body.split(", ")
1198+
.map(|metric| {
1199+
metric.split_once('=').map_or_else(
1200+
|| "...".to_string(),
1201+
|(name, _)| format!("{name}=..."),
1202+
)
1203+
})
1204+
.collect::<Vec<_>>()
1205+
.join(", ")
1206+
},
1207+
);
1208+
format!("{prefix}metrics=[{sanitized_metrics}]")
1209+
} else {
1210+
line.to_string()
1211+
}
1212+
})
1213+
.collect::<Vec<_>>()
1214+
.join("\n");
1215+
1216+
let sanitized = RecordBatch::try_new(
1217+
result[0].schema(),
1218+
vec![
1219+
result[0].column(0).clone(),
1220+
Arc::new(StringArray::from(vec![sanitized_plan_text])),
1221+
],
1222+
)?;
1223+
1224+
// Loose assertions rather than a frozen golden snapshot, since the
1225+
// exact metric set per operator can change between DataFusion releases.
1226+
let plan_type_arr = sanitized
1227+
.column(0)
1228+
.as_any()
1229+
.downcast_ref::<StringArray>()
1230+
.unwrap();
1231+
assert_eq!(plan_type_arr.len(), 1);
1232+
assert_eq!(plan_type_arr.value(0), "Plan with Metrics");
1233+
1234+
let plan_arr = sanitized
1235+
.column(1)
1236+
.as_any()
1237+
.downcast_ref::<StringArray>()
1238+
.unwrap();
1239+
let plan_text = plan_arr.value(0);
1240+
assert!(
1241+
plan_text.contains("SuccessfulStage[stage_id=1"),
1242+
"expected stage 1 in plan, got:\n{plan_text}"
1243+
);
1244+
assert!(
1245+
plan_text.contains("SuccessfulStage[stage_id=2"),
1246+
"expected stage 2 in plan, got:\n{plan_text}"
1247+
);
1248+
assert!(
1249+
plan_text.contains("ShuffleWriterExec"),
1250+
"expected ShuffleWriterExec in plan, got:\n{plan_text}"
1251+
);
1252+
assert!(
1253+
plan_text.contains("AggregateExec"),
1254+
"expected AggregateExec in plan, got:\n{plan_text}"
1255+
);
1256+
assert!(
1257+
plan_text.contains("metrics=["),
1258+
"expected per-operator metrics in plan, got:\n{plan_text}"
1259+
);
1260+
1261+
Ok(())
1262+
}
11081263
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
---
2+
source: ballista/client/tests/context_checks.rs
3+
expression: distributed_plan_txt
4+
---
5+
=========ResolvedStage[stage_id=1.0, partitions=1]=========
6+
┌───────────────────────────┐
7+
ShuffleWriterExec
8+
--------------------
9+
partitioning: │
10+
Hash([id@0], 16) │
11+
└─────────────┬─────────────┘
12+
┌─────────────┴─────────────┐
13+
AggregateExec
14+
--------------------
15+
aggr: count(1) │
16+
group_by: id
17+
mode: Partial
18+
└─────────────┬─────────────┘
19+
┌─────────────┴─────────────┐
20+
ProjectionExec
21+
--------------------
22+
id: │
23+
__unnest_placeholder
24+
│ (make_array(Int64(1
25+
│ ),Int64(2),Int64(3),Int64
26+
│ (4),Int64(5)),depth=1) │
27+
└─────────────┬─────────────┘
28+
┌─────────────┴─────────────┐
29+
UnnestExec
30+
└─────────────┬─────────────┘
31+
┌─────────────┴─────────────┐
32+
ProjectionExec
33+
--------------------
34+
__unnest_placeholder
35+
│ (make_array(Int64(1
36+
│ ),Int64(2),Int64(3),Int64
37+
│ (4),Int64(5))): │
38+
│ [1, 2, 3, 4, 5] │
39+
└─────────────┬─────────────┘
40+
┌─────────────┴─────────────┐
41+
PlaceholderRowExec
42+
└───────────────────────────┘
43+
44+
=========UnResolvedStage[stage_id=2.0, children=1]=========
45+
Inputs{1: StageOutput { partition_locations: {}, complete: false }}
46+
┌───────────────────────────┐
47+
ShuffleWriterExec
48+
--------------------
49+
partitioning: None
50+
└─────────────┬─────────────┘
51+
┌─────────────┴─────────────┐
52+
ProjectionExec
53+
--------------------
54+
count(*): │
55+
count(Int64(1)) │
56+
│ │
57+
id: id
58+
└─────────────┬─────────────┘
59+
┌─────────────┴─────────────┐
60+
AggregateExec
61+
--------------------
62+
aggr: count(1) │
63+
group_by: id
64+
│ │
65+
mode: │
66+
FinalPartitioned
67+
└─────────────┬─────────────┘
68+
┌─────────────┴─────────────┐
69+
UnresolvedShuffleExec
70+
--------------------
71+
partitioning: │
72+
Hash([id@0], 16) │
73+
└───────────────────────────┘
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
---
2+
source: ballista/client/tests/context_checks.rs
3+
expression: physical_plan_txt
4+
---
5+
┌───────────────────────────┐
6+
ProjectionExec
7+
--------------------
8+
count(*): │
9+
count(Int64(1)) │
10+
│ │
11+
id: id
12+
└─────────────┬─────────────┘
13+
┌─────────────┴─────────────┐
14+
AggregateExec
15+
--------------------
16+
aggr: count(1) │
17+
group_by: id
18+
│ │
19+
mode: │
20+
FinalPartitioned
21+
└─────────────┬─────────────┘
22+
┌─────────────┴─────────────┐
23+
RepartitionExec
24+
--------------------
25+
partition_count(in->out): │
26+
1 -> 16
27+
│ │
28+
partitioning_scheme: │
29+
Hash([id@0], 16) │
30+
└─────────────┬─────────────┘
31+
┌─────────────┴─────────────┐
32+
AggregateExec
33+
--------------------
34+
aggr: count(1) │
35+
group_by: id
36+
mode: Partial
37+
└─────────────┬─────────────┘
38+
┌─────────────┴─────────────┐
39+
ProjectionExec
40+
--------------------
41+
id: │
42+
__unnest_placeholder
43+
│ (make_array(Int64(1
44+
│ ),Int64(2),Int64(3),Int64
45+
│ (4),Int64(5)),depth=1) │
46+
└─────────────┬─────────────┘
47+
┌─────────────┴─────────────┐
48+
UnnestExec
49+
└─────────────┬─────────────┘
50+
┌─────────────┴─────────────┐
51+
ProjectionExec
52+
--------------------
53+
__unnest_placeholder
54+
│ (make_array(Int64(1
55+
│ ),Int64(2),Int64(3),Int64
56+
│ (4),Int64(5))): │
57+
│ [1, 2, 3, 4, 5] │
58+
└─────────────┬─────────────┘
59+
┌─────────────┴─────────────┐
60+
PlaceholderRowExec
61+
└───────────────────────────┘

ballista/core/proto/ballista.proto

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import "datafusion_common.proto";
3333
message BallistaLogicalPlanNode {
3434
oneof LogicalPlanType {
3535
LogicalPlanCacheNode cache_node = 1;
36+
LogicalPlanExplainNode explain_node = 2;
3637
}
3738
}
3839

@@ -41,6 +42,16 @@ message LogicalPlanCacheNode {
4142
string session_id = 2;
4243
}
4344

45+
// Ballista wrapper around datafusion's Explain logical plan node.
46+
// Used to preserve `explain_format` across the client -> scheduler boundary,
47+
// because `datafusion-proto`'s `ExplainNode` does not encode that field.
48+
message LogicalPlanExplainNode {
49+
bool verbose = 1;
50+
// One of: "indent", "tree", "pgjson", "graphviz". See `BallistaExplainNode`
51+
// in `core/src/extension.rs` for the canonical mapping.
52+
string explain_format = 2;
53+
}
54+
4455
///////////////////////////////////////////////////////////////////////////////////////////////////
4556
// Ballista Physical Plan
4657
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -631,6 +642,31 @@ message ExecuteQueryFailureResult {
631642
}
632643
}
633644

645+
message GetJobMetricsParams {
646+
string job_id = 1;
647+
}
648+
649+
message JobStageMetrics {
650+
uint32 stage_id = 1;
651+
uint32 partitions = 2;
652+
repeated OperatorWithMetrics operators = 3;
653+
}
654+
655+
message OperatorWithMetrics {
656+
// Pre-order DFS depth in the stage's plan tree (root = 0).
657+
uint32 depth = 1;
658+
// ExecutionPlan::name(), e.g. "FilterExec".
659+
string operator_type = 2;
660+
// Single-line operator description, equivalent to
661+
// `DisplayableExecutionPlan::indent` for that node only.
662+
string operator_desc = 3;
663+
repeated OperatorMetric metrics = 4;
664+
}
665+
666+
message GetJobMetricsResult {
667+
repeated JobStageMetrics stages = 1;
668+
}
669+
634670
message GetJobStatusParams {
635671
string job_id = 1;
636672
}
@@ -834,6 +870,11 @@ service SchedulerGrpc {
834870

835871
rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}
836872

873+
// Returns per-stage / per-operator metrics for a successfully-finished job.
874+
// Used by the client `DistributedExplainAnalyzeExec` to render an
875+
// `EXPLAIN ANALYZE` result without changing the wire format of normal jobs.
876+
rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {}
877+
837878
// Used by Executor to tell Scheduler it is stopped.
838879
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}
839880

0 commit comments

Comments
 (0)