diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index 7bc1d00bd2..e461708643 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -24,10 +24,13 @@ mod supported { standalone_context_with_state, }; use ballista_core::config::BallistaConfig; + use datafusion::arrow::array::StringArray; + use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::collect; use datafusion::prelude::*; use datafusion::{assert_batches_eq, prelude::SessionContext}; use rstest::*; + use std::sync::Arc; #[rstest::fixture] fn test_data() -> String { @@ -1077,4 +1080,149 @@ mod supported { ]; assert_batches_eq!(expected, &result); } + + #[rstest] + #[case::standalone(standalone_context())] + #[case::remote(remote_context())] + #[tokio::test] + async fn should_execute_explain_format_tree_query_correctly( + #[future(awt)] + #[case] + ctx: SessionContext, + ) { + let result = ctx + .sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // With the Ballista logical extension codec, FORMAT TREE round-trips + // through the distributed scheduler. The result contains both the + // tree-rendered physical_plan and the Ballista distributed_plan. + assert_eq!(result.len(), 1); + let batch = &result[0]; + + // Verify we have 2 columns: plan_type and plan + assert_eq!(batch.num_columns(), 2); + // Tree format: physical_plan and distributed_plan (no logical_plan) + assert_eq!(batch.column(0).len(), 2); + + // Verify the plan_type column contains the expected values + let plan_type_col = batch.column(0); + let plan_type_arr = plan_type_col + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(plan_type_arr.value(0), "physical_plan"); + assert_eq!(plan_type_arr.value(1), "distributed_plan"); + + // Verify physical_plan is in tree format (contains box characters) + let plan_col = batch.column(1); + let plan_arr = plan_col + .as_any() + .downcast_ref::() + .unwrap(); + + let physical_plan_txt = plan_arr.value(0); + // Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘, ┬, ┴, etc. + assert!( + physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'), + "Expected tree format with box characters in physical_plan, got: {}", + physical_plan_txt + ); + + // Verify distributed_plan is present and non-empty + let distributed_plan_txt = plan_arr.value(1); + assert!( + !distributed_plan_txt.is_empty(), + "Expected non-empty distributed_plan" + ); + } + + #[rstest] + #[case::standalone(standalone_context())] + #[case::remote(remote_context())] + #[tokio::test] + async fn should_execute_explain_analyze_query( + #[future(awt)] + #[case] + ctx: SessionContext, + ) -> datafusion::error::Result<()> { + let result = ctx + .sql( + "EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id", + ) + .await? + .collect() + .await?; + + // Replace the metric values with "..." to keep the test stable. + let sanitized_plan_text = result[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .value(0) + .lines() + .map(|line| { + if let Some(index) = line.find("metrics=[") { + let prefix = &line[..index]; + let metrics = &line[index + "metrics=[".len()..]; + let sanitized_metrics = metrics.strip_suffix(']').map_or_else( + || "...".to_string(), + |body| { + body.split(", ") + .map(|metric| { + metric.split_once('=').map_or_else( + || "...".to_string(), + |(name, _)| format!("{name}=..."), + ) + }) + .collect::>() + .join(", ") + }, + ); + format!("{prefix}metrics=[{sanitized_metrics}]") + } else { + line.to_string() + } + }) + .collect::>() + .join("\n"); + + let sanitized = RecordBatch::try_new( + result[0].schema(), + vec![ + result[0].column(0).clone(), + Arc::new(StringArray::from(vec![sanitized_plan_text])), + ], + )?; + + let expected = [ + "+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + "| Plan with Metrics | =========SuccessfulStage[stage_id=1, partitions=1]========= |", + "| | ShuffleWriterExec: partitioning: Hash([id@0], 16), metrics=[output_rows=..., input_rows=..., repart_time=..., write_time=...] |", + "| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., spill_count=..., spilled_bytes=..., spilled_rows=..., skipped_aggregation_rows=..., peak_mem_used=..., aggregate_arguments_time=..., aggregation_time=..., emitting_time=..., time_calculating_group_ids=..., reduction_factor=...] |", + "| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id], metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |", + "| | UnnestExec, metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., input_batches=..., input_rows=..., output_batches=...] |", + "| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |", + "| | PlaceholderRowExec, metrics=[...] |", + "| | |", + "| | =========SuccessfulStage[stage_id=2, partitions=16]========= |", + "| | ShuffleWriterExec: partitioning: None, metrics=[output_rows=..., input_rows=..., repart_time=..., write_time=...] |", + "| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id], metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |", + "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=..., elapsed_compute=..., output_bytes=..., spill_count=..., spilled_bytes=..., spilled_rows=..., peak_mem_used=..., aggregate_arguments_time=..., aggregation_time=..., emitting_time=..., time_calculating_group_ids=...] |", + "| | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=..., elapsed_compute=..., output_bytes=...] |", + "| | ShuffleReaderExec: partitioning: Hash([id@0], 16), metrics=[...] |", + "+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+", + ]; + assert_batches_eq!(expected, &[sanitized]); + + Ok(()) + } } diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto index d5ec1b5694..f778d77a23 100644 --- a/ballista/core/proto/ballista.proto +++ b/ballista/core/proto/ballista.proto @@ -38,6 +38,28 @@ message BallistaPhysicalPlanNode { } } +/////////////////////////////////////////////////////////////////////////////////////////////////// +// Ballista Logical Plan extensions +/////////////////////////////////////////////////////////////////////////////////////////////////// + +// Ballista wrapper around datafusion Explain logical plan node. +// Used to preserve `explain_format` across the client -> scheduler boundary +// because the datafusion-proto `ExplainNode` does not encode that field. +message BallistaExplainNode { + bool verbose = 1; + // One of: "indent", "tree", "pgjson", "graphviz" + string explain_format = 2; +} + +// Discriminating wrapper encoded as the payload of a Ballista logical +// extension node. Allows a single codec entry point to route to the +// appropriate Ballista logical extension type. +message BallistaLogicalExtensionNode { + oneof node { + BallistaExplainNode explain = 1; + } +} + message ShuffleWriterExecNode { //TODO it seems redundant to provide job and stage id here since we also have them // in the TaskDefinition that wraps this plan @@ -606,6 +628,27 @@ message GetJobStatusParams { string job_id = 1; } +message GetJobMetricsParams { + string job_id = 1; +} + +message JobStageMetrics { + uint32 stage_id = 1; + uint32 partitions = 2; + repeated OperatorWithMetrics operators = 3; +} + +message OperatorWithMetrics { + uint32 depth = 1; + string operator_type = 2; + string operator_desc = 3; + repeated OperatorMetric metrics = 4; +} + +message GetJobMetricsResult { + repeated JobStageMetrics stages = 1; +} + message SuccessfulJob { repeated PartitionLocation partition_location = 1; uint64 queued_at = 2; @@ -799,6 +842,8 @@ service SchedulerGrpc { rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {} + rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {} + // Used by Executor to tell Scheduler it is stopped. rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {} diff --git a/ballista/core/src/execution_plans/distributed_explain_analyze.rs b/ballista/core/src/execution_plans/distributed_explain_analyze.rs new file mode 100644 index 0000000000..4638c948b3 --- /dev/null +++ b/ballista/core/src/execution_plans/distributed_explain_analyze.rs @@ -0,0 +1,375 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::execution_plans::DistributedQueryExec; +use crate::extension::SessionConfigExt; +use crate::serde::protobuf::{ + self, GetJobMetricsParams, GetJobMetricsResult, + scheduler_grpc_client::SchedulerGrpcClient, +}; +use crate::utils::create_grpc_client_endpoint; +use datafusion::arrow::array::StringBuilder; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::{DataFusionError, Result, internal_err}; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, +}; +use datafusion_proto::logical_plan::AsLogicalPlan; +use futures::StreamExt; +use std::any::Any; +use std::convert::TryInto; +use std::marker::PhantomData; +use std::sync::Arc; + +/// This operator fetches stage metrics from the Ballista scheduler after the query +/// has been executed in children (DistributedQueryExec). +#[derive(Debug, Clone)] +pub struct DistributedExplainAnalyzeExec { + child: Arc, + scheduler_url: String, + schema: SchemaRef, + properties: PlanProperties, + phantom: PhantomData, +} + +impl DistributedExplainAnalyzeExec { + /// Creates a new explain-analyze wrapper around a distributed query exec. + pub fn new( + child: Arc>, + scheduler_url: String, + schema: SchemaRef, + ) -> Self { + let properties = Self::compute_properties(Arc::clone(&schema)); + Self { + child, + scheduler_url, + schema, + properties, + phantom: PhantomData, + } + } + + fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + datafusion::physical_plan::execution_plan::EmissionType::Final, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, + ) + } +} + +impl DisplayAs for DistributedExplainAnalyzeExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "DistributedExplainAnalyzeExec: scheduler_url={}", + self.scheduler_url + ) + } + DisplayFormatType::TreeRender => { + writeln!(f, "scheduler_url={}", self.scheduler_url) + } + } + } +} + +impl ExecutionPlan for DistributedExplainAnalyzeExec { + fn name(&self) -> &str { + "DistributedExplainAnalyzeExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + if children.len() != 1 { + return internal_err!( + "DistributedExplainAnalyzeExec expected one child, got {}", + children.len() + ); + } + + let child = children.pop().unwrap(); + if child + .as_any() + .downcast_ref::>() + .is_some() + { + return Ok(Arc::new(Self { + child, + scheduler_url: self.scheduler_url.clone(), + schema: Arc::clone(&self.schema), + properties: Self::compute_properties(Arc::clone(&self.schema)), + phantom: PhantomData, + })); + } + + internal_err!( + "DistributedExplainAnalyzeExec requires a DistributedQueryExec child" + ) + } + + fn execute( + &self, + partition: usize, + ctx: Arc, + ) -> Result { + assert_eq!(0, partition); + + let child = Arc::clone(&self.child); + let scheduler_url = self.scheduler_url.clone(); + let schema = Arc::clone(&self.schema); + let stream_schema = Arc::clone(&self.schema); + let session_config = ctx.session_config().clone(); + + let stream = futures::stream::once(async move { + let mut child_stream = child.execute(partition, ctx)?; + while let Some(batch) = child_stream.next().await { + batch?; + } + + let job_id = child + .as_any() + .downcast_ref::>() + .ok_or_else(|| { + DataFusionError::Internal( + "DistributedExplainAnalyzeExec requires a DistributedQueryExec child" + .into(), + ) + })? + .job_id() + .ok_or_else(|| { + DataFusionError::Internal( + "Distributed query completed without exposing a job_id".into(), + ) + })?; + let job_metrics = + fetch_job_metrics(&scheduler_url, &job_id, session_config).await?; + format_metrics_as_record_batch(&job_metrics, schema) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + stream_schema, + stream, + ))) + } + + fn metrics(&self) -> Option { + None + } +} + +async fn fetch_job_metrics( + scheduler_url: &str, + job_id: &str, + session_config: datafusion::prelude::SessionConfig, +) -> Result { + let grpc_interceptor = session_config.ballista_grpc_interceptor(); + let customize_endpoint = + session_config.ballista_override_create_grpc_client_endpoint(); + let config = session_config.ballista_config(); + + let mut endpoint = create_grpc_client_endpoint(scheduler_url.to_string()) + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + + if let Some(ref customize) = customize_endpoint { + endpoint = customize + .configure_endpoint(endpoint) + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + } + + let connection = endpoint + .connect() + .await + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + + let max_message_size = config.default_grpc_client_max_message_size(); + let mut scheduler = SchedulerGrpcClient::with_interceptor( + connection, + grpc_interceptor.as_ref().clone(), + ) + .max_encoding_message_size(max_message_size) + .max_decoding_message_size(max_message_size); + + scheduler + .get_job_metrics(GetJobMetricsParams { + job_id: job_id.to_string(), + }) + .await + .map(|response| response.into_inner()) + .map_err(|e| DataFusionError::Execution(format!("{e:?}"))) +} + +fn format_metrics_as_record_batch( + job_metrics: &GetJobMetricsResult, + schema: SchemaRef, +) -> Result { + let plan = job_metrics + .stages + .iter() + .map(|stage| { + let mut stage_plan = Vec::with_capacity(stage.operators.len() + 1); + stage_plan.push(format!( + "=========SuccessfulStage[stage_id={}, partitions={}]=========", + stage.stage_id, stage.partitions + )); + + for operator in &stage.operators { + let metrics_set: datafusion::physical_plan::metrics::MetricsSet = + protobuf::OperatorMetricsSet { + metrics: operator.metrics.clone(), + } + .try_into() + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + let metrics = metrics_set + .aggregate_by_name() + .sorted_for_display() + .timestamps_removed() + .to_string(); + let indent = " ".repeat(operator.depth as usize); + stage_plan.push(format!( + "{indent}{}, metrics=[{metrics}]", + operator.operator_desc + )); + } + + Ok(stage_plan.join("\n")) + }) + .collect::>>()? + .join("\n\n"); + + let mut type_builder = StringBuilder::with_capacity(1, "Plan with Metrics".len()); + let mut plan_builder = StringBuilder::with_capacity(1, plan.len()); + + type_builder.append_value("Plan with Metrics"); + plan_builder.append_value(plan); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(type_builder.finish()), + Arc::new(plan_builder.finish()), + ], + ) + .map_err(DataFusionError::from) +} + +#[cfg(test)] +mod tests { + use super::format_metrics_as_record_batch; + use datafusion::arrow::array::StringArray; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + use crate::serde::protobuf::{ + GetJobMetricsResult, JobStageMetrics, OperatorMetric, OperatorWithMetrics, + operator_metric, + }; + + #[test] + fn test_format_metrics_as_record_batch() { + let schema = Arc::new(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, false), + Field::new("plan", DataType::Utf8, false), + ])); + let response = GetJobMetricsResult { + stages: vec![ + JobStageMetrics { + stage_id: 0, + partitions: 1, + operators: vec![OperatorWithMetrics { + depth: 0, + operator_type: "ProjectionExec".to_string(), + operator_desc: "ProjectionExec: expr=[a]".to_string(), + metrics: vec![ + OperatorMetric { + metric: Some(operator_metric::Metric::OutputRows(4)), + }, + OperatorMetric { + metric: Some(operator_metric::Metric::ElapseTime( + 15_000_000, + )), + }, + ], + }], + }, + JobStageMetrics { + stage_id: 1, + partitions: 2, + operators: vec![OperatorWithMetrics { + depth: 0, + operator_type: "FilterExec".to_string(), + operator_desc: "FilterExec: a@0 > 1".to_string(), + metrics: vec![OperatorMetric { + metric: Some(operator_metric::Metric::OutputRows(2)), + }], + }], + }, + ], + }; + + let batch = format_metrics_as_record_batch(&response, schema).unwrap(); + + let plan_type = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let plan = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(batch.num_rows(), 1); + assert_eq!(plan_type.value(0), "Plan with Metrics"); + let expected = [ + "=========SuccessfulStage[stage_id=0, partitions=1]=========", + "ProjectionExec: expr=[a], metrics=[output_rows=4, elapsed_compute=15ms]", + "", + "=========SuccessfulStage[stage_id=1, partitions=2]=========", + "FilterExec: a@0 > 1, metrics=[output_rows=2]", + ] + .join("\n"); + assert_eq!(plan.value(0), expected); + } +} diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs index d632e1063a..afed7390f1 100644 --- a/ballista/core/src/execution_plans/distributed_query.rs +++ b/ballista/core/src/execution_plans/distributed_query.rs @@ -48,8 +48,9 @@ use datafusion::physical_plan::{ use datafusion_proto::logical_plan::{ AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec, }; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use log::{debug, error, info}; +use parking_lot::Mutex; use std::any::Any; use std::fmt::Debug; use std::marker::PhantomData; @@ -84,6 +85,10 @@ pub struct DistributedQueryExec { /// - job_execution_time_ms: Time spent executing on the cluster (ended_at - started_at) /// - job_scheduling_in_ms: Time job waited in scheduler queue (started_at - queued_at) metrics: ExecutionPlanMetricsSet, + /// The scheduler job id after the query has been accepted. Populated + /// once by the execute path and read by the parent + /// `DistributedExplainAnalyzeExec` to fetch stage metrics. + job_id: Arc>>, } impl DistributedQueryExec { @@ -105,6 +110,7 @@ impl DistributedQueryExec { session_id, properties, metrics: ExecutionPlanMetricsSet::new(), + job_id: Arc::new(Mutex::new(None)), } } @@ -127,9 +133,15 @@ impl DistributedQueryExec { session_id, properties, metrics: ExecutionPlanMetricsSet::new(), + job_id: Arc::new(Mutex::new(None)), } } + /// Returns the scheduler job id after the query has been accepted. + pub fn job_id(&self) -> Option { + self.job_id.lock().clone() + } + fn compute_properties(schema: SchemaRef) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), @@ -197,6 +209,7 @@ impl ExecutionPlan for DistributedQueryExec { self.plan.schema().as_arrow().clone().into(), ), metrics: ExecutionPlanMetricsSet::new(), + job_id: Arc::clone(&self.job_id), })) } @@ -265,6 +278,7 @@ impl ExecutionPlan for DistributedQueryExec { self.session_id.clone(), query, Arc::new(self.metrics.clone()), + Arc::clone(&self.job_id), partition, self.config.clone(), interceptor, @@ -308,13 +322,14 @@ async fn execute_query( session_id: String, query: ExecuteQueryParams, metrics: Arc, + job_id_handle: Arc>>, partition: usize, config: BallistaConfig, grpc_interceptor: Arc, customize_endpoint: Option>, use_tls: bool, result_fetch_callback: Option>, -) -> Result> + Send> { +) -> Result> + Send> { // Capture query submission time for total_query_time_ms let query_start_time = std::time::Instant::now(); @@ -364,6 +379,7 @@ async fn execute_query( ); let job_id = query_result.job_id; + *job_id_handle.lock() = Some(job_id.clone()); let mut prev_status: Option = None; loop { @@ -409,7 +425,6 @@ async fn execute_query( started_at, ended_at, partition_location, - .. })) => { // Calculate job execution time (server-side execution) let job_execution_ms = ended_at.saturating_sub(started_at); diff --git a/ballista/core/src/execution_plans/mod.rs b/ballista/core/src/execution_plans/mod.rs index ad33d65743..661ebe9d70 100644 --- a/ballista/core/src/execution_plans/mod.rs +++ b/ballista/core/src/execution_plans/mod.rs @@ -18,6 +18,7 @@ //! This module contains execution plans that are needed to distribute DataFusion's execution plans into //! several Ballista executors. +mod distributed_explain_analyze; mod distributed_query; mod shuffle_manager; mod shuffle_reader; @@ -27,6 +28,7 @@ mod unresolved_shuffle; #[cfg(feature = "vortex")] pub mod vortex_shuffle; +pub use distributed_explain_analyze::DistributedExplainAnalyzeExec; pub use distributed_query::DistributedQueryExec; pub use shuffle_manager::{ InMemoryShuffleManager, ShufflePartitionData, ShufflePartitionKey, diff --git a/ballista/core/src/planner.rs b/ballista/core/src/planner.rs index 726393d940..5047b932de 100644 --- a/ballista/core/src/planner.rs +++ b/ballista/core/src/planner.rs @@ -16,15 +16,16 @@ // under the License. use crate::config::BallistaConfig; -use crate::execution_plans::DistributedQueryExec; +use crate::execution_plans::{DistributedExplainAnalyzeExec, DistributedQueryExec}; use crate::serde::BallistaLogicalExtensionCodec; +use crate::serde::logical_plan_ext::BallistaExplainNode; use async_trait::async_trait; use datafusion::arrow::datatypes::Schema; use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::context::{QueryPlanner, SessionState}; -use datafusion::logical_expr::{LogicalPlan, TableScan}; +use datafusion::logical_expr::{Extension, LogicalPlan, TableScan}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; @@ -128,13 +129,43 @@ impl QueryPlanner for BallistaQueryPlanner { log::debug!("create_physical_plan - handling empty exec"); Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) } + LogicalPlan::Analyze(analyze) => { + log::debug!( + "create_physical_plan - handling explain analyze statement" + ); + // Strip the `Analyze` wrapper and run the inner plan as a + // regular distributed job. After the child query stream + // drains, `DistributedExplainAnalyzeExec` fetches per-stage + // metrics via `GetJobMetrics` and renders them locally. + let inner_plan = analyze.input.as_ref().clone(); + let distributed_query_exec = + Arc::new(DistributedQueryExec::::with_extension( + self.scheduler_url.clone(), + self.config.clone(), + inner_plan, + self.extension_codec.clone(), + session_state.session_id().to_string(), + )); + + Ok(Arc::new(DistributedExplainAnalyzeExec::new( + distributed_query_exec, + self.scheduler_url.clone(), + Arc::clone(analyze.schema.inner()), + ))) + } _ => { log::debug!("create_physical_plan - handling general statement"); + // For EXPLAIN, wrap the plan in a Ballista logical + // extension so fields (like `explain_format`) survive + // the client -> scheduler serialization round-trip. The + // scheduler unwraps these before physical planning. + let plan_to_send = wrap_explain_for_distribution(logical_plan); + Ok(Arc::new(DistributedQueryExec::::with_extension( self.scheduler_url.clone(), self.config.clone(), - logical_plan.clone(), + plan_to_send, self.extension_codec.clone(), session_state.session_id().to_string(), ))) @@ -144,6 +175,26 @@ impl QueryPlanner for BallistaQueryPlanner { } } +/// Wrap `LogicalPlan::Explain` in a Ballista logical extension node so that +/// fields such as `explain_format` survive serialization to the scheduler +/// via `datafusion-proto`. Other plans are returned unchanged. +fn wrap_explain_for_distribution(plan: &LogicalPlan) -> LogicalPlan { + match plan { + LogicalPlan::Explain(explain) => { + let node = BallistaExplainNode { + verbose: explain.verbose, + explain_format: explain.explain_format.clone(), + plan: explain.plan.clone(), + schema: explain.schema.clone(), + }; + LogicalPlan::Extension(Extension { + node: Arc::new(node), + }) + } + _ => plan.clone(), + } +} + /// A Visitor which detect if query is using local tables, /// such as tables located in `information_schema` and returns true /// only if all scans are in from local tables diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index 94c6e8cca4..7cd81bd23c 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -21,6 +21,33 @@ pub mod ballista_physical_plan_node { UnresolvedShuffle(super::UnresolvedShuffleExecNode), } } +/// Ballista wrapper around datafusion Explain logical plan node. +/// Used to preserve `explain_format` across the client -> scheduler boundary +/// because the datafusion-proto `ExplainNode` does not encode that field. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct BallistaExplainNode { + #[prost(bool, tag = "1")] + pub verbose: bool, + /// One of: "indent", "tree", "pgjson", "graphviz" + #[prost(string, tag = "2")] + pub explain_format: ::prost::alloc::string::String, +} +/// Discriminating wrapper encoded as the payload of a Ballista logical +/// extension node. Allows a single codec entry point to route to the +/// appropriate Ballista logical extension type. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct BallistaLogicalExtensionNode { + #[prost(oneof = "ballista_logical_extension_node::Node", tags = "1")] + pub node: ::core::option::Option, +} +/// Nested message and enum types in `BallistaLogicalExtensionNode`. +pub mod ballista_logical_extension_node { + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum Node { + #[prost(message, tag = "1")] + Explain(super::BallistaExplainNode), + } +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct ShuffleWriterExecNode { /// TODO it seems redundant to provide job and stage id here since we also have them @@ -919,6 +946,36 @@ pub struct GetJobStatusParams { #[prost(string, tag = "1")] pub job_id: ::prost::alloc::string::String, } +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetJobMetricsParams { + #[prost(string, tag = "1")] + pub job_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JobStageMetrics { + #[prost(uint32, tag = "1")] + pub stage_id: u32, + #[prost(uint32, tag = "2")] + pub partitions: u32, + #[prost(message, repeated, tag = "3")] + pub operators: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OperatorWithMetrics { + #[prost(uint32, tag = "1")] + pub depth: u32, + #[prost(string, tag = "2")] + pub operator_type: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub operator_desc: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "4")] + pub metrics: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetJobMetricsResult { + #[prost(message, repeated, tag = "1")] + pub stages: ::prost::alloc::vec::Vec, +} #[derive(Clone, PartialEq, ::prost::Message)] pub struct SuccessfulJob { #[prost(message, repeated, tag = "1")] @@ -1457,6 +1514,32 @@ pub mod scheduler_grpc_client { ); self.inner.unary(req, path, codec).await } + pub async fn get_job_metrics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/ballista.protobuf.SchedulerGrpc/GetJobMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "GetJobMetrics"), + ); + self.inner.unary(req, path, codec).await + } /// Used by Executor to tell Scheduler it is stopped. pub async fn executor_stopped( &mut self, @@ -1659,6 +1742,13 @@ pub mod scheduler_grpc_server { tonic::Response, tonic::Status, >; + async fn get_job_metrics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Used by Executor to tell Scheduler it is stopped. async fn executor_stopped( &self, @@ -2138,6 +2228,51 @@ pub mod scheduler_grpc_server { }; Box::pin(fut) } + "/ballista.protobuf.SchedulerGrpc/GetJobMetrics" => { + #[allow(non_camel_case_types)] + struct GetJobMetricsSvc(pub Arc); + impl< + T: SchedulerGrpc, + > tonic::server::UnaryService + for GetJobMetricsSvc { + type Response = super::GetJobMetricsResult; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_job_metrics(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetJobMetricsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/ballista.protobuf.SchedulerGrpc/ExecutorStopped" => { #[allow(non_camel_case_types)] struct ExecutorStoppedSvc(pub Arc); diff --git a/ballista/core/src/serde/logical_plan_ext.rs b/ballista/core/src/serde/logical_plan_ext.rs new file mode 100644 index 0000000000..87adcd15af --- /dev/null +++ b/ballista/core/src/serde/logical_plan_ext.rs @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Ballista-specific logical plan extension nodes. +//! +//! These extension nodes wrap DataFusion `LogicalPlan::Explain` so that +//! fields not covered by `datafusion-proto` (notably `ExplainFormat`) are +//! preserved when the plan is sent from the client to the Ballista +//! scheduler. +//! +//! The client planner wraps an `Explain` in `BallistaExplainNode` before +//! submitting it to the scheduler. The scheduler unwraps it back to the +//! native `LogicalPlan::Explain` before physical planning. + +use std::cmp::Ordering; +use std::collections::HashSet; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use datafusion::common::DFSchemaRef; +use datafusion::common::format::ExplainFormat; +use datafusion::logical_expr::{ + Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore, +}; + +/// Ballista wrapper for `LogicalPlan::Explain`. +/// +/// Preserves fields that are lost by the default `datafusion-proto` +/// `ExplainNode` (in particular `explain_format`). +#[derive(Debug, Clone)] +pub struct BallistaExplainNode { + /// Whether verbose output was requested. + pub verbose: bool, + /// The explain output format (Indent, Tree, PostgresJSON, Graphviz). + pub explain_format: ExplainFormat, + /// The plan being explained. + pub plan: Arc, + /// Output schema for the explain (typically `plan_type`, `plan`). + pub schema: DFSchemaRef, +} + +impl BallistaExplainNode { + /// Serialize `ExplainFormat` to a stable string identifier. + pub fn format_as_str(format: &ExplainFormat) -> &'static str { + match format { + ExplainFormat::Indent => "indent", + ExplainFormat::Tree => "tree", + ExplainFormat::PostgresJSON => "pgjson", + ExplainFormat::Graphviz => "graphviz", + } + } + + /// Parse a stable string identifier back into `ExplainFormat`. + pub fn format_from_str(s: &str) -> Option { + match s { + "indent" => Some(ExplainFormat::Indent), + "tree" => Some(ExplainFormat::Tree), + "pgjson" => Some(ExplainFormat::PostgresJSON), + "graphviz" => Some(ExplainFormat::Graphviz), + _ => None, + } + } +} + +impl PartialEq for BallistaExplainNode { + fn eq(&self, other: &Self) -> bool { + self.verbose == other.verbose + && self.explain_format == other.explain_format + && self.plan == other.plan + } +} + +impl Eq for BallistaExplainNode {} + +impl PartialOrd for BallistaExplainNode { + fn partial_cmp(&self, other: &Self) -> Option { + match self.verbose.partial_cmp(&other.verbose) { + Some(Ordering::Equal) => { + match Self::format_as_str(&self.explain_format) + .partial_cmp(Self::format_as_str(&other.explain_format)) + { + Some(Ordering::Equal) => self.plan.partial_cmp(&other.plan), + cmp => cmp, + } + } + cmp => cmp, + } + } +} + +impl Hash for BallistaExplainNode { + fn hash(&self, state: &mut H) { + self.verbose.hash(state); + Self::format_as_str(&self.explain_format).hash(state); + self.plan.hash(state); + } +} + +impl UserDefinedLogicalNodeCore for BallistaExplainNode { + fn name(&self) -> &str { + "BallistaExplain" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![self.plan.as_ref()] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn prevent_predicate_push_down_columns(&self) -> HashSet { + HashSet::new() + } + + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "BallistaExplain: verbose={}, format={}", + self.verbose, + Self::format_as_str(&self.explain_format) + ) + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + mut inputs: Vec, + ) -> datafusion::error::Result { + if inputs.len() != 1 { + return Err(datafusion::error::DataFusionError::Internal(format!( + "BallistaExplainNode expects exactly 1 input, got {}", + inputs.len() + ))); + } + Ok(BallistaExplainNode { + verbose: self.verbose, + explain_format: self.explain_format.clone(), + plan: Arc::new(inputs.pop().unwrap()), + schema: self.schema.clone(), + }) + } +} + +/// Downcast an `Extension`'s node to `BallistaExplainNode` if it is one. +pub fn as_ballista_explain( + node: &dyn UserDefinedLogicalNode, +) -> Option<&BallistaExplainNode> { + node.as_any().downcast_ref::() +} diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs index 235603e416..53af9b1db2 100644 --- a/ballista/core/src/serde/mod.rs +++ b/ballista/core/src/serde/mod.rs @@ -56,6 +56,8 @@ pub use generated::ballista as protobuf; /// Generated protobuf code from Ballista protocol definitions. pub mod generated; +/// Ballista-specific logical plan extension nodes. +pub mod logical_plan_ext; /// Scheduler-specific serialization types and conversions. pub mod scheduler; @@ -187,6 +189,58 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec { inputs: &[datafusion::logical_expr::LogicalPlan], ctx: &TaskContext, ) -> Result { + // Try to decode as a Ballista extension wrapper first. + if let Ok(wrapper) = protobuf::BallistaLogicalExtensionNode::decode(buf) { + use crate::serde::logical_plan_ext::BallistaExplainNode; + use datafusion::logical_expr::Extension; + use protobuf::ballista_logical_extension_node::Node as ExtNode; + + match wrapper.node { + Some(ExtNode::Explain(explain)) => { + if let Some(explain_format) = + BallistaExplainNode::format_from_str(&explain.explain_format) + { + if inputs.len() != 1 { + return Err(DataFusionError::Internal(format!( + "BallistaExplainNode expects 1 input, got {}", + inputs.len() + ))); + } + // Build the output schema for explain: (plan_type, plan). + let schema = Arc::new(datafusion::common::DFSchema::try_from( + datafusion::arrow::datatypes::Schema::new(vec![ + datafusion::arrow::datatypes::Field::new( + "plan_type", + datafusion::arrow::datatypes::DataType::Utf8, + false, + ), + datafusion::arrow::datatypes::Field::new( + "plan", + datafusion::arrow::datatypes::DataType::Utf8, + false, + ), + ]), + )?); + let node = BallistaExplainNode { + verbose: explain.verbose, + explain_format, + plan: Arc::new(inputs[0].clone()), + schema, + }; + return Ok(Extension { + node: Arc::new(node), + }); + } + // Unknown explain format likely means this payload decoded + // permissively into the Ballista wrapper by accident. Fall + // through to the default codec instead of erroring. + } + None => { + // Fall through to default codec. + } + } + } + self.default_codec.try_decode(buf, inputs, ctx) } @@ -195,6 +249,26 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec { node: &datafusion::logical_expr::Extension, buf: &mut Vec, ) -> Result<()> { + use crate::serde::logical_plan_ext::as_ballista_explain; + use protobuf::ballista_logical_extension_node::Node as ExtNode; + + if let Some(explain) = as_ballista_explain(node.node.as_ref()) { + let wrapper = protobuf::BallistaLogicalExtensionNode { + node: Some(ExtNode::Explain(protobuf::BallistaExplainNode { + verbose: explain.verbose, + explain_format: + crate::serde::logical_plan_ext::BallistaExplainNode::format_as_str( + &explain.explain_format, + ) + .to_string(), + })), + }; + wrapper + .encode(buf) + .map_err(|e| DataFusionError::Internal(e.to_string()))?; + return Ok(()); + } + self.default_codec.try_encode(node, buf) } @@ -618,4 +692,91 @@ mod test { assert_eq!(decoded_exec.schema().as_ref(), schema.as_ref()); assert_eq!(&decoded_exec.properties().partitioning, &partitioning); } + + /// Build a simple logical plan wrapped in a `BallistaExplainNode` to + /// exercise the logical-extension codec round-trip. + fn wrap_in_explain( + format: datafusion::common::format::ExplainFormat, + verbose: bool, + ) -> datafusion::logical_expr::LogicalPlan { + use crate::serde::logical_plan_ext::BallistaExplainNode; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::logical_expr::{EmptyRelation, Extension, LogicalPlan}; + + let inner = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + let schema = Arc::new( + DFSchema::try_from(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, false), + Field::new("plan", DataType::Utf8, false), + ])) + .unwrap(), + ); + LogicalPlan::Extension(Extension { + node: Arc::new(BallistaExplainNode { + verbose, + explain_format: format, + plan: Arc::new(inner), + schema, + }), + }) + } + + /// All four `ExplainFormat` variants round-trip through the codec. + #[tokio::test] + async fn test_ballista_explain_node_codec_roundtrip() { + use crate::serde::logical_plan_ext::as_ballista_explain; + use datafusion::common::format::ExplainFormat; + use datafusion::logical_expr::LogicalPlan; + + let ctx = SessionContext::new().task_ctx(); + let codec = BallistaLogicalExtensionCodec::default(); + + for (format, verbose) in [ + (ExplainFormat::Indent, false), + (ExplainFormat::Tree, false), + (ExplainFormat::PostgresJSON, true), + (ExplainFormat::Graphviz, true), + ] { + let original = wrap_in_explain(format.clone(), verbose); + + let plan_message = + LogicalPlanNode::try_from_logical_plan(&original, &codec).unwrap(); + let mut buf: Vec = vec![]; + plan_message.try_encode(&mut buf).unwrap(); + + let decoded_message = LogicalPlanNode::try_decode(&buf).unwrap(); + let decoded = decoded_message.try_into_logical_plan(&ctx, &codec).unwrap(); + + let LogicalPlan::Extension(ext) = &decoded else { + panic!("expected Extension, got {decoded:?}"); + }; + let ballista = as_ballista_explain(ext.node.as_ref()) + .expect("decoded node must be BallistaExplainNode"); + assert_eq!(ballista.verbose, verbose); + assert_eq!(ballista.explain_format, format); + } + } + + /// `ExplainFormat` serializes to a stable set of string identifiers. + #[test] + fn test_explain_format_str_stable() { + use crate::serde::logical_plan_ext::BallistaExplainNode; + use datafusion::common::format::ExplainFormat; + + for f in [ + ExplainFormat::Indent, + ExplainFormat::Tree, + ExplainFormat::PostgresJSON, + ExplainFormat::Graphviz, + ] { + let s = BallistaExplainNode::format_as_str(&f); + let parsed = + BallistaExplainNode::format_from_str(s).expect("round-trip must succeed"); + assert_eq!(parsed, f); + } + assert!(BallistaExplainNode::format_from_str("bogus").is_none()); + } } diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index a29605de44..8990ce0206 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -25,12 +25,12 @@ use ballista_core::serde::protobuf::{ CleanJobDataResult, CreateUpdateSessionParams, CreateUpdateSessionResult, ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams, - ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobStatusParams, - GetJobStatusResult, GetRemoteFunctionsParams, GetRemoteFunctionsResult, - HeartBeatParams, HeartBeatResult, PollWorkParams, PollWorkResult, - RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams, - RemoveSessionResult, UpdateTaskStatusParams, UpdateTaskStatusResult, - execute_query_failure_result, execute_query_result, + ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobMetricsParams, + GetJobMetricsResult, GetJobStatusParams, GetJobStatusResult, + GetRemoteFunctionsParams, GetRemoteFunctionsResult, HeartBeatParams, HeartBeatResult, + PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult, + RemoveSessionParams, RemoveSessionResult, UpdateTaskStatusParams, + UpdateTaskStatusResult, execute_query_failure_result, execute_query_result, }; use ballista_core::serde::scheduler::ExecutorMetadata; use datafusion_proto::logical_plan::AsLogicalPlan; @@ -516,6 +516,138 @@ impl SchedulerGrpc } } + async fn get_job_metrics( + &self, + request: Request, + ) -> Result, Status> { + use ballista_core::error::BallistaError; + use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan}; + + let job_id = request.into_inner().job_id; + trace!("Received get_job_metrics request for job {}", job_id); + + let graph = self + .state + .task_manager + .get_job_execution_graph(&job_id) + .await + .map_err(|e| { + let msg = + format!("Error fetching execution graph for job {job_id}: {e:?}"); + error!("{msg}"); + Status::internal(msg) + })?; + + let graph = graph.ok_or_else(|| { + Status::not_found(format!("Execution graph not found for job {job_id}")) + })?; + + let mut stage_metrics_list = graph + .stages() + .iter() + .filter_map(|(stage_id, stage)| { + let successful = match stage { + crate::state::execution_graph::ExecutionStage::Successful(stage) => { + stage + } + _ => return None, + }; + + let raw_metrics = &successful.stage_metrics; + let mut operators = Vec::with_capacity(raw_metrics.len()); + let mut metric_index = 0; + let operators = (|| -> Result< + Vec, + BallistaError, + > { + let mut stack = vec![(successful.plan.as_ref(), 0_u32)]; + + while let Some((plan, depth)) = stack.pop() { + let operator_desc = { + struct DisplayableOperator<'a> { + plan: &'a dyn ExecutionPlan, + } + + impl std::fmt::Display for DisplayableOperator<'_> { + fn fmt( + &self, + f: &mut std::fmt::Formatter<'_>, + ) -> std::fmt::Result { + self.plan.fmt_as(DisplayFormatType::Default, f) + } + } + + DisplayableOperator { plan }.to_string() + }; + let metrics = if plan.metrics().is_some() { + let metrics: ballista_core::serde::protobuf::OperatorMetricsSet = + raw_metrics + .get(metric_index) + .ok_or_else(|| { + BallistaError::Internal(format!( + "Missing metrics for operator {} at depth {}", + plan.name(), + depth + )) + })? + .clone() + .try_into()?; + metric_index += 1; + metrics.metrics + } else { + vec![] + }; + + operators.push( + ballista_core::serde::protobuf::OperatorWithMetrics { + depth, + operator_type: plan.name().to_string(), + operator_desc, + metrics, + }, + ); + + for child in plan.children().into_iter().rev() { + stack.push((child.as_ref(), depth + 1)); + } + } + + Ok(operators) + })() + .and_then(|operators| { + if metric_index == raw_metrics.len() { + Ok(operators) + } else { + Err(BallistaError::Internal(format!( + "Stage metrics size mismatch for job {job_id} stage {stage_id}: operators {} != stage metrics {}", + metric_index, + raw_metrics.len() + ))) + } + }) + .map_err(|e| { + Status::internal(format!( + "Error serializing job metrics for job {job_id} stage {stage_id}: {e:?}" + )) + }); + + Some(operators.map(|operators| { + ballista_core::serde::protobuf::JobStageMetrics { + stage_id: *stage_id as u32, + partitions: successful.partitions as u32, + operators, + } + })) + }) + .collect::, Status>>()?; + + stage_metrics_list.sort_by_key(|stage| stage.stage_id); + + Ok(Response::new(GetJobMetricsResult { + stages: stage_metrics_list, + })) + } + async fn executor_stopped( &self, request: Request, diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs index 340aa5013b..4a4c7846b7 100644 --- a/ballista/scheduler/src/state/distributed_explain.rs +++ b/ballista/scheduler/src/state/distributed_explain.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use ballista_core::error::Result; use datafusion::arrow::array::{ListArray, ListBuilder, StringBuilder}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::format::ExplainFormat; use datafusion::common::{ScalarValue, UnnestOptions}; use datafusion::logical_expr::{LogicalPlan, PlanType, StringifiedPlan}; use datafusion::physical_expr::PhysicalExpr; @@ -60,32 +61,65 @@ pub(crate) async fn generate_distributed_explain_plan( pub(crate) fn extract_logical_and_physical_plans( plans: &[StringifiedPlan], + format: &ExplainFormat, ) -> (String, String) { - let logical_txt = plans - .iter() - .rev() - .find(|p| matches!(p.plan_type, PlanType::FinalAnalyzedLogicalPlan)) - .or_else(|| plans.first()) - .map(|p| p.plan.to_string()) - .unwrap_or("logical plan not available".to_string()); - - let physical_txt = plans - .iter() - .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)) - .map(|p| p.plan.to_string()) - .unwrap_or_else(|| "".to_string()); - - (logical_txt, physical_txt) + // For Tree format, DataFusion only provides physical_plan with tree rendering + // and doesn't include logical plan. We match that behavior. + match format { + ExplainFormat::Tree => { + // For tree format, logical plan is not available (matching DataFusion behavior) + let logical_txt = String::new(); + + let physical_txt = plans + .iter() + .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)) + .map(|p| p.plan.to_string()) + .unwrap_or_else(|| "".to_string()); + + (logical_txt, physical_txt) + } + ExplainFormat::Indent | ExplainFormat::PostgresJSON | ExplainFormat::Graphviz => { + let logical_txt = plans + .iter() + .rev() + .find(|p| matches!(p.plan_type, PlanType::FinalAnalyzedLogicalPlan)) + .or_else(|| { + // Fall back to the pre-analysis FinalLogicalPlan when the + // analyzed plan is not present. + plans + .iter() + .find(|p| matches!(p.plan_type, PlanType::FinalLogicalPlan)) + }) + .or_else(|| plans.first()) + .map(|p| p.plan.to_string()) + .unwrap_or_else(|| "logical plan not available".to_string()); + + let physical_txt = plans + .iter() + .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)) + .map(|p| p.plan.to_string()) + .unwrap_or_else(|| "".to_string()); + + (logical_txt, physical_txt) + } + } } /// Build a distributed explain execution plan that produces a two-column table: /// +/// For Indent format (default): /// | plan_type | plan | /// |------------------|-----------------| /// | logical_plan | logical_txt | /// | physical_plan | physical_txt | /// | distributed_plan | distributed_txt | /// +/// For Tree format (matching DataFusion behavior - only physical plan is shown): +/// | plan_type | plan | +/// |------------------|-----------------| +/// | physical_plan | physical_txt | +/// | distributed_plan | distributed_txt | +/// /// The transformed physical tree looks like: /// CoalescePartitionsExec /// └─ ProjectionExec: expr=[list_type -> plan_type, list_plan -> plan] @@ -96,28 +130,44 @@ pub(crate) fn construct_distributed_explain_exec( logical_txt: String, physical_txt: String, distributed_txt: String, + format: &ExplainFormat, ) -> Result> { let place_holder_row: Arc = Arc::new(PlaceholderRowExec::new(Arc::new(Schema::empty()))); - // construct list_type as ["logical_plan","physical_plan","distributed_plan"] + // Determine which rows to include based on format + // For Tree format, DataFusion omits logical_plan + let (plan_types, plan_texts): (Vec<&str>, Vec<&str>) = match format { + ExplainFormat::Tree => ( + vec!["physical_plan", "distributed_plan"], + vec![&physical_txt, &distributed_txt], + ), + ExplainFormat::Indent | ExplainFormat::PostgresJSON | ExplainFormat::Graphviz => { + ( + vec!["logical_plan", "physical_plan", "distributed_plan"], + vec![&logical_txt, &physical_txt, &distributed_txt], + ) + } + }; + + // construct list_type from plan_types let mut type_list_builder = ListBuilder::new(StringBuilder::new()); { let vb = type_list_builder.values(); - vb.append_value("logical_plan"); - vb.append_value("physical_plan"); - vb.append_value("distributed_plan"); + for plan_type in &plan_types { + vb.append_value(plan_type); + } } type_list_builder.append(true); let list_type_array: Arc = Arc::new(type_list_builder.finish()); - // construct list_plan as [, , ] + // construct list_plan from plan_texts let mut plan_list_builder = ListBuilder::new(StringBuilder::new()); { let vb = plan_list_builder.values(); - vb.append_value(&logical_txt); - vb.append_value(&physical_txt); - vb.append_value(&distributed_txt); + for plan_text in &plan_texts { + vb.append_value(plan_text); + } } plan_list_builder.append(true); let list_plan_array: Arc = Arc::new(plan_list_builder.finish()); diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 6f0862911d..87cb8e3dfb 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -1011,8 +1011,8 @@ impl ExecutionGraph { /// Returns all currently running tasks along with the executor ID on which they are assigned. pub fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 15d44774af..03683b8e8e 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -437,8 +437,8 @@ impl ExecutorManager { self.cluster_state .executor_heartbeats() - .iter() - .filter_map(|(_exec, heartbeat)| { + .values() + .filter_map(|heartbeat| { let terminating = matches!( heartbeat .status diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index aef8df736e..c7cc216341 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -42,8 +42,10 @@ use crate::state::execution_graph::TaskDescription; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::EventSender; use ballista_core::serde::BallistaCodec; +use ballista_core::serde::logical_plan_ext::as_ballista_explain; use ballista_core::serde::protobuf::TaskStatus; -use datafusion::logical_expr::LogicalPlan; +use datafusion::common::format::ExplainFormat; +use datafusion::logical_expr::{Explain as DFExplain, LogicalPlan}; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; use datafusion::prelude::SessionContext; @@ -103,6 +105,26 @@ pub fn encode_protobuf(msg: &T) -> Result> { Ok(value) } +/// If the root `LogicalPlan` is a Ballista logical extension wrapping an +/// `Explain`, return a native `LogicalPlan::Explain` reconstructed with all +/// fields intact. +fn unwrap_ballista_explain(plan: &LogicalPlan) -> Option { + let LogicalPlan::Extension(ext) = plan else { + return None; + }; + if let Some(explain) = as_ballista_explain(ext.node.as_ref()) { + return Some(LogicalPlan::Explain(DFExplain { + verbose: explain.verbose, + explain_format: explain.explain_format.clone(), + plan: explain.plan.clone(), + stringified_plans: vec![], + schema: explain.schema.clone(), + logical_optimization_succeeded: false, + })); + } + None +} + /// Shared state for the Ballista scheduler. /// /// Contains managers for executors, tasks, and sessions. @@ -475,6 +497,14 @@ impl SchedulerState Result<()> { let start = Instant::now(); let session_config = Arc::new(session_ctx.copied_config()); + + // Unwrap any Ballista logical extension Explain node that was wrapped + // by the client to preserve the explain format through serialization, + // and restore the native LogicalPlan::Explain so DataFusion's + // physical planner can handle it. + let unwrapped_plan = unwrap_ballista_explain(plan); + let plan = unwrapped_plan.as_ref().unwrap_or(plan); + if log::max_level() >= log::Level::Debug { // optimizing the plan here is redundant because the physical planner will do this again // but it is helpful to see what the optimized plan will be @@ -483,6 +513,7 @@ impl SchedulerState> = None; + let mut explain_format: Option = None; plan.apply(&mut |plan: &LogicalPlan| { if let LogicalPlan::TableScan(scan) = plan { let provider = source_as_provider(&scan.source)?; @@ -520,6 +551,7 @@ impl SchedulerState SchedulerState| { if node.output_partitioning().partition_count() == 0 { let empty: Arc = @@ -552,7 +587,7 @@ impl SchedulerState = @@ -560,6 +595,7 @@ impl SchedulerState TaskManager /// Get the execution graph of of a job. First look in the active cache. /// If no one found, then in the Active/Completed jobs. - #[cfg(feature = "rest-api")] pub(crate) async fn get_job_execution_graph( &self, job_id: &str,