From 61fe09456d4d5927e1a85bd9626f194dfa318f70 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Mon, 20 Apr 2026 14:49:13 +1000 Subject: [PATCH 1/5] fix: Combine small fetch TopK into single stage --- Cargo.lock | 1 + .../src/execution_plans/shuffle_writer.rs | 2 - ballista/scheduler/Cargo.toml | 1 + ballista/scheduler/src/planner.rs | 134 ++++++++++++++++-- 4 files changed, 122 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c798024f4c..40ad6d4a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1145,6 +1145,7 @@ dependencies = [ "rand 0.9.2", "rstest", "serde", + "tempfile", "tokio", "tokio-stream", "tonic", diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index a4ee4c3927..d98a27629a 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -406,7 +406,6 @@ impl ShuffleWriterExec { ) -> Result> { match output_partitioning { None => { - let timer = write_metrics.write_time.timer(); path.push(format!("{input_partition}")); std::fs::create_dir_all(&path)?; path.push(format!("data.{file_ext}")); @@ -429,7 +428,6 @@ impl ShuffleWriterExec { write_metrics .output_rows .add(stats.num_rows.unwrap_or(0) as usize); - timer.done(); info!( "Executed partition {} in {} seconds. Statistics: {}", diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index a79e6890ff..96406f84d8 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -80,3 +80,4 @@ tonic-prost-build = { workspace = true } [dev-dependencies] rstest = { workspace = true } +tempfile = { workspace = true } diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index d00f0bb679..6e9580b228 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -160,23 +160,44 @@ impl DefaultDistributedPlanner { with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?, stages, )) - } else if let Some(_sort_preserving_merge) = execution_plan + } else if let Some(sort_preserving_merge) = execution_plan .as_any() .downcast_ref::( ) { - let shuffle_writer = create_shuffle_writer_with_config( - job_id, - self.next_stage_id(), - children[0].clone(), - None, - config, - )?; - let unresolved_shuffle = create_unresolved_shuffle(shuffle_writer.as_ref()); - stages.push(shuffle_writer); - Ok(( - with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?, - stages, - )) + // For TopK queries (SortPreservingMergeExec with a small fetch/limit), + // skip the stage break and keep the merge in the same stage as its children. + // This avoids the overhead of shuffle write/read for a small number of rows, + // which dominates execution time for TopK queries in distributed mode. + // The merge will run as a single task on one executor with all partitions + // executing as parallel threads within that executor. + const TOPK_FETCH_THRESHOLD: usize = 1000; + if sort_preserving_merge + .fetch() + .is_some_and(|f| f <= TOPK_FETCH_THRESHOLD) + { + Ok(( + with_new_children_if_necessary(execution_plan, children)?, + stages, + )) + } else { + let shuffle_writer = create_shuffle_writer_with_config( + job_id, + self.next_stage_id(), + children[0].clone(), + None, + config, + )?; + let unresolved_shuffle = + create_unresolved_shuffle(shuffle_writer.as_ref()); + stages.push(shuffle_writer); + Ok(( + with_new_children_if_necessary( + execution_plan, + vec![unresolved_shuffle], + )?, + stages, + )) + } } else if let Some(repart) = execution_plan.as_any().downcast_ref::() { @@ -860,4 +881,89 @@ order by (proto).try_into_physical_plan(ctx, codec.physical_extension_codec())?; Ok(result_exec_plan) } + + /// Verifies that TopK queries (ORDER BY ... LIMIT N, where N is small) + /// do NOT create a stage break at SortPreservingMergeExec, avoiding + /// shuffle overhead for small result sets. + #[tokio::test] + async fn test_topk_avoids_stage_break() -> Result<(), BallistaError> { + use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; + use std::io::Write; + + let tmp_dir = tempfile::tempdir().unwrap(); + let schema = "id,value\n"; + for i in 0..4 { + let path = tmp_dir.path().join(format!("part{i:02}.csv")); + let mut f = std::fs::File::create(&path).unwrap(); + write!(f, "{schema}").unwrap(); + for j in 0..10 { + writeln!(f, "{},{}", i * 10 + j, (i * 10 + j) * 100).unwrap(); + } + } + + let config = SessionConfig::new().with_target_partitions(4); + let ctx = SessionContext::new_with_config(config); + ctx.register_csv( + "test_table", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new(), + ) + .await?; + + // TopK query with small LIMIT — should produce a single stage + let df = ctx + .sql("SELECT id, value FROM test_table ORDER BY value DESC LIMIT 10") + .await?; + let plan = df.into_optimized_plan()?; + let plan = ctx.state().create_physical_plan(&plan).await?; + + let mut planner = DefaultDistributedPlanner::new(); + let stages = planner.plan_query_stages( + "job-topk", + plan, + ctx.state().config().options(), + )?; + + for (i, stage) in stages.iter().enumerate() { + println!( + "TopK Stage {i}:\n{}", + displayable(stage.as_ref()).indent(false) + ); + } + + // Should be a single stage (no shuffle for TopK with small limit) + assert_eq!( + 1, + stages.len(), + "TopK with small LIMIT should produce 1 stage, got {}", + stages.len() + ); + + // The single stage should contain SortPreservingMergeExec + let root = stages[0].children()[0].clone(); + let _merge = downcast_exec!(root, SortPreservingMergeExec); + + // Without LIMIT, the same query should produce 2 stages (with shuffle) + let df_no_limit = ctx + .sql("SELECT id, value FROM test_table ORDER BY value DESC") + .await?; + let plan_no_limit = df_no_limit.into_optimized_plan()?; + let plan_no_limit = ctx.state().create_physical_plan(&plan_no_limit).await?; + + let mut planner2 = DefaultDistributedPlanner::new(); + let stages_no_limit = planner2.plan_query_stages( + "job-no-limit", + plan_no_limit, + ctx.state().config().options(), + )?; + + assert_eq!( + 2, + stages_no_limit.len(), + "ORDER BY without LIMIT should produce 2 stages, got {}", + stages_no_limit.len() + ); + + Ok(()) + } } From 20078c8d2ba2809c63688dba947eefef355665a4 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Mon, 20 Apr 2026 15:33:27 +1000 Subject: [PATCH 2/5] fix: Persist TopK dynamic filters onto executors --- ballista/executor/src/execution_engine.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs index 7beaede7ca..3f932f6bd0 100644 --- a/ballista/executor/src/execution_engine.rs +++ b/ballista/executor/src/execution_engine.rs @@ -26,8 +26,11 @@ use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec; use ballista_core::serde::protobuf::ShuffleWritePartition; use ballista_core::utils; +use datafusion::config::ConfigOptions; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::TaskContext; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_optimizer::filter_pushdown::FilterPushdown; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::metrics::MetricsSet; use std::fmt::{Debug, Display}; @@ -95,6 +98,13 @@ impl ExecutionEngine for DefaultExecutionEngine { plan: Arc, work_dir: &str, ) -> Result> { + // Re-run FilterPushdown(Post) to re-establish dynamic filter links + // (e.g., TopK → DataSourceExec) that are lost during protobuf + // serialization/deserialization between scheduler and executor. + let filter_pushdown = FilterPushdown::new_post_optimization(); + let config = ConfigOptions::default(); + let plan = filter_pushdown.optimize(plan, &config)?; + // the query plan created by the scheduler always starts with a shuffle writer // (either ShuffleWriterExec or SortShuffleWriterExec) if let Some(shuffle_writer) = plan.as_any().downcast_ref::() { From 4ea98b8c5cae6db31d4ce79dae1353c2f3d26de6 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 22 Apr 2026 09:52:14 +1000 Subject: [PATCH 3/5] review: address comments --- ballista/scheduler/src/planner.rs | 11 +++++++++-- ballista/scheduler/src/state/aqe/mod.rs | 4 ++-- ballista/scheduler/src/state/execution_graph.rs | 4 ++-- ballista/scheduler/src/state/executor_manager.rs | 4 ++-- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs index 6e9580b228..c34904941e 100644 --- a/ballista/scheduler/src/planner.rs +++ b/ballista/scheduler/src/planner.rs @@ -168,8 +168,15 @@ impl DefaultDistributedPlanner { // skip the stage break and keep the merge in the same stage as its children. // This avoids the overhead of shuffle write/read for a small number of rows, // which dominates execution time for TopK queries in distributed mode. - // The merge will run as a single task on one executor with all partitions - // executing as parallel threads within that executor. + // + // Note on parallelism: because SortPreservingMergeExec has an output + // partitioning of 1, the entire stage becomes a single task assigned to + // one executor (ShuffleWriterExec::input_partition_count() == 1). + // This does sacrifice cluster-level parallelism (no cross-executor + // distribution). However, within that executor the child partitions + // still execute as parallel async streams, so intra-executor parallelism + // is preserved. For small fetch values this trade-off is worthwhile as + // the shuffle coordination overhead far exceeds the merge cost. const TOPK_FETCH_THRESHOLD: usize = 1000; if sort_preserving_merge .fetch() diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index 8200b2a519..2436729021 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph { /// Return all currently running tasks along with the executor ID on which they are assigned fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 37ba3f828e..9e5b3e00d5 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph { /// Return all currently running tasks along with the executor ID on which they are assigned fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 9d47310b80..bdacc2d4e2 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -439,8 +439,8 @@ impl ExecutorManager { self.cluster_state .executor_heartbeats() - .iter() - .filter_map(|(_exec, heartbeat)| { + .values() + .filter_map(|heartbeat| { let terminating = matches!( heartbeat .status From 1f505e531acd0f0f3edc2a70357aae42d01d8724 Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 22 Apr 2026 13:25:56 +1000 Subject: [PATCH 4/5] revert: Add back shuffle writer timer change --- ballista/core/src/execution_plans/shuffle_writer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs index d98a27629a..a4ee4c3927 100644 --- a/ballista/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/core/src/execution_plans/shuffle_writer.rs @@ -406,6 +406,7 @@ impl ShuffleWriterExec { ) -> Result> { match output_partitioning { None => { + let timer = write_metrics.write_time.timer(); path.push(format!("{input_partition}")); std::fs::create_dir_all(&path)?; path.push(format!("data.{file_ext}")); @@ -428,6 +429,7 @@ impl ShuffleWriterExec { write_metrics .output_rows .add(stats.num_rows.unwrap_or(0) as usize); + timer.done(); info!( "Executed partition {} in {} seconds. Statistics: {}", From 42f17f88fc01c40848f09c79b9ee3ea81c9f367c Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Wed, 22 Apr 2026 13:36:35 +1000 Subject: [PATCH 5/5] review: Address comment --- ballista/executor/src/execution_engine.rs | 5 +++-- ballista/executor/src/execution_loop.rs | 1 + ballista/executor/src/executor_server.rs | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs index 2327d4734e..f3d7d8ea8e 100644 --- a/ballista/executor/src/execution_engine.rs +++ b/ballista/executor/src/execution_engine.rs @@ -56,6 +56,7 @@ pub trait ExecutionEngine: Sync + Send { stage_id: usize, plan: Arc, work_dir: &str, + config: &ConfigOptions, ) -> Result>; } @@ -138,13 +139,13 @@ impl ExecutionEngine for DefaultExecutionEngine { stage_id: usize, plan: Arc, work_dir: &str, + config: &ConfigOptions, ) -> Result> { // Re-run FilterPushdown(Post) to re-establish dynamic filter links // (e.g., TopK → DataSourceExec) that are lost during protobuf // serialization/deserialization between scheduler and executor. let filter_pushdown = FilterPushdown::new_post_optimization(); - let config = ConfigOptions::default(); - let plan = filter_pushdown.optimize(plan, &config)?; + let plan = filter_pushdown.optimize(plan, config)?; // Fix ParquetSource metadata_size_hint lost during serialization let plan = fix_parquet_metadata_size_hint(plan)?; diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 306596ccb5..4a0437591f 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -418,6 +418,7 @@ async fn run_received_task ExecutorServer