From fd0531358e5bb049bd324936ba8a7bfab6ecdaa6 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc <879445+phillipleblanc@users.noreply.github.com> Date: Thu, 23 Apr 2026 15:32:37 +0900 Subject: [PATCH 1/8] feat: distributed EXPLAIN, EXPLAIN FORMAT TREE, and EXPLAIN ANALYZE Reimplements PR #31 on top of spiceai-52.5 (DataFusion 52). EXPLAIN Round-trip the ExplainFormat through the client -> scheduler boundary by wrapping the LogicalPlan::Explain in a BallistaExplainNode logical extension before serialization. The scheduler unwraps it back to a native LogicalPlan::Explain so its existing physical-planning intercept can substitute a distributed-aware ExplainExec replacement. EXPLAIN FORMAT TREE Honored end-to-end by threading the ExplainFormat through extract_logical_and_physical_plans and construct_distributed_explain_exec in scheduler/state/distributed_explain.rs (Tree format omits the logical_plan row to match DataFusion's native behavior). EXPLAIN ANALYZE - Client (BallistaQueryPlanner): strips the LogicalPlan::Analyze and runs the inner plan via DistributedQueryExec, wrapped in a new DistributedExplainAnalyzeExec. After the child stream drains, the wrapper publishes the job_id (added Arc<Mutex<Option<String>>> handle on DistributedQueryExec) and calls the scheduler's GetJobMetrics RPC. - Scheduler: new GetJobMetrics RPC walks the execution graph in the same pre-order DFS order as ballista_core::utils::collect_plan_metrics so per-operator metrics line up with the rendered plan text. Falls back from the active-job cache to the saved completed-job graph so the call still succeeds after succeed_job moves the graph out of active_job_cache. Includes ballista/client tests covering all three forms in both standalone and remote modes. 
--- ballista/client/tests/context_checks.rs | 159 ++++++- ballista/core/proto/ballista.proto | 41 ++ .../distributed_explain_analyze.rs | 389 ++++++++++++++++++ .../src/execution_plans/distributed_query.rs | 27 ++ ballista/core/src/execution_plans/mod.rs | 2 + ballista/core/src/extension.rs | 126 ++++++ ballista/core/src/planner.rs | 62 ++- ballista/core/src/serde/generated/ballista.rs | 134 +++++- ballista/core/src/serde/mod.rs | 148 ++++++- ballista/executor/src/execution_loop.rs | 10 +- .../scheduler/src/scheduler_server/grpc.rs | 148 ++++++- .../src/state/distributed_explain.rs | 97 +++-- ballista/scheduler/src/state/mod.rs | 53 ++- ballista/scheduler/src/state/task_manager.rs | 1 - 14 files changed, 1363 insertions(+), 34 deletions(-) create mode 100644 ballista/core/src/execution_plans/distributed_explain_analyze.rs diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index 0faf3b9b7d..e283e29707 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -24,11 +24,14 @@ mod supported { standalone_context_with_state, }; use ballista_core::config::BallistaConfig; - + use datafusion::arrow::array::StringArray; + use datafusion::arrow::array::Array; + use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::collect; use datafusion::prelude::*; use datafusion::{assert_batches_eq, prelude::SessionContext}; use rstest::*; + use std::sync::Arc; use std::path::PathBuf; #[rstest::fixture] @@ -1105,4 +1108,158 @@ mod supported { Ok(()) } + + #[rstest] + #[case::standalone(standalone_context())] + #[case::remote(remote_context())] + #[tokio::test] + async fn should_execute_explain_format_tree_query_correctly( + #[future(awt)] + #[case] + ctx: SessionContext, + ) { + let result = ctx + .sql("EXPLAIN FORMAT TREE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // With the Ballista 
logical extension codec, FORMAT TREE round-trips + // through the distributed scheduler. The result contains both the + // tree-rendered physical_plan and the Ballista distributed_plan. + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_columns(), 2); + // Tree format: physical_plan and distributed_plan (no logical_plan) + assert_eq!(batch.column(0).len(), 2); + + let plan_type_arr = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(plan_type_arr.value(0), "physical_plan"); + assert_eq!(plan_type_arr.value(1), "distributed_plan"); + + let plan_arr = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + let physical_plan_txt = plan_arr.value(0); + // Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘. + assert!( + physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'), + "Expected tree format with box characters in physical_plan, got: {physical_plan_txt}" + ); + + let distributed_plan_txt = plan_arr.value(1); + assert!( + !distributed_plan_txt.is_empty(), + "Expected non-empty distributed_plan" + ); + } + + #[rstest] + #[case::standalone(standalone_context())] + #[case::remote(remote_context())] + #[tokio::test] + async fn should_execute_explain_analyze_query( + #[future(awt)] + #[case] + ctx: SessionContext, + ) -> datafusion::error::Result<()> { + let result = ctx + .sql( + "EXPLAIN ANALYZE select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id", + ) + .await? + .collect() + .await?; + + // Replace metric values with "..." so the assertion isn't sensitive to + // varying timings/byte counts. 
+ let sanitized_plan_text = result[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .value(0) + .lines() + .map(|line| { + if let Some(index) = line.find("metrics=[") { + let prefix = &line[..index]; + let metrics = &line[index + "metrics=[".len()..]; + let sanitized_metrics = metrics.strip_suffix(']').map_or_else( + || "...".to_string(), + |body| { + body.split(", ") + .map(|metric| { + metric.split_once('=').map_or_else( + || "...".to_string(), + |(name, _)| format!("{name}=..."), + ) + }) + .collect::>() + .join(", ") + }, + ); + format!("{prefix}metrics=[{sanitized_metrics}]") + } else { + line.to_string() + } + }) + .collect::>() + .join("\n"); + + let sanitized = RecordBatch::try_new( + result[0].schema(), + vec![ + result[0].column(0).clone(), + Arc::new(StringArray::from(vec![sanitized_plan_text])), + ], + )?; + + // Loose assertions rather than a frozen golden snapshot, since the + // exact metric set per operator can change between DataFusion releases. + let plan_type_arr = sanitized + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(plan_type_arr.len(), 1); + assert_eq!(plan_type_arr.value(0), "Plan with Metrics"); + + let plan_arr = sanitized + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + let plan_text = plan_arr.value(0); + assert!( + plan_text.contains("SuccessfulStage[stage_id=1"), + "expected stage 1 in plan, got:\n{plan_text}" + ); + assert!( + plan_text.contains("SuccessfulStage[stage_id=2"), + "expected stage 2 in plan, got:\n{plan_text}" + ); + assert!( + plan_text.contains("ShuffleWriterExec"), + "expected ShuffleWriterExec in plan, got:\n{plan_text}" + ); + assert!( + plan_text.contains("AggregateExec"), + "expected AggregateExec in plan, got:\n{plan_text}" + ); + assert!( + plan_text.contains("metrics=["), + "expected per-operator metrics in plan, got:\n{plan_text}" + ); + + Ok(()) + } } diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto index 
add7b035cd..ccbaa974ed 100644 --- a/ballista/core/proto/ballista.proto +++ b/ballista/core/proto/ballista.proto @@ -33,6 +33,7 @@ import "datafusion_common.proto"; message BallistaLogicalPlanNode { oneof LogicalPlanType { LogicalPlanCacheNode cache_node = 1; + LogicalPlanExplainNode explain_node = 2; } } @@ -41,6 +42,16 @@ message LogicalPlanCacheNode { string session_id = 2; } +// Ballista wrapper around datafusion's Explain logical plan node. +// Used to preserve `explain_format` across the client -> scheduler boundary, +// because `datafusion-proto`'s `ExplainNode` does not encode that field. +message LogicalPlanExplainNode { + bool verbose = 1; + // One of: "indent", "tree", "pgjson", "graphviz". See `BallistaExplainNode` + // in `core/src/extension.rs` for the canonical mapping. + string explain_format = 2; +} + /////////////////////////////////////////////////////////////////////////////////////////////////// // Ballista Physical Plan /////////////////////////////////////////////////////////////////////////////////////////////////// @@ -631,6 +642,31 @@ message ExecuteQueryFailureResult { } } +message GetJobMetricsParams { + string job_id = 1; +} + +message JobStageMetrics { + uint32 stage_id = 1; + uint32 partitions = 2; + repeated OperatorWithMetrics operators = 3; +} + +message OperatorWithMetrics { + // Pre-order DFS depth in the stage's plan tree (root = 0). + uint32 depth = 1; + // ExecutionPlan::name(), e.g. "FilterExec". + string operator_type = 2; + // Single-line operator description, equivalent to + // `DisplayableExecutionPlan::indent` for that node only. + string operator_desc = 3; + repeated OperatorMetric metrics = 4; +} + +message GetJobMetricsResult { + repeated JobStageMetrics stages = 1; +} + message GetJobStatusParams { string job_id = 1; } @@ -834,6 +870,11 @@ service SchedulerGrpc { rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {} + // Returns per-stage / per-operator metrics for a successfully-finished job. 
+ // Used by the client `DistributedExplainAnalyzeExec` to render an + // `EXPLAIN ANALYZE` result without changing the wire format of normal jobs. + rpc GetJobMetrics (GetJobMetricsParams) returns (GetJobMetricsResult) {} + // Used by Executor to tell Scheduler it is stopped. rpc ExecutorStopped (ExecutorStoppedParams) returns (ExecutorStoppedResult) {} diff --git a/ballista/core/src/execution_plans/distributed_explain_analyze.rs b/ballista/core/src/execution_plans/distributed_explain_analyze.rs new file mode 100644 index 0000000000..53d8c13248 --- /dev/null +++ b/ballista/core/src/execution_plans/distributed_explain_analyze.rs @@ -0,0 +1,389 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `EXPLAIN ANALYZE` for distributed Ballista jobs. +//! +//! On the client side, [`BallistaQueryPlanner`] strips `LogicalPlan::Analyze` +//! and runs the inner plan as a regular distributed job wrapped in +//! [`DistributedExplainAnalyzeExec`]. After the child stream drains (i.e. the +//! distributed job has succeeded), this exec calls the new +//! `SchedulerGrpc::GetJobMetrics` RPC and renders per-stage / per-operator +//! metrics into a single-row `RecordBatch` matching DataFusion's `Analyze` +//! output schema. 
+ +use crate::execution_plans::DistributedQueryExec; +use crate::extension::SessionConfigExt; +use crate::serde::protobuf::{ + self, GetJobMetricsParams, GetJobMetricsResult, + scheduler_grpc_client::SchedulerGrpcClient, +}; +use crate::utils::{GrpcClientConfig, create_grpc_client_endpoint}; +use datafusion::arrow::array::StringBuilder; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::{DataFusionError, Result, internal_err}; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, + SendableRecordBatchStream, +}; +use datafusion_proto::logical_plan::AsLogicalPlan; +use futures::StreamExt; +use std::any::Any; +use std::convert::TryInto; +use std::marker::PhantomData; +use std::sync::Arc; + +/// Wrapper around [`DistributedQueryExec`] that fetches stage metrics from +/// the scheduler and renders them after the inner stream completes. +#[derive(Debug, Clone)] +pub struct DistributedExplainAnalyzeExec { + child: Arc, + scheduler_url: String, + schema: SchemaRef, + properties: PlanProperties, + phantom: PhantomData, +} + +impl DistributedExplainAnalyzeExec { + /// Creates a new explain-analyze wrapper around a `DistributedQueryExec`. 
+ pub fn new( + child: Arc>, + scheduler_url: String, + schema: SchemaRef, + ) -> Self { + let properties = Self::compute_properties(Arc::clone(&schema)); + Self { + child, + scheduler_url, + schema, + properties, + phantom: PhantomData, + } + } + + fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + datafusion::physical_plan::execution_plan::EmissionType::Final, + datafusion::physical_plan::execution_plan::Boundedness::Bounded, + ) + } +} + +impl DisplayAs for DistributedExplainAnalyzeExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "DistributedExplainAnalyzeExec: scheduler_url={}", + self.scheduler_url + ) + } + DisplayFormatType::TreeRender => { + writeln!(f, "scheduler_url={}", self.scheduler_url) + } + } + } +} + +impl ExecutionPlan for DistributedExplainAnalyzeExec { + fn name(&self) -> &str { + "DistributedExplainAnalyzeExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> Result> { + if children.len() != 1 { + return internal_err!( + "DistributedExplainAnalyzeExec expected one child, got {}", + children.len() + ); + } + + let child = children.pop().unwrap(); + if child + .as_any() + .downcast_ref::>() + .is_some() + { + return Ok(Arc::new(Self { + child, + scheduler_url: self.scheduler_url.clone(), + schema: Arc::clone(&self.schema), + properties: Self::compute_properties(Arc::clone(&self.schema)), + phantom: PhantomData, + })); + } + + internal_err!( + "DistributedExplainAnalyzeExec requires a DistributedQueryExec child" + ) + } + + fn execute( + &self, + partition: usize, + ctx: Arc, + ) -> Result 
{ + assert_eq!(0, partition); + + let child = Arc::clone(&self.child); + let scheduler_url = self.scheduler_url.clone(); + let schema = Arc::clone(&self.schema); + let stream_schema = Arc::clone(&self.schema); + let session_config = ctx.session_config().clone(); + + let stream = futures::stream::once(async move { + // Drain the child stream so the distributed job actually runs and + // its stages publish metrics back to the scheduler. + let mut child_stream = child.execute(partition, ctx)?; + while let Some(batch) = child_stream.next().await { + batch?; + } + + let job_id = child + .as_any() + .downcast_ref::>() + .ok_or_else(|| { + DataFusionError::Internal( + "DistributedExplainAnalyzeExec requires a DistributedQueryExec child" + .into(), + ) + })? + .job_id() + .ok_or_else(|| { + DataFusionError::Internal( + "Distributed query completed without exposing a job_id".into(), + ) + })?; + let job_metrics = + fetch_job_metrics(&scheduler_url, &job_id, session_config).await?; + format_metrics_as_record_batch(&job_metrics, schema) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + stream_schema, + stream, + ))) + } + + fn metrics(&self) -> Option { + None + } +} + +async fn fetch_job_metrics( + scheduler_url: &str, + job_id: &str, + session_config: datafusion::prelude::SessionConfig, +) -> Result { + let grpc_interceptor = session_config.ballista_grpc_interceptor(); + let customize_endpoint = + session_config.ballista_override_create_grpc_client_endpoint(); + let config = session_config.ballista_config(); + let grpc_config = GrpcClientConfig::from(&config); + + let mut endpoint = + create_grpc_client_endpoint(scheduler_url.to_string(), Some(&grpc_config)) + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + + if let Some(ref customize) = customize_endpoint { + endpoint = customize + .configure_endpoint(endpoint) + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + } + + let connection = endpoint + .connect() + .await + .map_err(|e| 
DataFusionError::Execution(format!("{e:?}")))?; + + let max_message_size = config.default_grpc_client_max_message_size(); + let mut scheduler = SchedulerGrpcClient::with_interceptor( + connection, + grpc_interceptor.as_ref().clone(), + ) + .max_encoding_message_size(max_message_size) + .max_decoding_message_size(max_message_size); + + scheduler + .get_job_metrics(GetJobMetricsParams { + job_id: job_id.to_string(), + }) + .await + .map(|response| response.into_inner()) + .map_err(|e| DataFusionError::Execution(format!("{e:?}"))) +} + +fn format_metrics_as_record_batch( + job_metrics: &GetJobMetricsResult, + schema: SchemaRef, +) -> Result { + let plan = job_metrics + .stages + .iter() + .map(|stage| { + let mut stage_plan = Vec::with_capacity(stage.operators.len() + 1); + stage_plan.push(format!( + "=========SuccessfulStage[stage_id={}, partitions={}]=========", + stage.stage_id, stage.partitions + )); + + for operator in &stage.operators { + let metrics_set: datafusion::physical_plan::metrics::MetricsSet = + protobuf::OperatorMetricsSet { + metrics: operator.metrics.clone(), + } + .try_into() + .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?; + let metrics = metrics_set + .aggregate_by_name() + .sorted_for_display() + .timestamps_removed() + .to_string(); + let indent = " ".repeat(operator.depth as usize); + stage_plan.push(format!( + "{indent}{}, metrics=[{metrics}]", + operator.operator_desc + )); + } + + Ok(stage_plan.join("\n")) + }) + .collect::>>()? 
+ .join("\n\n"); + + let mut type_builder = StringBuilder::with_capacity(1, "Plan with Metrics".len()); + let mut plan_builder = StringBuilder::with_capacity(1, plan.len()); + + type_builder.append_value("Plan with Metrics"); + plan_builder.append_value(plan); + + RecordBatch::try_new( + schema, + vec![ + Arc::new(type_builder.finish()), + Arc::new(plan_builder.finish()), + ], + ) + .map_err(DataFusionError::from) +} + +#[cfg(test)] +mod tests { + use super::format_metrics_as_record_batch; + use datafusion::arrow::array::StringArray; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + use crate::serde::protobuf::{ + GetJobMetricsResult, JobStageMetrics, OperatorMetric, OperatorWithMetrics, + operator_metric, + }; + + #[test] + fn test_format_metrics_as_record_batch() { + let schema = Arc::new(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, false), + Field::new("plan", DataType::Utf8, false), + ])); + let response = GetJobMetricsResult { + stages: vec![ + JobStageMetrics { + stage_id: 0, + partitions: 1, + operators: vec![OperatorWithMetrics { + depth: 0, + operator_type: "ProjectionExec".to_string(), + operator_desc: "ProjectionExec: expr=[a]".to_string(), + metrics: vec![ + OperatorMetric { + metric: Some(operator_metric::Metric::OutputRows(4)), + }, + OperatorMetric { + metric: Some(operator_metric::Metric::ElapseTime( + 15_000_000, + )), + }, + ], + }], + }, + JobStageMetrics { + stage_id: 1, + partitions: 2, + operators: vec![OperatorWithMetrics { + depth: 0, + operator_type: "FilterExec".to_string(), + operator_desc: "FilterExec: a@0 > 1".to_string(), + metrics: vec![OperatorMetric { + metric: Some(operator_metric::Metric::OutputRows(2)), + }], + }], + }, + ], + }; + + let batch = format_metrics_as_record_batch(&response, schema).unwrap(); + + let plan_type = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let plan = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + 
assert_eq!(batch.num_rows(), 1); + assert_eq!(plan_type.value(0), "Plan with Metrics"); + let expected = [ + "=========SuccessfulStage[stage_id=0, partitions=1]=========", + "ProjectionExec: expr=[a], metrics=[output_rows=4, elapsed_compute=15.00ms]", + "", + "=========SuccessfulStage[stage_id=1, partitions=2]=========", + "FilterExec: a@0 > 1, metrics=[output_rows=2]", + ] + .join("\n"); + assert_eq!(plan.value(0), expected); + } +} diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs index 218e65af34..a93df70c71 100644 --- a/ballista/core/src/execution_plans/distributed_query.rs +++ b/ballista/core/src/execution_plans/distributed_query.rs @@ -49,6 +49,7 @@ use datafusion_proto::logical_plan::{ }; use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use log::{debug, error, info}; +use parking_lot::Mutex; use std::any::Any; use std::fmt::Debug; use std::marker::PhantomData; @@ -84,6 +85,11 @@ pub struct DistributedQueryExec { /// - job_execution_time_ms: Time spent executing on the cluster (ended_at - started_at) /// - job_scheduling_in_ms: Time job waited in scheduler queue (started_at - queued_at) metrics: ExecutionPlanMetricsSet, + /// Scheduler-assigned job id, populated once after the query is accepted. + /// Read by the parent `DistributedExplainAnalyzeExec` (when present) to + /// fetch per-stage metrics via the `GetJobMetrics` RPC after the result + /// stream drains. + job_id: Arc>>, } impl DistributedQueryExec { @@ -105,6 +111,7 @@ impl DistributedQueryExec { session_id, properties, metrics: ExecutionPlanMetricsSet::new(), + job_id: Arc::new(Mutex::new(None)), } } @@ -127,9 +134,17 @@ impl DistributedQueryExec { session_id, properties, metrics: ExecutionPlanMetricsSet::new(), + job_id: Arc::new(Mutex::new(None)), } } + /// Returns the scheduler-assigned job id once the query has been accepted. 
+ /// Returns `None` if `execute` has not yet submitted the query, or if + /// submission failed. + pub fn job_id(&self) -> Option { + self.job_id.lock().clone() + } + fn compute_properties(schema: SchemaRef) -> PlanProperties { PlanProperties::new( EquivalenceProperties::new(schema), @@ -197,6 +212,7 @@ impl ExecutionPlan for DistributedQueryExec { self.plan.schema().as_arrow().clone().into(), ), metrics: ExecutionPlanMetricsSet::new(), + job_id: Arc::new(Mutex::new(None)), })) } @@ -258,6 +274,7 @@ impl ExecutionPlan for DistributedQueryExec { self.config.default_grpc_client_max_message_size(), GrpcClientConfig::from(&self.config), Arc::new(self.metrics.clone()), + Arc::clone(&self.job_id), partition, session_config, ) @@ -285,6 +302,7 @@ impl ExecutionPlan for DistributedQueryExec { self.config.default_grpc_client_max_message_size(), GrpcClientConfig::from(&self.config), Arc::new(self.metrics.clone()), + Arc::clone(&self.job_id), partition, session_config, ) @@ -330,6 +348,7 @@ async fn execute_query_pull( max_message_size: usize, grpc_config: GrpcClientConfig, metrics: Arc, + job_id_handle: Arc>>, partition: usize, session_config: SessionConfig, ) -> Result> + Send> { @@ -387,6 +406,7 @@ async fn execute_query_pull( ); let job_id = query_result.job_id; + *job_id_handle.lock() = Some(job_id.clone()); let mut prev_status: Option = None; loop { @@ -500,6 +520,7 @@ async fn execute_query_push( max_message_size: usize, grpc_config: GrpcClientConfig, metrics: Arc, + job_id_handle: Arc>>, partition: usize, session_config: SessionConfig, ) -> Result> + Send> { @@ -561,6 +582,12 @@ async fn execute_query_push( .as_ref() .map(|s| s.job_id.to_owned()) .unwrap_or("unknown_job_id".to_string()); // should not happen + if status.is_some() && job_id != "unknown_job_id" { + // Best-effort: publish the job id once it's known so a parent + // `DistributedExplainAnalyzeExec` can find it. Repeated writes are + // cheap and idempotent. 
+ *job_id_handle.lock() = Some(job_id.clone()); + } let status = status.and_then(|s| s.status); let has_status_change = prev_status != status; match status { diff --git a/ballista/core/src/execution_plans/mod.rs b/ballista/core/src/execution_plans/mod.rs index c98af166d7..9f8f944b2b 100644 --- a/ballista/core/src/execution_plans/mod.rs +++ b/ballista/core/src/execution_plans/mod.rs @@ -18,6 +18,7 @@ //! This module contains execution plans that are needed to distribute DataFusion's execution plans into //! several Ballista executors. +mod distributed_explain_analyze; mod distributed_query; mod shuffle_manager; mod shuffle_reader; @@ -29,6 +30,7 @@ mod unresolved_shuffle; #[cfg(feature = "vortex")] pub mod vortex_shuffle; +pub use distributed_explain_analyze::DistributedExplainAnalyzeExec; pub use distributed_query::DistributedQueryExec; pub use shuffle_manager::{ InMemoryShuffleManager, ShufflePartitionData, ShufflePartitionKey, diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index 4bf8b81b84..f16b11dbec 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -1159,6 +1159,132 @@ impl UserDefinedLogicalNodeCore for BallistaCacheNode { }) } } +/// Ballista wrapper around `LogicalPlan::Explain`. +/// +/// `datafusion-proto` serializes `LogicalPlan::Explain` through its built-in +/// `ExplainNode`, which discards the `explain_format` field (the variant +/// selected by `EXPLAIN FORMAT TREE` / `EXPLAIN FORMAT JSON` / etc.). The +/// client wraps the `Explain` in this extension before serialization, and the +/// scheduler unwraps it (see `unwrap_ballista_explain` in +/// `scheduler::state::mod`) into a native `LogicalPlan::Explain` before +/// physical planning, so DataFusion renders the user-requested format. +#[derive(Debug, Clone)] +pub struct BallistaExplainNode { + /// Whether `EXPLAIN VERBOSE` was requested. + pub verbose: bool, + /// The explain output format. 
Stored as a stable string so changes to the + /// `ExplainFormat` enum on either side do not break the wire format. + pub explain_format: String, + /// The plan being explained. + pub plan: Arc, + /// Output schema for `EXPLAIN`: `(plan_type, plan)`. + pub schema: DFSchemaRef, +} + +impl BallistaExplainNode { + /// Stable string identifier for an `ExplainFormat`. Must round-trip with + /// [`Self::format_from_str`]. + pub fn format_as_str( + format: &datafusion::common::format::ExplainFormat, + ) -> &'static str { + use datafusion::common::format::ExplainFormat; + match format { + ExplainFormat::Indent => "indent", + ExplainFormat::Tree => "tree", + ExplainFormat::PostgresJSON => "pgjson", + ExplainFormat::Graphviz => "graphviz", + } + } + + /// Inverse of [`Self::format_as_str`]. Returns `None` for unknown + /// identifiers, so the codec can fall through to the default handler. + pub fn format_from_str(s: &str) -> Option { + use datafusion::common::format::ExplainFormat; + match s { + "indent" => Some(ExplainFormat::Indent), + "tree" => Some(ExplainFormat::Tree), + "pgjson" => Some(ExplainFormat::PostgresJSON), + "graphviz" => Some(ExplainFormat::Graphviz), + _ => None, + } + } +} + +impl PartialEq for BallistaExplainNode { + fn eq(&self, other: &Self) -> bool { + self.verbose == other.verbose + && self.explain_format == other.explain_format + && self.plan == other.plan + } +} + +impl Eq for BallistaExplainNode {} + +impl std::hash::Hash for BallistaExplainNode { + fn hash(&self, state: &mut H) { + self.verbose.hash(state); + self.explain_format.hash(state); + self.plan.hash(state); + } +} + +impl PartialOrd for BallistaExplainNode { + fn partial_cmp(&self, other: &Self) -> Option { + match self.verbose.partial_cmp(&other.verbose) { + Some(std::cmp::Ordering::Equal) => { + match self.explain_format.partial_cmp(&other.explain_format) { + Some(std::cmp::Ordering::Equal) => self.plan.partial_cmp(&other.plan), + other => other, + } + } + other => other, + } + } +} + 
+impl UserDefinedLogicalNodeCore for BallistaExplainNode { + fn name(&self) -> &str { + "BallistaExplainNode" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![self.plan.as_ref()] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "BallistaExplainNode: verbose={}, format={}", + self.verbose, self.explain_format + ) + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + inputs: Vec, + ) -> datafusion::error::Result { + let [input] = <[LogicalPlan; 1]>::try_from(inputs).map_err(|_| { + datafusion::error::DataFusionError::Plan( + "BallistaExplainNode expects exactly 1 input".to_string(), + ) + })?; + Ok(Self { + verbose: self.verbose, + explain_format: self.explain_format.clone(), + plan: Arc::new(input), + schema: self.schema.clone(), + }) + } +} #[cfg(test)] mod test { use datafusion::{ diff --git a/ballista/core/src/planner.rs b/ballista/core/src/planner.rs index 726393d940..07fde9ed97 100644 --- a/ballista/core/src/planner.rs +++ b/ballista/core/src/planner.rs @@ -16,7 +16,8 @@ // under the License. 
use crate::config::BallistaConfig; -use crate::execution_plans::DistributedQueryExec; +use crate::execution_plans::{DistributedExplainAnalyzeExec, DistributedQueryExec}; +use crate::extension::BallistaExplainNode; use crate::serde::BallistaLogicalExtensionCodec; use async_trait::async_trait; @@ -24,7 +25,7 @@ use datafusion::arrow::datatypes::Schema; use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor}; use datafusion::error::DataFusionError; use datafusion::execution::context::{QueryPlanner, SessionState}; -use datafusion::logical_expr::{LogicalPlan, TableScan}; +use datafusion::logical_expr::{Extension, LogicalPlan, TableScan}; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; @@ -128,13 +129,45 @@ impl QueryPlanner for BallistaQueryPlanner { log::debug!("create_physical_plan - handling empty exec"); Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))) } + LogicalPlan::Analyze(analyze) => { + log::debug!( + "create_physical_plan - handling EXPLAIN ANALYZE statement" + ); + // Strip the `Analyze` wrapper and run the inner plan as a + // regular distributed job. After the child query stream + // drains, `DistributedExplainAnalyzeExec` fetches per-stage + // metrics via the `GetJobMetrics` RPC and renders them + // into a single `(Plan with Metrics, ...)` row. 
+ let inner_plan = analyze.input.as_ref().clone(); + let distributed_query_exec = + Arc::new(DistributedQueryExec::::with_extension( + self.scheduler_url.clone(), + self.config.clone(), + inner_plan, + self.extension_codec.clone(), + session_state.session_id().to_string(), + )); + + Ok(Arc::new(DistributedExplainAnalyzeExec::new( + distributed_query_exec, + self.scheduler_url.clone(), + Arc::clone(analyze.schema.inner()), + ))) + } _ => { log::debug!("create_physical_plan - handling general statement"); + // For `EXPLAIN`, wrap the plan in a Ballista logical + // extension so fields (e.g. `explain_format`) survive the + // client -> scheduler serialization round-trip via + // `datafusion-proto`. The scheduler unwraps these before + // physical planning. Other plans pass through unchanged. + let plan_to_send = wrap_explain_for_distribution(logical_plan); + Ok(Arc::new(DistributedQueryExec::::with_extension( self.scheduler_url.clone(), self.config.clone(), - logical_plan.clone(), + plan_to_send, self.extension_codec.clone(), session_state.session_id().to_string(), ))) @@ -144,6 +177,29 @@ impl QueryPlanner for BallistaQueryPlanner { } } +/// If `plan` is a `LogicalPlan::Explain`, wrap it in a `BallistaExplainNode` +/// extension so `explain_format` survives `datafusion-proto` serialization +/// to the scheduler. Other plans are returned unchanged. 
+fn wrap_explain_for_distribution(plan: &LogicalPlan) -> LogicalPlan { + match plan { + LogicalPlan::Explain(explain) => { + let node = BallistaExplainNode { + verbose: explain.verbose, + explain_format: BallistaExplainNode::format_as_str( + &explain.explain_format, + ) + .to_string(), + plan: explain.plan.clone(), + schema: explain.schema.clone(), + }; + LogicalPlan::Extension(Extension { + node: Arc::new(node), + }) + } + _ => plan.clone(), + } +} + /// A Visitor which detect if query is using local tables, /// such as tables located in `information_schema` and returns true /// only if all scans are in from local tables diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index 75e3377d5a..8b724cb637 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -4,7 +4,7 @@ /// ///////////////////////////////////////////////////////////////////////////////////////////////// #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct BallistaLogicalPlanNode { - #[prost(oneof = "ballista_logical_plan_node::LogicalPlanType", tags = "1")] + #[prost(oneof = "ballista_logical_plan_node::LogicalPlanType", tags = "1, 2")] pub logical_plan_type: ::core::option::Option< ballista_logical_plan_node::LogicalPlanType, >, @@ -15,6 +15,8 @@ pub mod ballista_logical_plan_node { pub enum LogicalPlanType { #[prost(message, tag = "1")] CacheNode(super::LogicalPlanCacheNode), + #[prost(message, tag = "2")] + ExplainNode(super::LogicalPlanExplainNode), } } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -24,6 +26,18 @@ pub struct LogicalPlanCacheNode { #[prost(string, tag = "2")] pub session_id: ::prost::alloc::string::String, } +/// Ballista wrapper around datafusion's Explain logical plan node. +/// Used to preserve `explain_format` across the client -> scheduler boundary, +/// because `datafusion-proto`'s `ExplainNode` does not encode that field. 
+#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LogicalPlanExplainNode { + #[prost(bool, tag = "1")] + pub verbose: bool, + /// One of: "indent", "tree", "pgjson", "graphviz". See `BallistaExplainNode` + /// in `core/src/extension.rs` for the canonical mapping. + #[prost(string, tag = "2")] + pub explain_format: ::prost::alloc::string::String, +} /// ///////////////////////////////////////////////////////////////////////////////////////////////// /// Ballista Physical Plan /// ///////////////////////////////////////////////////////////////////////////////////////////////// @@ -970,6 +984,40 @@ pub mod execute_query_failure_result { } } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetJobMetricsParams { + #[prost(string, tag = "1")] + pub job_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct JobStageMetrics { + #[prost(uint32, tag = "1")] + pub stage_id: u32, + #[prost(uint32, tag = "2")] + pub partitions: u32, + #[prost(message, repeated, tag = "3")] + pub operators: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct OperatorWithMetrics { + /// Pre-order DFS depth in the stage's plan tree (root = 0). + #[prost(uint32, tag = "1")] + pub depth: u32, + /// ExecutionPlan::name(), e.g. "FilterExec". + #[prost(string, tag = "2")] + pub operator_type: ::prost::alloc::string::String, + /// Single-line operator description, equivalent to + /// `DisplayableExecutionPlan::indent` for that node only. 
+ #[prost(string, tag = "3")] + pub operator_desc: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "4")] + pub metrics: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetJobMetricsResult { + #[prost(message, repeated, tag = "1")] + pub stages: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetJobStatusParams { #[prost(string, tag = "1")] pub job_id: ::prost::alloc::string::String, @@ -1553,6 +1601,35 @@ pub mod scheduler_grpc_client { ); self.inner.unary(req, path, codec).await } + /// Returns per-stage / per-operator metrics for a successfully-finished job. + /// Used by the client `DistributedExplainAnalyzeExec` to render an + /// `EXPLAIN ANALYZE` result without changing the wire format of normal jobs. + pub async fn get_job_metrics( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/ballista.protobuf.SchedulerGrpc/GetJobMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("ballista.protobuf.SchedulerGrpc", "GetJobMetrics"), + ); + self.inner.unary(req, path, codec).await + } /// Used by Executor to tell Scheduler it is stopped. pub async fn executor_stopped( &mut self, @@ -1768,6 +1845,16 @@ pub mod scheduler_grpc_server { tonic::Response, tonic::Status, >; + /// Returns per-stage / per-operator metrics for a successfully-finished job. + /// Used by the client `DistributedExplainAnalyzeExec` to render an + /// `EXPLAIN ANALYZE` result without changing the wire format of normal jobs. 
+ async fn get_job_metrics( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Used by Executor to tell Scheduler it is stopped. async fn executor_stopped( &self, @@ -2294,6 +2381,51 @@ pub mod scheduler_grpc_server { }; Box::pin(fut) } + "/ballista.protobuf.SchedulerGrpc/GetJobMetrics" => { + #[allow(non_camel_case_types)] + struct GetJobMetricsSvc(pub Arc); + impl< + T: SchedulerGrpc, + > tonic::server::UnaryService + for GetJobMetricsSvc { + type Response = super::GetJobMetricsResult; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_job_metrics(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetJobMetricsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/ballista.protobuf.SchedulerGrpc/ExecutorStopped" => { #[allow(non_camel_case_types)] struct ExecutorStoppedSvc(pub Arc); diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs index 1a4433e9e4..91e0861467 100644 --- a/ballista/core/src/serde/mod.rs +++ b/ballista/core/src/serde/mod.rs @@ -18,7 +18,7 @@ //! This crate contains code generated from the Ballista Protocol Buffer Definition as well //! 
as convenience code for interacting with the generated code. -use crate::extension::BallistaCacheNode; +use crate::extension::{BallistaCacheNode, BallistaExplainNode}; use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction}; use arrow_flight::sql::ProstMessageExt; @@ -214,6 +214,38 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec { .clone(), )), }), + LogicalPlanType::ExplainNode(explain) => { + let input = inputs.first().ok_or_else(|| { + DataFusionError::Plan( + "BallistaExplainNode expects 1 input".to_string(), + ) + })?; + // Recreate the standard EXPLAIN output schema (`plan_type`, + // `plan`) on the scheduler side. The schema is not part of + // the wire format because it is fixed. + let schema = Arc::new(datafusion::common::DFSchema::try_from( + datafusion::arrow::datatypes::Schema::new(vec![ + datafusion::arrow::datatypes::Field::new( + "plan_type", + datafusion::arrow::datatypes::DataType::Utf8, + false, + ), + datafusion::arrow::datatypes::Field::new( + "plan", + datafusion::arrow::datatypes::DataType::Utf8, + false, + ), + ]), + )?); + Ok(Extension { + node: Arc::new(BallistaExplainNode { + verbose: explain.verbose, + explain_format: explain.explain_format, + plan: Arc::new(input.clone()), + schema, + }), + }) + } } } @@ -238,6 +270,25 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec { )) })?; + Ok(()) + } else if let Some(node) = + node.node.as_any().downcast_ref::() + { + let proto = protobuf::BallistaLogicalPlanNode { + logical_plan_type: Some(LogicalPlanType::ExplainNode( + protobuf::LogicalPlanExplainNode { + verbose: node.verbose, + explain_format: node.explain_format.clone(), + }, + )), + }; + + proto.encode(buf).map_err(|e| { + DataFusionError::Internal(format!( + "failed to encode explain node logical plan: {e:?}" + )) + })?; + Ok(()) } else { self.default_codec.try_encode(node, buf) @@ -749,4 +800,99 @@ mod test { assert_eq!(decoded_exec.schema().as_ref(), schema.as_ref()); 
assert_eq!(&decoded_exec.properties().partitioning, &partitioning); } + + fn wrap_in_explain( + format: datafusion::common::format::ExplainFormat, + verbose: bool, + ) -> LogicalPlan { + use crate::extension::BallistaExplainNode; + use datafusion::logical_expr::Extension; + + let inner = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(DFSchema::empty()), + }); + let schema = Arc::new( + DFSchema::try_from(Schema::new(vec![ + Field::new("plan_type", DataType::Utf8, false), + Field::new("plan", DataType::Utf8, false), + ])) + .unwrap(), + ); + LogicalPlan::Extension(Extension { + node: Arc::new(BallistaExplainNode { + verbose, + explain_format: BallistaExplainNode::format_as_str(&format).to_string(), + plan: Arc::new(inner), + schema, + }), + }) + } + + /// All four `ExplainFormat` variants round-trip through the codec. + #[tokio::test] + async fn ballista_explain_node_codec_roundtrip() { + use crate::extension::BallistaExplainNode; + use datafusion::common::format::ExplainFormat; + + let ctx = SessionContext::new().task_ctx(); + let codec = BallistaLogicalExtensionCodec::default(); + + for (format, verbose) in [ + (ExplainFormat::Indent, false), + (ExplainFormat::Tree, false), + (ExplainFormat::PostgresJSON, true), + (ExplainFormat::Graphviz, true), + ] { + let original = wrap_in_explain(format.clone(), verbose); + + let plan_message = + LogicalPlanNode::try_from_logical_plan(&original, &codec).unwrap(); + let mut buf: Vec = vec![]; + plan_message.try_encode(&mut buf).unwrap(); + + let decoded_message = LogicalPlanNode::try_decode(&buf).unwrap(); + let decoded = + decoded_message.try_into_logical_plan(&ctx, &codec).unwrap(); + + let LogicalPlan::Extension(ext) = &decoded else { + panic!("expected Extension, got {decoded:?}"); + }; + let ballista = ext + .node + .as_any() + .downcast_ref::() + .expect("decoded node must be BallistaExplainNode"); + assert_eq!(ballista.verbose, verbose); + assert_eq!( + ballista.explain_format, + 
BallistaExplainNode::format_as_str(&format) + ); + assert_eq!( + BallistaExplainNode::format_from_str(&ballista.explain_format), + Some(format), + ); + } + } + + /// `ExplainFormat` serializes to a stable set of string identifiers, and + /// unknown identifiers are rejected. + #[test] + fn explain_format_str_stable() { + use crate::extension::BallistaExplainNode; + use datafusion::common::format::ExplainFormat; + + for f in [ + ExplainFormat::Indent, + ExplainFormat::Tree, + ExplainFormat::PostgresJSON, + ExplainFormat::Graphviz, + ] { + let s = BallistaExplainNode::format_as_str(&f); + let parsed = BallistaExplainNode::format_from_str(s) + .expect("round-trip must succeed"); + assert_eq!(parsed, f); + } + assert!(BallistaExplainNode::format_from_str("bogus").is_none()); + } } diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 4a0437591f..3033ac7711 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -519,7 +519,8 @@ mod tests { CancelJobParams, CancelJobResult, CleanJobDataParams, CleanJobDataResult, CreateUpdateSessionParams, CreateUpdateSessionResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorRegistration, ExecutorStoppedParams, - ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobStatusParams, + ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobMetricsParams, + GetJobMetricsResult, GetJobStatusParams, GetJobStatusResult, GetRemoteFunctionsParams, GetRemoteFunctionsResult, HeartBeatParams, HeartBeatResult, RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult, UpdateTaskStatusParams, @@ -601,6 +602,13 @@ mod tests { Err(Status::unimplemented("not needed for test")) } + async fn get_job_metrics( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("not needed for test")) + } + async fn executor_stopped( &self, _request: Request, diff --git 
a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 77508a9f08..540269cdbb 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -26,7 +26,8 @@ use ballista_core::serde::protobuf::{ CleanJobDataResult, CreateUpdateSessionParams, CreateUpdateSessionResult, ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams, - ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobStatusParams, + ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobMetricsParams, + GetJobMetricsResult, GetJobStatusParams, GetJobStatusResult, GetRemoteFunctionsParams, GetRemoteFunctionsResult, HeartBeatParams, HeartBeatResult, JobStatus, KeyValuePair, PollWorkParams, PollWorkResult, RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams, @@ -563,6 +564,65 @@ impl SchedulerGrpc } } + async fn get_job_metrics( + &self, + request: Request, + ) -> Result, Status> { + let job_id = request.into_inner().job_id; + trace!("Received get_job_metrics request for job {}", job_id); + + let graph = self + .state + .task_manager + .get_job_execution_graph(&job_id) + .await + .map_err(|e| { + let msg = format!( + "Error fetching execution graph for job {job_id}: {e:?}" + ); + error!("{msg}"); + Status::internal(msg) + })? + .ok_or_else(|| { + // Most commonly hit when delayed job cleanup ran before the + // client requested metrics. Clients (in particular + // `DistributedExplainAnalyzeExec`) must call this RPC promptly + // after the inner result stream drains. 
+ Status::not_found(format!( + "Execution graph not found for job {job_id} (may have been cleaned up)" + )) + })?; + + let mut stage_metrics_list = graph + .stages() + .iter() + .filter_map(|(stage_id, stage)| { + let successful = match stage { + crate::state::execution_graph::ExecutionStage::Successful(stage) => { + stage + } + _ => return None, + }; + + Some( + serialize_stage_metrics(*stage_id, &job_id, successful).map_err( + |e| { + Status::internal(format!( + "Error serializing job metrics for job {job_id} stage {stage_id}: {e:?}" + )) + }, + ), + ) + }) + .collect::, Status>>()?; + + stage_metrics_list.sort_by_key(|stage| stage.stage_id); + + Ok(Response::new(GetJobMetricsResult { + stages: stage_metrics_list, + })) + } + async fn executor_stopped( &self, request: Request, @@ -684,6 +744,92 @@ fn extract_connect_info(request: &Request) -> Option ballista_core::error::Result { + use ballista_core::error::BallistaError; + use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan}; + + let raw_metrics = &successful.stage_metrics; + let mut operators = Vec::with_capacity(raw_metrics.len()); + let mut metric_index = 0usize; + let mut stack = vec![(successful.plan.as_ref(), 0u32)]; + + while let Some((plan, depth)) = stack.pop() { + // Format just this node (not its children) using the same default + // display format DataFusion uses for `EXPLAIN`. + let operator_desc = { + struct DisplayableOperator<'a> { + plan: &'a dyn ExecutionPlan, + } + impl std::fmt::Display for DisplayableOperator<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.plan.fmt_as(DisplayFormatType::Default, f) + } + } + DisplayableOperator { plan }.to_string() + }; + + let metrics = if plan.metrics().is_some() { + let metrics: ballista_core::serde::protobuf::OperatorMetricsSet = + raw_metrics + .get(metric_index) + .ok_or_else(|| { + BallistaError::Internal(format!( + "Missing metrics for operator {} at depth {}", + plan.name(), + depth + )) + })? 
+ .clone() + .try_into()?; + metric_index += 1; + metrics.metrics + } else { + vec![] + }; + + operators.push(ballista_core::serde::protobuf::OperatorWithMetrics { + depth, + operator_type: plan.name().to_string(), + operator_desc, + metrics, + }); + + // Push children in reverse so the next pop visits the leftmost child + // first, matching `collect_plan_metrics`'s `children().iter()` order. + for child in plan.children().into_iter().rev() { + stack.push((child.as_ref(), depth.saturating_add(1))); + } + } + + if metric_index != raw_metrics.len() { + return Err(BallistaError::Internal(format!( + "Stage metrics size mismatch for job {job_id} stage {stage_id}: \ + consumed {metric_index} != stage_metrics.len() {}", + raw_metrics.len() + ))); + } + + Ok(ballista_core::serde::protobuf::JobStageMetrics { + stage_id: stage_id as u32, + partitions: successful.partitions as u32, + operators, + }) +} + impl SchedulerServer { async fn create_context( &self, diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs index 6c99456eb1..27aa1d115a 100644 --- a/ballista/scheduler/src/state/distributed_explain.rs +++ b/ballista/scheduler/src/state/distributed_explain.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use ballista_core::error::Result; use datafusion::arrow::array::{ListArray, ListBuilder, StringBuilder}; use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::format::ExplainFormat; use datafusion::common::{ScalarValue, UnnestOptions}; use datafusion::execution::SessionState; use datafusion::logical_expr::{LogicalPlan, PlanType, StringifiedPlan}; @@ -60,32 +61,67 @@ pub(crate) async fn generate_distributed_explain_plan( pub(crate) fn extract_logical_and_physical_plans( plans: &[StringifiedPlan], + format: &ExplainFormat, ) -> (String, String) { - let logical_txt = plans - .iter() - .rev() - .find(|p| matches!(p.plan_type, PlanType::FinalAnalyzedLogicalPlan)) - .or_else(|| plans.first()) 
- .map(|p| p.plan.to_string()) - .unwrap_or("logical plan not available".to_string()); - - let physical_txt = plans - .iter() - .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)) - .map(|p| p.plan.to_string()) - .unwrap_or_else(|| "".to_string()); - - (logical_txt, physical_txt) + // For Tree format, DataFusion only emits the physical plan with tree + // rendering and omits the logical plan; we mirror that behavior so the + // Ballista-rendered table matches DataFusion's `EXPLAIN FORMAT TREE`. + match format { + ExplainFormat::Tree => { + let logical_txt = String::new(); + let physical_txt = plans + .iter() + .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)) + .map(|p| p.plan.to_string()) + .unwrap_or_else(|| "".to_string()); + (logical_txt, physical_txt) + } + ExplainFormat::Indent + | ExplainFormat::PostgresJSON + | ExplainFormat::Graphviz => { + let logical_txt = plans + .iter() + .rev() + .find(|p| matches!(p.plan_type, PlanType::FinalAnalyzedLogicalPlan)) + .or_else(|| { + // Fall back to the pre-analysis FinalLogicalPlan when + // the analyzed plan is not present. 
+ plans + .iter() + .find(|p| matches!(p.plan_type, PlanType::FinalLogicalPlan)) + }) + .or_else(|| plans.first()) + .map(|p| p.plan.to_string()) + .unwrap_or_else(|| "logical plan not available".to_string()); + + let physical_txt = plans + .iter() + .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan)) + .map(|p| p.plan.to_string()) + .unwrap_or_else(|| "".to_string()); + + (logical_txt, physical_txt) + } + } } /// Build a distributed explain execution plan that produces a two-column table: /// +/// For Indent / PostgresJSON / Graphviz format: +/// /// | plan_type | plan | /// |------------------|-----------------| /// | logical_plan | logical_txt | /// | physical_plan | physical_txt | /// | distributed_plan | distributed_txt | /// +/// For Tree format (matching DataFusion behavior - logical plan is omitted): +/// +/// | plan_type | plan | +/// |------------------|-----------------| +/// | physical_plan | physical_txt | +/// | distributed_plan | distributed_txt | +/// /// The transformed physical tree looks like: /// CoalescePartitionsExec /// └─ ProjectionExec: expr=[list_type -> plan_type, list_plan -> plan] @@ -96,28 +132,43 @@ pub(crate) fn construct_distributed_explain_exec( logical_txt: String, physical_txt: String, distributed_txt: String, + format: &ExplainFormat, ) -> Result> { let place_holder_row: Arc = Arc::new(PlaceholderRowExec::new(Arc::new(Schema::empty()))); - // construct list_type as ["logical_plan","physical_plan","distributed_plan"] + // Tree format omits the `logical_plan` row to match DataFusion. 
+ let (plan_types, plan_texts): (Vec<&str>, Vec<&str>) = match format { + ExplainFormat::Tree => ( + vec!["physical_plan", "distributed_plan"], + vec![&physical_txt, &distributed_txt], + ), + ExplainFormat::Indent + | ExplainFormat::PostgresJSON + | ExplainFormat::Graphviz => ( + vec!["logical_plan", "physical_plan", "distributed_plan"], + vec![&logical_txt, &physical_txt, &distributed_txt], + ), + }; + + // construct list_type from plan_types let mut type_list_builder = ListBuilder::new(StringBuilder::new()); { let vb = type_list_builder.values(); - vb.append_value("logical_plan"); - vb.append_value("physical_plan"); - vb.append_value("distributed_plan"); + for plan_type in &plan_types { + vb.append_value(plan_type); + } } type_list_builder.append(true); let list_type_array: Arc = Arc::new(type_list_builder.finish()); - // construct list_plan as [, , ] + // construct list_plan from plan_texts let mut plan_list_builder = ListBuilder::new(StringBuilder::new()); { let vb = plan_list_builder.values(); - vb.append_value(&logical_txt); - vb.append_value(&physical_txt); - vb.append_value(&distributed_txt); + for plan_text in &plan_texts { + vb.append_value(plan_text); + } } plan_list_builder.append(true); let list_plan_array: Arc = Arc::new(plan_list_builder.finish()); diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index b297a89cfb..94e010aeff 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -43,9 +43,11 @@ use crate::metrics::SchedulerMetricsCollector; use crate::state::execution_graph::TaskDescription; use ballista_core::error::{BallistaError, Result}; use ballista_core::event_loop::EventSender; +use ballista_core::extension::BallistaExplainNode; use ballista_core::serde::BallistaCodec; use ballista_core::serde::protobuf::TaskStatus; -use datafusion::logical_expr::LogicalPlan; +use datafusion::common::format::ExplainFormat; +use datafusion::logical_expr::{Explain as DFExplain, 
LogicalPlan}; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::empty::EmptyExec; use datafusion::prelude::SessionContext; @@ -106,6 +108,39 @@ pub fn encode_protobuf(msg: &T) -> Result> { Ok(value) } +/// If the root `LogicalPlan` is a `BallistaExplainNode` extension wrapper, +/// reconstruct an equivalent native `LogicalPlan::Explain` so the scheduler's +/// downstream pipeline (which only knows about native `Explain`) can handle +/// it as usual. Returns `None` if the plan is not the wrapper, in which case +/// the caller should use the original plan unchanged. +fn unwrap_ballista_explain(plan: &LogicalPlan) -> Option { + let LogicalPlan::Extension(ext) = plan else { + return None; + }; + let explain = ext.node.as_any().downcast_ref::()?; + let explain_format = + BallistaExplainNode::format_from_str(&explain.explain_format).unwrap_or_else( + || { + log::debug!( + "unwrap_ballista_explain: unknown explain_format {:?}, defaulting to Indent", + explain.explain_format + ); + ExplainFormat::Indent + }, + ); + Some(LogicalPlan::Explain(DFExplain { + verbose: explain.verbose, + explain_format, + plan: explain.plan.clone(), + // Repopulated by the physical planner; the wire format does not + // carry these and the optimization-success flag is meaningless + // before re-optimization. + stringified_plans: vec![], + schema: explain.schema.clone(), + logical_optimization_succeeded: false, + })) +} + /// Shared state for the Ballista scheduler. /// /// Contains managers for executors, tasks, and sessions. @@ -478,6 +513,14 @@ impl SchedulerState, ) -> Result<()> { let start = Instant::now(); + + // Unwrap any `BallistaExplainNode` extension that the client used to + // preserve `explain_format` through `datafusion-proto` serialization, + // restoring a native `LogicalPlan::Explain` so the rest of submit_job + // (and DataFusion's physical planner) can handle it normally. 
+ let unwrapped_plan = unwrap_ballista_explain(plan); + let plan = unwrapped_plan.as_ref().unwrap_or(plan); + if log::max_level() >= log::Level::Debug { // optimizing the plan here is redundant because the physical planner will do this again // but it is helpful to see what the optimized plan will be @@ -486,6 +529,7 @@ impl SchedulerState> = None; + let mut explain_format: Option = None; plan.apply(&mut |plan: &LogicalPlan| { if let LogicalPlan::TableScan(scan) = plan { let provider = source_as_provider(&scan.source)?; @@ -523,6 +567,7 @@ impl SchedulerState SchedulerState| { if node.output_partitioning().partition_count() == 0 { let empty: Arc = @@ -585,7 +633,7 @@ impl SchedulerState = @@ -593,6 +641,7 @@ impl SchedulerState TaskManager /// Get the execution graph of of a job. First look in the active cache. /// If no one found, then in the Active/Completed jobs. - #[cfg(feature = "rest-api")] pub(crate) async fn get_job_execution_graph( &self, job_id: &str, From bc42c254d1be9fc4697cd53864f6309d3f9b1bac Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Thu, 23 Apr 2026 12:16:25 +0300 Subject: [PATCH 2/8] Support for Tree formatting + tests --- .../src/state/distributed_explain.rs | 7 +- .../scheduler/src/state/execution_stage.rs | 254 ++++++++++++++++++ ballista/scheduler/src/state/mod.rs | 4 +- ...__tests__resolved_stage_indent_format.snap | 10 + ...ge__tests__resolved_stage_tree_format.snap | 30 +++ ...tests__unresolved_stage_indent_format.snap | 11 + ...__tests__unresolved_stage_tree_format.snap | 31 +++ 7 files changed, 343 insertions(+), 4 deletions(-) create mode 100644 ballista/scheduler/src/state/snapshots/ballista_scheduler__state__execution_stage__tests__resolved_stage_indent_format.snap create mode 100644 ballista/scheduler/src/state/snapshots/ballista_scheduler__state__execution_stage__tests__resolved_stage_tree_format.snap create mode 100644 
ballista/scheduler/src/state/snapshots/ballista_scheduler__state__execution_stage__tests__unresolved_stage_indent_format.snap create mode 100644 ballista/scheduler/src/state/snapshots/ballista_scheduler__state__execution_stage__tests__unresolved_stage_tree_format.snap diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs index 27aa1d115a..60bdaeb4c2 100644 --- a/ballista/scheduler/src/state/distributed_explain.rs +++ b/ballista/scheduler/src/state/distributed_explain.rs @@ -45,6 +45,7 @@ pub(crate) async fn generate_distributed_explain_plan( job_id: &str, session_state: &SessionState, plan: Arc, + format: &ExplainFormat, ) -> Result { let session_config = Arc::new(session_state.config().clone()); @@ -56,7 +57,7 @@ pub(crate) async fn generate_distributed_explain_plan( let builder = ExecutionStageBuilder::new(session_config.clone()); let stages = builder.build(shuffle_stages)?; - Ok(render_stages(stages)) + Ok(render_stages(stages, format)) } pub(crate) fn extract_logical_and_physical_plans( @@ -226,13 +227,13 @@ pub(crate) fn construct_distributed_explain_exec( Ok(Arc::new(CoalescePartitionsExec::new(proj_final)) as Arc) } -fn render_stages(stages: HashMap) -> String { +fn render_stages(stages: HashMap, format: &ExplainFormat) -> String { let mut buf = String::new(); let mut keys: Vec<_> = stages.keys().cloned().collect(); keys.sort(); for k in keys { let stage = &stages[&k]; - writeln!(buf, "{:#?}", stage).ok(); + writeln!(buf, "{}", stage.format_with(format)).ok(); } buf } diff --git a/ballista/scheduler/src/state/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs index d36fa55dd4..8c0b702919 100644 --- a/ballista/scheduler/src/state/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_stage.rs @@ -21,6 +21,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; +use datafusion::common::format::ExplainFormat; use 
datafusion::config::ConfigOptions; use datafusion::physical_optimizer::aggregate_statistics::AggregateStatistics; //use datafusion::physical_optimizer::join_selection::JoinSelection; @@ -107,6 +108,17 @@ impl ExecutionStage { ExecutionStage::Failed(stage) => stage.plan.as_ref(), } } + + /// Format the stage with the given explain format (Tree vs Indent) + pub fn format_with(&self, format: &ExplainFormat) -> String { + match self { + ExecutionStage::UnResolved(s) => s.format_with(format), + ExecutionStage::Resolved(s) => s.format_with(format), + ExecutionStage::Running(s) => s.format_with(format), + ExecutionStage::Successful(s) => s.format_with(format), + ExecutionStage::Failed(s) => s.format_with(format), + } + } } /// For a stage whose input stages are not all completed, we say it's a unresolved stage @@ -426,6 +438,29 @@ impl Debug for UnresolvedStage { } } +impl UnresolvedStage { + /// Format the stage with the given explain format + pub fn format_with(&self, format: &ExplainFormat) -> String { + let plan = match format { + ExplainFormat::Tree => DisplayableExecutionPlan::new(self.plan.as_ref()) + .tree_render() + .to_string(), + _ => DisplayableExecutionPlan::new(self.plan.as_ref()) + .indent(false) + .to_string(), + }; + + format!( + "=========UnResolvedStage[stage_id={}.{}, children={}]=========\nInputs{:?}\n{}", + self.stage_id, + self.stage_attempt_num, + self.inputs.len(), + self.inputs, + plan + ) + } +} + impl ResolvedStage { /// Creates a new resolved stage ready for task scheduling. 
pub fn new( @@ -493,6 +528,25 @@ impl Debug for ResolvedStage { } } +impl ResolvedStage { + /// Format the stage with the given explain format + pub fn format_with(&self, format: &ExplainFormat) -> String { + let plan = match format { + ExplainFormat::Tree => DisplayableExecutionPlan::new(self.plan.as_ref()) + .tree_render() + .to_string(), + _ => DisplayableExecutionPlan::new(self.plan.as_ref()) + .indent(false) + .to_string(), + }; + + format!( + "=========ResolvedStage[stage_id={}.{}, partitions={}]=========\n{}", + self.stage_id, self.stage_attempt_num, self.partitions, plan + ) + } +} + impl RunningStage { /// Creates a new running stage with task tracking initialized. pub fn new( @@ -843,6 +897,31 @@ impl Debug for RunningStage { } } +impl RunningStage { + /// Format the stage with the given explain format + pub fn format_with(&self, format: &ExplainFormat) -> String { + let plan = match format { + ExplainFormat::Tree => DisplayableExecutionPlan::new(self.plan.as_ref()) + .tree_render() + .to_string(), + _ => DisplayableExecutionPlan::new(self.plan.as_ref()) + .indent(false) + .to_string(), + }; + + format!( + "=========RunningStage[stage_id={}.{}, partitions={}, successful_tasks={}, scheduled_tasks={}, available_tasks={}]=========\n{}", + self.stage_id, + self.stage_attempt_num, + self.partitions, + self.successful_tasks(), + self.scheduled_tasks(), + self.available_tasks(), + plan + ) + } +} + impl SuccessfulStage { /// Change to the running state and bump the stage attempt number pub fn to_running(&self) -> RunningStage { @@ -935,6 +1014,29 @@ impl Debug for SuccessfulStage { } } +impl SuccessfulStage { + /// Format the stage with the given explain format + pub fn format_with(&self, format: &ExplainFormat) -> String { + let plan = match format { + // Tree format doesn't include metrics (DisplayableBallistaExecutionPlan doesn't support tree) + ExplainFormat::Tree => DisplayableExecutionPlan::new(self.plan.as_ref()) + .tree_render() + .to_string(), + _ => 
DisplayableBallistaExecutionPlan::new( + self.plan.as_ref(), + &self.stage_metrics, + ) + .indent() + .to_string(), + }; + + format!( + "=========SuccessfulStage[stage_id={}.{}, partitions={}]=========\n{}", + self.stage_id, self.stage_attempt_num, self.partitions, plan + ) + } +} + impl FailedStage { /// Returns the number of successful tasks pub fn successful_tasks(&self) -> usize { @@ -983,6 +1085,32 @@ impl Debug for FailedStage { } } +impl FailedStage { + /// Format the stage with the given explain format + pub fn format_with(&self, format: &ExplainFormat) -> String { + let plan = match format { + ExplainFormat::Tree => DisplayableExecutionPlan::new(self.plan.as_ref()) + .tree_render() + .to_string(), + _ => DisplayableExecutionPlan::new(self.plan.as_ref()) + .indent(false) + .to_string(), + }; + + format!( + "=========FailedStage[stage_id={}.{}, partitions={}, successful_tasks={}, scheduled_tasks={}, available_tasks={}, error_message={}]=========\n{}", + self.stage_id, + self.stage_attempt_num, + self.partitions, + self.successful_tasks(), + self.scheduled_tasks(), + self.available_tasks(), + self.error_message, + plan + ) + } +} + /// Get the total number of partitions for a stage with plan. /// Only for shuffle writers, the input partition count and the output partition count /// will be different. Here, we should use the input partition count. 
@@ -1059,7 +1187,16 @@ impl StageOutput { mod tests { use super::*; use ballista_core::serde::protobuf::{SuccessfulTask, TaskStatus, task_status}; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::common::ScalarValue; + use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr}; + use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::empty::EmptyExec; + use datafusion::physical_plan::expressions::{Column, Literal}; + use datafusion::physical_plan::filter::FilterExec; + use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; + use datafusion::physical_plan::projection::ProjectionExec; + use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::prelude::SessionConfig; use std::collections::HashMap; @@ -1172,4 +1309,121 @@ mod tests { // Should gracefully reject the update, not panic. assert!(!result); } + + /// Create a resolved stage with a multi-node plan for snapshot testing + /// Plan: PlaceholderRowExec -> FilterExec -> CoalesceBatchesExec -> SortExec -> ProjectionExec + fn make_resolved_stage_with_complex_plan() -> ResolvedStage { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("value", DataType::Float64, true), + ])); + + // Build: PlaceholderRowExec -> FilterExec -> CoalesceBatchesExec -> SortExec -> ProjectionExec + let placeholder = Arc::new(PlaceholderRowExec::new(schema.clone())); + + // Filter: true (always pass) + let filter_expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))); + let filter = Arc::new( + FilterExec::try_new(filter_expr, placeholder).expect("filter creation"), + ); + + // Coalesce batches + let coalesce = Arc::new(CoalesceBatchesExec::new(filter, 8192)); + + // Sort by id ASC + let sort_exprs = LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: Default::default(), + }]) + 
.expect("non-empty sort expressions"); + let sort = Arc::new(SortExec::new(sort_exprs, coalesce)); + + // Project id and name + let projection_exprs: Vec<(Arc, String)> = vec![ + (Arc::new(Column::new("id", 0)), "id".to_string()), + (Arc::new(Column::new("name", 1)), "name".to_string()), + ]; + let projection = + Arc::new(ProjectionExec::try_new(projection_exprs, sort).expect("projection")); + + ResolvedStage::new( + 1, + 0, + projection, + vec![], + HashMap::new(), + HashSet::new(), + Arc::new(SessionConfig::default()), + ) + } + + /// Create an unresolved stage with a multi-node plan for snapshot testing + /// Plan: PlaceholderRowExec -> FilterExec -> CoalesceBatchesExec -> SortExec -> ProjectionExec + fn make_unresolved_stage_with_complex_plan() -> UnresolvedStage { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, true), + Field::new("value", DataType::Float64, true), + ])); + + let placeholder = Arc::new(PlaceholderRowExec::new(schema.clone())); + + let filter_expr = Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))); + let filter = Arc::new( + FilterExec::try_new(filter_expr, placeholder).expect("filter creation"), + ); + + let coalesce = Arc::new(CoalesceBatchesExec::new(filter, 8192)); + + let sort_exprs = LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: Default::default(), + }]) + .expect("non-empty sort expressions"); + let sort = Arc::new(SortExec::new(sort_exprs, coalesce)); + + let projection_exprs: Vec<(Arc, String)> = vec![ + (Arc::new(Column::new("id", 0)), "id".to_string()), + (Arc::new(Column::new("name", 1)), "name".to_string()), + ]; + let projection = + Arc::new(ProjectionExec::try_new(projection_exprs, sort).expect("projection")); + + UnresolvedStage::new( + 1, + projection, + vec![], + vec![], + Arc::new(SessionConfig::default()), + ) + } + + #[test] + fn test_resolved_stage_format_tree_snapshot() { + let stage = 
make_resolved_stage_with_complex_plan(); + let output = stage.format_with(&ExplainFormat::Tree); + insta::assert_snapshot!("resolved_stage_tree_format", output); + } + + #[test] + fn test_resolved_stage_format_indent_snapshot() { + let stage = make_resolved_stage_with_complex_plan(); + let output = stage.format_with(&ExplainFormat::Indent); + insta::assert_snapshot!("resolved_stage_indent_format", output); + } + + #[test] + fn test_unresolved_stage_format_tree_snapshot() { + let stage = make_unresolved_stage_with_complex_plan(); + let output = stage.format_with(&ExplainFormat::Tree); + insta::assert_snapshot!("unresolved_stage_tree_format", output); + } + + #[test] + fn test_unresolved_stage_format_indent_snapshot() { + let stage = make_unresolved_stage_with_complex_plan(); + let output = stage.format_with(&ExplainFormat::Indent); + insta::assert_snapshot!("unresolved_stage_indent_format", output); + } } diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 94e010aeff..504776e666 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -604,8 +604,10 @@ impl SchedulerState Date: Thu, 23 Apr 2026 12:20:19 +0300 Subject: [PATCH 3/8] Formatting --- .../src/state/distributed_explain.rs | 21 ++++++++++--------- .../scheduler/src/state/execution_stage.rs | 20 ++++++++++++------ ballista/scheduler/src/state/mod.rs | 9 ++++++-- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs index 60bdaeb4c2..5c94699fc3 100644 --- a/ballista/scheduler/src/state/distributed_explain.rs +++ b/ballista/scheduler/src/state/distributed_explain.rs @@ -77,9 +77,7 @@ pub(crate) fn extract_logical_and_physical_plans( .unwrap_or_else(|| "".to_string()); (logical_txt, physical_txt) } - ExplainFormat::Indent - | ExplainFormat::PostgresJSON - | ExplainFormat::Graphviz => { + ExplainFormat::Indent |
ExplainFormat::PostgresJSON | ExplainFormat::Graphviz => { let logical_txt = plans .iter() .rev() @@ -144,12 +142,12 @@ pub(crate) fn construct_distributed_explain_exec( vec!["physical_plan", "distributed_plan"], vec![&physical_txt, &distributed_txt], ), - ExplainFormat::Indent - | ExplainFormat::PostgresJSON - | ExplainFormat::Graphviz => ( - vec!["logical_plan", "physical_plan", "distributed_plan"], - vec![&logical_txt, &physical_txt, &distributed_txt], - ), + ExplainFormat::Indent | ExplainFormat::PostgresJSON | ExplainFormat::Graphviz => { + ( + vec!["logical_plan", "physical_plan", "distributed_plan"], + vec![&logical_txt, &physical_txt, &distributed_txt], + ) + } }; // construct list_type from plan_types @@ -227,7 +225,10 @@ pub(crate) fn construct_distributed_explain_exec( Ok(Arc::new(CoalescePartitionsExec::new(proj_final)) as Arc) } -fn render_stages(stages: HashMap, format: &ExplainFormat) -> String { +fn render_stages( + stages: HashMap, + format: &ExplainFormat, +) -> String { let mut buf = String::new(); let mut keys: Vec<_> = stages.keys().cloned().collect(); keys.sort(); diff --git a/ballista/scheduler/src/state/execution_stage.rs b/ballista/scheduler/src/state/execution_stage.rs index 8c0b702919..79661293dd 100644 --- a/ballista/scheduler/src/state/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_stage.rs @@ -1340,12 +1340,16 @@ mod tests { let sort = Arc::new(SortExec::new(sort_exprs, coalesce)); // Project id and name - let projection_exprs: Vec<(Arc, String)> = vec![ + let projection_exprs: Vec<( + Arc, + String, + )> = vec![ (Arc::new(Column::new("id", 0)), "id".to_string()), (Arc::new(Column::new("name", 1)), "name".to_string()), ]; - let projection = - Arc::new(ProjectionExec::try_new(projection_exprs, sort).expect("projection")); + let projection = Arc::new( + ProjectionExec::try_new(projection_exprs, sort).expect("projection"), + ); ResolvedStage::new( 1, @@ -1383,12 +1387,16 @@ mod tests { .expect("non-empty sort 
expressions"); let sort = Arc::new(SortExec::new(sort_exprs, coalesce)); - let projection_exprs: Vec<(Arc, String)> = vec![ + let projection_exprs: Vec<( + Arc, + String, + )> = vec![ (Arc::new(Column::new("id", 0)), "id".to_string()), (Arc::new(Column::new("name", 1)), "name".to_string()), ]; - let projection = - Arc::new(ProjectionExec::try_new(projection_exprs, sort).expect("projection")); + let projection = Arc::new( + ProjectionExec::try_new(projection_exprs, sort).expect("projection"), + ); UnresolvedStage::new( 1, diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs index 504776e666..29e2dc683b 100644 --- a/ballista/scheduler/src/state/mod.rs +++ b/ballista/scheduler/src/state/mod.rs @@ -607,8 +607,13 @@ impl SchedulerState Date: Thu, 23 Apr 2026 12:21:57 +0300 Subject: [PATCH 4/8] More formatting --- ballista/client/tests/context_checks.rs | 4 +-- ballista/core/src/serde/mod.rs | 7 ++-- ballista/executor/src/execution_loop.rs | 6 ++-- .../scheduler/src/scheduler_server/grpc.rs | 35 +++++++++---------- 4 files changed, 25 insertions(+), 27 deletions(-) diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index e283e29707..afa8124fda 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -24,15 +24,15 @@ mod supported { standalone_context_with_state, }; use ballista_core::config::BallistaConfig; - use datafusion::arrow::array::StringArray; use datafusion::arrow::array::Array; + use datafusion::arrow::array::StringArray; use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::collect; use datafusion::prelude::*; use datafusion::{assert_batches_eq, prelude::SessionContext}; use rstest::*; - use std::sync::Arc; use std::path::PathBuf; + use std::sync::Arc; #[rstest::fixture] fn test_data() -> String { diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs index 91e0861467..9213e41457 100644 --- 
a/ballista/core/src/serde/mod.rs +++ b/ballista/core/src/serde/mod.rs @@ -852,8 +852,7 @@ mod test { plan_message.try_encode(&mut buf).unwrap(); let decoded_message = LogicalPlanNode::try_decode(&buf).unwrap(); - let decoded = - decoded_message.try_into_logical_plan(&ctx, &codec).unwrap(); + let decoded = decoded_message.try_into_logical_plan(&ctx, &codec).unwrap(); let LogicalPlan::Extension(ext) = &decoded else { panic!("expected Extension, got {decoded:?}"); @@ -889,8 +888,8 @@ mod test { ExplainFormat::Graphviz, ] { let s = BallistaExplainNode::format_as_str(&f); - let parsed = BallistaExplainNode::format_from_str(s) - .expect("round-trip must succeed"); + let parsed = + BallistaExplainNode::format_from_str(s).expect("round-trip must succeed"); assert_eq!(parsed, f); } assert!(BallistaExplainNode::format_from_str("bogus").is_none()); diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 3033ac7711..f028740de1 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -520,9 +520,9 @@ mod tests { CreateUpdateSessionParams, CreateUpdateSessionResult, ExecuteQueryParams, ExecuteQueryResult, ExecutorRegistration, ExecutorStoppedParams, ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobMetricsParams, - GetJobMetricsResult, GetJobStatusParams, - GetJobStatusResult, GetRemoteFunctionsParams, GetRemoteFunctionsResult, - HeartBeatParams, HeartBeatResult, RegisterExecutorParams, RegisterExecutorResult, + GetJobMetricsResult, GetJobStatusParams, GetJobStatusResult, + GetRemoteFunctionsParams, GetRemoteFunctionsResult, HeartBeatParams, + HeartBeatResult, RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult, UpdateTaskStatusParams, UpdateTaskStatusResult, }; diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 540269cdbb..9ebdd47a3a 100644 --- 
a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -27,12 +27,12 @@ use ballista_core::serde::protobuf::{ ExecuteQueryFailureResult, ExecuteQueryParams, ExecuteQueryResult, ExecuteQuerySuccessResult, ExecutorHeartbeat, ExecutorStoppedParams, ExecutorStoppedResult, GetCatalogParams, GetCatalogResult, GetJobMetricsParams, - GetJobMetricsResult, GetJobStatusParams, - GetJobStatusResult, GetRemoteFunctionsParams, GetRemoteFunctionsResult, - HeartBeatParams, HeartBeatResult, JobStatus, KeyValuePair, PollWorkParams, - PollWorkResult, RegisterExecutorParams, RegisterExecutorResult, RemoveSessionParams, - RemoveSessionResult, UpdateTaskStatusParams, UpdateTaskStatusResult, - execute_query_failure_result, execute_query_result, + GetJobMetricsResult, GetJobStatusParams, GetJobStatusResult, + GetRemoteFunctionsParams, GetRemoteFunctionsResult, HeartBeatParams, HeartBeatResult, + JobStatus, KeyValuePair, PollWorkParams, PollWorkResult, RegisterExecutorParams, + RegisterExecutorResult, RemoveSessionParams, RemoveSessionResult, + UpdateTaskStatusParams, UpdateTaskStatusResult, execute_query_failure_result, + execute_query_result, }; use ballista_core::serde::scheduler::ExecutorMetadata; use datafusion_proto::logical_plan::AsLogicalPlan; @@ -783,18 +783,17 @@ fn serialize_stage_metrics( }; let metrics = if plan.metrics().is_some() { - let metrics: ballista_core::serde::protobuf::OperatorMetricsSet = - raw_metrics - .get(metric_index) - .ok_or_else(|| { - BallistaError::Internal(format!( - "Missing metrics for operator {} at depth {}", - plan.name(), - depth - )) - })? - .clone() - .try_into()?; + let metrics: ballista_core::serde::protobuf::OperatorMetricsSet = raw_metrics + .get(metric_index) + .ok_or_else(|| { + BallistaError::Internal(format!( + "Missing metrics for operator {} at depth {}", + plan.name(), + depth + )) + })? 
+ .clone() + .try_into()?; metric_index += 1; metrics.metrics } else { From 538e10af53ee0a048ae941a1e38dced2f97fa582 Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Thu, 23 Apr 2026 12:33:31 +0300 Subject: [PATCH 5/8] Add insta snapshots for FORMAT TREE integration tests --- Cargo.lock | 1 + ballista/client/Cargo.toml | 1 + ballista/client/tests/context_checks.rs | 15 ++-- ..._explain_format_tree_distributed_plan.snap | 73 +++++++++++++++++++ ...ed__explain_format_tree_physical_plan.snap | 61 ++++++++++++++++ 5 files changed, 141 insertions(+), 10 deletions(-) create mode 100644 ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_distributed_plan.snap create mode 100644 ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_physical_plan.snap diff --git a/Cargo.lock b/Cargo.lock index 40ad6d4a0f..f3f94f9dc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -974,6 +974,7 @@ dependencies = [ "datafusion", "datafusion-proto", "env_logger", + "insta", "log", "object_store", "rstest", diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml index 5d6bd30880..6bd1b8238c 100644 --- a/ballista/client/Cargo.toml +++ b/ballista/client/Cargo.toml @@ -46,6 +46,7 @@ ballista-scheduler = { path = "../scheduler", version = "52.0.0" } ctor = { workspace = true } datafusion-proto = { workspace = true } env_logger = { workspace = true } +insta = { workspace = true } rstest = { workspace = true } tempfile = { workspace = true } tonic = { workspace = true } diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index afa8124fda..bbcee7459f 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -1150,17 +1150,12 @@ mod supported { .unwrap(); let physical_plan_txt = plan_arr.value(0); - // Tree format uses box drawing characters like ┌, ─, ┐, │, └, ┘. 
- assert!( - physical_plan_txt.contains('┌') || physical_plan_txt.contains('│'), - "Expected tree format with box characters in physical_plan, got: {physical_plan_txt}" - ); - let distributed_plan_txt = plan_arr.value(1); - assert!( - !distributed_plan_txt.is_empty(), - "Expected non-empty distributed_plan" - ); + + // Snapshot the tree-rendered plans. Both standalone and remote cases + // should produce identical output, validating the codec round-trip. + insta::assert_snapshot!("explain_format_tree_physical_plan", physical_plan_txt); + insta::assert_snapshot!("explain_format_tree_distributed_plan", distributed_plan_txt); } #[rstest] diff --git a/ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_distributed_plan.snap b/ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_distributed_plan.snap new file mode 100644 index 0000000000..5d47aa9358 --- /dev/null +++ b/ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_distributed_plan.snap @@ -0,0 +1,73 @@ +--- +source: ballista/client/tests/context_checks.rs +expression: distributed_plan_txt +--- +=========ResolvedStage[stage_id=1.0, partitions=1]========= +┌───────────────────────────┐ +│ ShuffleWriterExec │ +│ -------------------- │ +│ partitioning: │ +│ Hash([id@0], 16) │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ AggregateExec │ +│ -------------------- │ +│ aggr: count(1) │ +│ group_by: id │ +│ mode: Partial │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ ProjectionExec │ +│ -------------------- │ +│ id: │ +│ __unnest_placeholder │ +│ (make_array(Int64(1 │ +│ ),Int64(2),Int64(3),Int64 │ +│ (4),Int64(5)),depth=1) │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ UnnestExec │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ ProjectionExec │ +│ -------------------- │ +│ __unnest_placeholder │ +│ (make_array(Int64(1 │ +│ ),Int64(2),Int64(3),Int64 │ +│ 
(4),Int64(5))): │ +│ [1, 2, 3, 4, 5] │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ PlaceholderRowExec │ +└───────────────────────────┘ + +=========UnResolvedStage[stage_id=2.0, children=1]========= +Inputs{1: StageOutput { partition_locations: {}, complete: false }} +┌───────────────────────────┐ +│ ShuffleWriterExec │ +│ -------------------- │ +│ partitioning: None │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ ProjectionExec │ +│ -------------------- │ +│ count(*): │ +│ count(Int64(1)) │ +│ │ +│ id: id │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ AggregateExec │ +│ -------------------- │ +│ aggr: count(1) │ +│ group_by: id │ +│ │ +│ mode: │ +│ FinalPartitioned │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ UnresolvedShuffleExec │ +│ -------------------- │ +│ partitioning: │ +│ Hash([id@0], 16) │ +└───────────────────────────┘ diff --git a/ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_physical_plan.snap b/ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_physical_plan.snap new file mode 100644 index 0000000000..30b81bbd77 --- /dev/null +++ b/ballista/client/tests/snapshots/context_checks__supported__explain_format_tree_physical_plan.snap @@ -0,0 +1,61 @@ +--- +source: ballista/client/tests/context_checks.rs +expression: physical_plan_txt +--- +┌───────────────────────────┐ +│ ProjectionExec │ +│ -------------------- │ +│ count(*): │ +│ count(Int64(1)) │ +│ │ +│ id: id │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ AggregateExec │ +│ -------------------- │ +│ aggr: count(1) │ +│ group_by: id │ +│ │ +│ mode: │ +│ FinalPartitioned │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ RepartitionExec │ +│ -------------------- │ +│ partition_count(in->out): │ +│ 1 -> 16 │ +│ │ +│ partitioning_scheme: │ +│ Hash([id@0], 16) │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ 
AggregateExec │ +│ -------------------- │ +│ aggr: count(1) │ +│ group_by: id │ +│ mode: Partial │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ ProjectionExec │ +│ -------------------- │ +│ id: │ +│ __unnest_placeholder │ +│ (make_array(Int64(1 │ +│ ),Int64(2),Int64(3),Int64 │ +│ (4),Int64(5)),depth=1) │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ UnnestExec │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ ProjectionExec │ +│ -------------------- │ +│ __unnest_placeholder │ +│ (make_array(Int64(1 │ +│ ),Int64(2),Int64(3),Int64 │ +│ (4),Int64(5))): │ +│ [1, 2, 3, 4, 5] │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ PlaceholderRowExec │ +└───────────────────────────┘ From bf42918173c6321c1ca0cc5d2d828a4919d56c5e Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Thu, 23 Apr 2026 12:34:26 +0300 Subject: [PATCH 6/8] Lint --- .../core/src/execution_plans/distributed_explain_analyze.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/core/src/execution_plans/distributed_explain_analyze.rs b/ballista/core/src/execution_plans/distributed_explain_analyze.rs index 53d8c13248..9b43c2779f 100644 --- a/ballista/core/src/execution_plans/distributed_explain_analyze.rs +++ b/ballista/core/src/execution_plans/distributed_explain_analyze.rs @@ -17,7 +17,7 @@ //! `EXPLAIN ANALYZE` for distributed Ballista jobs. //! -//! On the client side, [`BallistaQueryPlanner`] strips `LogicalPlan::Analyze` +//! On the client side, `BallistaQueryPlanner` strips `LogicalPlan::Analyze` //! and runs the inner plan as a regular distributed job wrapped in //! [`DistributedExplainAnalyzeExec`]. After the child stream drains (i.e. the //! 
distributed job has succeeded), this exec calls the new From d4458ec6f251b8a81490152c298453a320d65739 Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Thu, 23 Apr 2026 12:35:38 +0300 Subject: [PATCH 7/8] Improve --- .../src/execution_plans/distributed_explain_analyze.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ballista/core/src/execution_plans/distributed_explain_analyze.rs b/ballista/core/src/execution_plans/distributed_explain_analyze.rs index 9b43c2779f..f9894bc7b9 100644 --- a/ballista/core/src/execution_plans/distributed_explain_analyze.rs +++ b/ballista/core/src/execution_plans/distributed_explain_analyze.rs @@ -163,7 +163,12 @@ impl ExecutionPlan for DistributedExplainAnalyzeExec partition: usize, ctx: Arc, ) -> Result { - assert_eq!(0, partition); + if partition != 0 { + return internal_err!( + "DistributedExplainAnalyzeExec only supports partition 0, got {}", + partition + ); + } let child = Arc::clone(&self.child); let scheduler_url = self.scheduler_url.clone(); From a7d549ed35fc87ea70292f83c838a127c0c6f2fe Mon Sep 17 00:00:00 2001 From: Sergei Grebnov Date: Thu, 23 Apr 2026 12:40:00 +0300 Subject: [PATCH 8/8] Lint --- ballista/client/tests/context_checks.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs index bbcee7459f..6c7a12d609 100644 --- a/ballista/client/tests/context_checks.rs +++ b/ballista/client/tests/context_checks.rs @@ -1155,7 +1155,10 @@ mod supported { // Snapshot the tree-rendered plans. Both standalone and remote cases // should produce identical output, validating the codec round-trip. insta::assert_snapshot!("explain_format_tree_physical_plan", physical_plan_txt); - insta::assert_snapshot!("explain_format_tree_distributed_plan", distributed_plan_txt); + insta::assert_snapshot!( + "explain_format_tree_distributed_plan", + distributed_plan_txt + ); } #[rstest]