Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ mod supported {
standalone_context_with_state,
};
use ballista_core::config::BallistaConfig;
use datafusion::arrow::array::StringArray;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;
use datafusion::{assert_batches_eq, prelude::SessionContext};
use rstest::*;
use std::sync::Arc;

#[rstest::fixture]
fn test_data() -> String {
Expand Down Expand Up @@ -1077,4 +1080,149 @@ mod supported {
];
assert_batches_eq!(expected, &result);
}

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn should_execute_explain_format_tree_query_correctly(
#[future(awt)]
#[case]
ctx: SessionContext,
) {
let result = ctx
.sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
.await
.unwrap()
.collect()
.await
.unwrap();

// With the Ballista logical extension codec, FORMAT TREE round-trips
// through the distributed scheduler. The result contains both the
// tree-rendered physical_plan and the Ballista distributed_plan.
assert_eq!(result.len(), 1);
let batch = &result[0];

// Verify we have 2 columns: plan_type and plan
assert_eq!(batch.num_columns(), 2);
// Tree format: physical_plan and distributed_plan (no logical_plan)
assert_eq!(batch.column(0).len(), 2);

// Verify the plan_type column contains the expected values
let plan_type_col = batch.column(0);
let plan_type_arr = plan_type_col
.as_any()
.downcast_ref::<datafusion::arrow::array::StringArray>()
.unwrap();

assert_eq!(plan_type_arr.value(0), "physical_plan");
assert_eq!(plan_type_arr.value(1), "distributed_plan");

// Verify physical_plan is in tree format (contains box characters)
let plan_col = batch.column(1);
let plan_arr = plan_col
.as_any()
.downcast_ref::<datafusion::arrow::array::StringArray>()
.unwrap();

let physical_plan_txt = plan_arr.value(0);
// Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘, ┬, ┴, etc.
assert!(
physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'),
"Expected tree format with box characters in physical_plan, got: {}",
physical_plan_txt
);

// Verify distributed_plan is present and non-empty
let distributed_plan_txt = plan_arr.value(1);
assert!(
!distributed_plan_txt.is_empty(),
"Expected non-empty distributed_plan"
);
}

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
#[tokio::test]
async fn should_execute_explain_analyze_query(
#[future(awt)]
#[case]
ctx: SessionContext,
) -> datafusion::error::Result<()> {
let result = ctx
.sql(
"EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id",
)
.await?
.collect()
.await?;

// Replace the metric values with "..." to keep the test stable.
let sanitized_plan_text = result[0]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0)
.lines()
.map(|line| {
if let Some(index) = line.find("metrics=[") {
let prefix = &line[..index];
let metrics = &line[index + "metrics=[".len()..];
let sanitized_metrics = metrics.strip_suffix(']').map_or_else(
|| "...".to_string(),
|body| {
body.split(", ")
.map(|metric| {
metric.split_once('=').map_or_else(
|| "...".to_string(),
|(name, _)| format!("{name}=..."),
)
})
.collect::<Vec<_>>()
.join(", ")
},
);
format!("{prefix}metrics=[{sanitized_metrics}]")
} else {
line.to_string()
}
})
.collect::<Vec<_>>()
.join("\n");

let sanitized = RecordBatch::try_new(
result[0].schema(),
vec![
result[0].column(0).clone(),
Arc::new(StringArray::from(vec![sanitized_plan_text])),
],
)?;

let expected = [
"+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| plan_type | plan |",
"+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
"| Plan with Metrics | =========SuccessfulStage[stage_id=1, partitions=1]========= |",
"| | ShuffleWriterExec: partitioning: Hash([id@0], 16), metrics=[output_rows=..., input_rows=..., repart_time=..., write_time=...] |",
"| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., spill_count=..., spilled_bytes=..., spilled_rows=..., skipped_aggregation_rows=..., peak_mem_used=..., aggregate_arguments_time=..., aggregation_time=..., emitting_time=..., time_calculating_group_ids=..., reduction_factor=...] |",
"| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id], metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |",
"| | UnnestExec, metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., input_batches=..., input_rows=..., output_batches=...] |",
"| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |",
"| | PlaceholderRowExec, metrics=[...] |",
"| | |",
"| | =========SuccessfulStage[stage_id=2, partitions=16]========= |",
"| | ShuffleWriterExec: partitioning: None, metrics=[output_rows=..., input_rows=..., repart_time=..., write_time=...] |",
"| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id], metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |",
"| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., spill_count=..., spilled_bytes=..., spilled_rows=..., peak_mem_used=..., aggregate_arguments_time=..., aggregation_time=..., emitting_time=..., time_calculating_group_ids=...] |",
"| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |",
"| | ShuffleReaderExec: partitioning: Hash([id@0], 16), metrics=[...] |",
"+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
];
assert_batches_eq!(expected, &[sanitized]);

Ok(())
}
}
45 changes: 45 additions & 0 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ message BallistaPhysicalPlanNode {
}
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Ballista Logical Plan extensions
///////////////////////////////////////////////////////////////////////////////////////////////////

// Ballista wrapper around datafusion Explain logical plan node.
// Used to preserve `explain_format` across the client -> scheduler boundary
// because the datafusion-proto `ExplainNode` does not encode that field.
message BallistaExplainNode {
bool verbose = 1;
// One of: "indent", "tree", "pgjson", "graphviz"
string explain_format = 2;
}

// Discriminating wrapper encoded as the payload of a Ballista logical
// extension node. Allows a single codec entry point to route to the
// appropriate Ballista logical extension type.
message BallistaLogicalExtensionNode {
oneof node {
BallistaExplainNode explain = 1;
}
Comment thread
lukekim marked this conversation as resolved.
}

message ShuffleWriterExecNode {
//TODO it seems redundant to provide job and stage id here since we also have them
// in the TaskDefinition that wraps this plan
Expand Down Expand Up @@ -606,6 +628,27 @@ message GetJobStatusParams {
string job_id = 1;
}

message GetJobMetricsParams {
string job_id = 1;
}

message JobStageMetrics {
uint32 stage_id = 1;
uint32 partitions = 2;
repeated OperatorWithMetrics operators = 3;
}

message OperatorWithMetrics {
uint32 depth = 1;
string operator_type = 2;
string operator_desc = 3;
repeated OperatorMetric metrics = 4;
}

message GetJobMetricsResult {
repeated JobStageMetrics stages = 1;
}

message SuccessfulJob {
repeated PartitionLocation partition_location = 1;
uint64 queued_at = 2;
Expand Down Expand Up @@ -799,6 +842,8 @@ service SchedulerGrpc {

rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {}

rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems unrelated to the explain changes?


// Used by Executor to tell Scheduler it is stopped.
rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {}

Expand Down
Loading
Loading