Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 158 additions & 1 deletion ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ mod supported {
standalone_context_with_state,
};
use ballista_core::config::BallistaConfig;

use datafusion::arrow::array::Array;
use datafusion::arrow::array::StringArray;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;
use datafusion::{assert_batches_eq, prelude::SessionContext};
use rstest::*;
use std::path::PathBuf;
use std::sync::Arc;

#[rstest::fixture]
fn test_data() -> String {
Expand Down Expand Up @@ -1105,4 +1108,158 @@ mod supported {

Ok(())
}

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn should_execute_explain_format_tree_query_correctly(
#[future(awt)]
#[case]
ctx: SessionContext,
) {
let result = ctx
.sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
.await
.unwrap()
.collect()
.await
.unwrap();

// With the Ballista logical extension codec, FORMAT TREE round-trips
// through the distributed scheduler. The result contains both the
// tree-rendered physical_plan and the Ballista distributed_plan.
assert_eq!(result.len(), 1);
let batch = &result[0];
assert_eq!(batch.num_columns(), 2);
// Tree format: physical_plan and distributed_plan (no logical_plan)
assert_eq!(batch.column(0).len(), 2);

let plan_type_arr = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(plan_type_arr.value(0), "physical_plan");
assert_eq!(plan_type_arr.value(1), "distributed_plan");

let plan_arr = batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

let physical_plan_txt = plan_arr.value(0);
// Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘.
assert!(
physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'),
"Expected tree format with box characters in physical_plan, got: {physical_plan_txt}"
);

let distributed_plan_txt = plan_arr.value(1);
assert!(
!distributed_plan_txt.is_empty(),
"Expected non-empty distributed_plan"
);
}

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn should_execute_explain_analyze_query(
#[future(awt)]
#[case]
ctx: SessionContext,
) -> datafusion::error::Result<()> {
let result = ctx
.sql(
"EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id",
)
.await?
.collect()
.await?;

// Replace metric values with "..." so the assertion isn't sensitive to
// varying timings/byte counts.
let sanitized_plan_text = result[0]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0)
.lines()
.map(|line| {
if let Some(index) = line.find("metrics=[") {
let prefix = &line[..index];
let metrics = &line[index + "metrics=[".len()..];
let sanitized_metrics = metrics.strip_suffix(']').map_or_else(
|| "...".to_string(),
|body| {
body.split(", ")
.map(|metric| {
metric.split_once('=').map_or_else(
|| "...".to_string(),
|(name, _)| format!("{name}=..."),
)
})
.collect::<Vec<_>>()
.join(", ")
},
);
format!("{prefix}metrics=[{sanitized_metrics}]")
} else {
line.to_string()
}
})
.collect::<Vec<_>>()
.join("\n");

let sanitized = RecordBatch::try_new(
result[0].schema(),
vec![
result[0].column(0).clone(),
Arc::new(StringArray::from(vec![sanitized_plan_text])),
],
)?;

// Loose assertions rather than a frozen golden snapshot, since the
// exact metric set per operator can change between DataFusion releases.
let plan_type_arr = sanitized
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(plan_type_arr.len(), 1);
assert_eq!(plan_type_arr.value(0), "Plan with Metrics");

let plan_arr = sanitized
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let plan_text = plan_arr.value(0);
assert!(
plan_text.contains("SuccessfulStage[stage_id=1"),
"expected stage 1 in plan, got:\n{plan_text}"
);
assert!(
plan_text.contains("SuccessfulStage[stage_id=2"),
"expected stage 2 in plan, got:\n{plan_text}"
);
assert!(
plan_text.contains("ShuffleWriterExec"),
"expected ShuffleWriterExec in plan, got:\n{plan_text}"
);
assert!(
plan_text.contains("AggregateExec"),
"expected AggregateExec in plan, got:\n{plan_text}"
);
assert!(
plan_text.contains("metrics=["),
"expected per-operator metrics in plan, got:\n{plan_text}"
);

Ok(())
}
}
41 changes: 41 additions & 0 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import "datafusion_common.proto";
message BallistaLogicalPlanNode {
oneof LogicalPlanType {
LogicalPlanCacheNode cache_node = 1;
LogicalPlanExplainNode explain_node = 2;
}
}

Expand All @@ -41,6 +42,16 @@ message LogicalPlanCacheNode {
string session_id = 2;
}

// Ballista wrapper around datafusion's Explain logical plan node.
// Used to preserve `explain_format` across the client -> scheduler boundary,
// because `datafusion-proto`'s `ExplainNode` does not encode that field.
message LogicalPlanExplainNode {
bool verbose = 1;
// One of: "indent", "tree", "pgjson", "graphviz". See `BallistaExplainNode`
// in `core/src/extension.rs` for the canonical mapping.
string explain_format = 2;
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Physical Plan
///////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -631,6 +642,31 @@ message ExecuteQueryFailureResult {
}
}

message GetJobMetricsParams {
string job_id = 1;
}

message JobStageMetrics {
uint32 stage_id = 1;
uint32 partitions = 2;
repeated OperatorWithMetrics operators = 3;
}

message OperatorWithMetrics {
// Pre-order DFS depth in the stage's plan tree (root = 0).
uint32 depth = 1;
// ExecutionPlan::name(), e.g. "FilterExec".
string operator_type = 2;
// Single-line operator description, equivalent to
// `DisplayableExecutionPlan::indent` for that node only.
string operator_desc = 3;
repeated OperatorMetric metrics = 4;
}

message GetJobMetricsResult {
repeated JobStageMetrics stages = 1;
}

message GetJobStatusParams {
string job_id = 1;
}
Expand Down Expand Up @@ -834,6 +870,11 @@ service SchedulerGrpc {

rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}

// Returns per-stage / per-operator metrics for a successfully-finished job.
// Used by the client `DistributedExplainAnalyzeExec` to render an
// `EXPLAIN ANALYZE` result without changing the wire format of normal jobs.
rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {}

// Used by Executor to tell Scheduler it is stopped.
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}

Expand Down
Loading
Loading