diff --git a/Cargo.lock b/Cargo.lock index c798024f4c..40ad6d4a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1145,6 +1145,7 @@ dependencies = [ "rand 0.9.2", "rstest", "serde", + "tempfile", "tokio", "tokio-stream", "tonic", diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs index 486723eef3..f3d7d8ea8e 100644 --- a/ballista/executor/src/execution_engine.rs +++ b/ballista/executor/src/execution_engine.rs @@ -27,10 +27,13 @@ use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec; use ballista_core::serde::protobuf::ShuffleWritePartition; use ballista_core::utils; use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::config::ConfigOptions; use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource}; use datafusion::datasource::source::DataSourceExec; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::TaskContext; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_optimizer::filter_pushdown::FilterPushdown; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_plan::metrics::MetricsSet; use std::fmt::{Debug, Display}; @@ -53,6 +56,7 @@ pub trait ExecutionEngine: Sync + Send { stage_id: usize, plan: Arc, work_dir: &str, + config: &ConfigOptions, ) -> Result>; } @@ -135,7 +139,14 @@ impl ExecutionEngine for DefaultExecutionEngine { stage_id: usize, plan: Arc, work_dir: &str, + config: &ConfigOptions, ) -> Result> { + // Re-run FilterPushdown(Post) to re-establish dynamic filter links + // (e.g., TopK → DataSourceExec) that are lost during protobuf + // serialization/deserialization between scheduler and executor. + let filter_pushdown = FilterPushdown::new_post_optimization(); + let plan = filter_pushdown.optimize(plan, config)?; + // Fix ParquetSource metadata_size_hint lost during serialization let plan = fix_parquet_metadata_size_hint(plan)?; diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 306596ccb5..4a0437591f 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -418,6 +418,7 @@ async fn run_received_task ExecutorServer( ) { - let shuffle_writer = create_shuffle_writer_with_config( - job_id, - self.next_stage_id(), - children[0].clone(), - None, - config, - )?; - let unresolved_shuffle = create_unresolved_shuffle(shuffle_writer.as_ref()); - stages.push(shuffle_writer); - Ok(( - with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?, - stages, - )) + // For TopK queries (SortPreservingMergeExec with a small fetch/limit), + // skip the stage break and keep the merge in the same stage as its children. + // This avoids the overhead of shuffle write/read for a small number of rows, + // which dominates execution time for TopK queries in distributed mode. + // + // Note on parallelism: because SortPreservingMergeExec has an output + // partitioning of 1, the entire stage becomes a single task assigned to + // one executor (ShuffleWriterExec::input_partition_count() == 1). + // This does sacrifice cluster-level parallelism (no cross-executor + // distribution). However, within that executor the child partitions + // still execute as parallel async streams, so intra-executor parallelism + // is preserved. For small fetch values this trade-off is worthwhile as + // the shuffle coordination overhead far exceeds the merge cost. + const TOPK_FETCH_THRESHOLD: usize = 1000; + if sort_preserving_merge + .fetch() + .is_some_and(|f| f <= TOPK_FETCH_THRESHOLD) + { + Ok(( + with_new_children_if_necessary(execution_plan, children)?, + stages, + )) + } else { + let shuffle_writer = create_shuffle_writer_with_config( + job_id, + self.next_stage_id(), + children[0].clone(), + None, + config, + )?; + let unresolved_shuffle = + create_unresolved_shuffle(shuffle_writer.as_ref()); + stages.push(shuffle_writer); + Ok(( + with_new_children_if_necessary( + execution_plan, + vec![unresolved_shuffle], + )?, + stages, + )) + } } else if let Some(repart) = execution_plan.as_any().downcast_ref::() { @@ -940,6 +968,91 @@ order by Ok(result_exec_plan) } + /// Verifies that TopK queries (ORDER BY ... LIMIT N, where N is small) + /// do NOT create a stage break at SortPreservingMergeExec, avoiding + /// shuffle overhead for small result sets. + #[tokio::test] + async fn test_topk_avoids_stage_break() -> Result<(), BallistaError> { + use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext}; + use std::io::Write; + + let tmp_dir = tempfile::tempdir().unwrap(); + let schema = "id,value\n"; + for i in 0..4 { + let path = tmp_dir.path().join(format!("part{i:02}.csv")); + let mut f = std::fs::File::create(&path).unwrap(); + write!(f, "{schema}").unwrap(); + for j in 0..10 { + writeln!(f, "{},{}", i * 10 + j, (i * 10 + j) * 100).unwrap(); + } + } + + let config = SessionConfig::new().with_target_partitions(4); + let ctx = SessionContext::new_with_config(config); + ctx.register_csv( + "test_table", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new(), + ) + .await?; + + // TopK query with small LIMIT — should produce a single stage + let df = ctx + .sql("SELECT id, value FROM test_table ORDER BY value DESC LIMIT 10") + .await?; + let plan = df.into_optimized_plan()?; + let plan = ctx.state().create_physical_plan(&plan).await?; + + let mut planner = DefaultDistributedPlanner::new(); + let stages = planner.plan_query_stages( + "job-topk", + plan, + ctx.state().config().options(), + )?; + + for (i, stage) in stages.iter().enumerate() { + println!( + "TopK Stage {i}:\n{}", + displayable(stage.as_ref()).indent(false) + ); + } + + // Should be a single stage (no shuffle for TopK with small limit) + assert_eq!( + 1, + stages.len(), + "TopK with small LIMIT should produce 1 stage, got {}", + stages.len() + ); + + // The single stage should contain SortPreservingMergeExec + let root = stages[0].children()[0].clone(); + let _merge = downcast_exec!(root, SortPreservingMergeExec); + + // Without LIMIT, the same query should produce 2 stages (with shuffle) + let df_no_limit = ctx + .sql("SELECT id, value FROM test_table ORDER BY value DESC") + .await?; + let plan_no_limit = df_no_limit.into_optimized_plan()?; + let plan_no_limit = ctx.state().create_physical_plan(&plan_no_limit).await?; + + let mut planner2 = DefaultDistributedPlanner::new(); + let stages_no_limit = planner2.plan_query_stages( + "job-no-limit", + plan_no_limit, + ctx.state().config().options(), + )?; + + assert_eq!( + 2, + stages_no_limit.len(), + "ORDER BY without LIMIT should produce 2 stages, got {}", + stages_no_limit.len() + ); + + Ok(()) + } + fn memory_exec( schema: Arc, partition_count: usize,