Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricsSet;
use std::fmt::{Debug, Display};
Expand All @@ -53,6 +56,7 @@ pub trait ExecutionEngine: Sync + Send {
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
config: &ConfigOptions,
) -> Result<Arc<dyn QueryStageExecutor>>;
}

Expand Down Expand Up @@ -135,7 +139,14 @@ impl ExecutionEngine for DefaultExecutionEngine {
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
config: &ConfigOptions,
) -> Result<Arc<dyn QueryStageExecutor>> {
// Re-run FilterPushdown(Post) to re-establish dynamic filter links
// (e.g., TopK → DataSourceExec) that are lost during protobuf
// serialization/deserialization between scheduler and executor.
let filter_pushdown = FilterPushdown::new_post_optimization();
let plan = filter_pushdown.optimize(plan, config)?;

Comment thread
peasee marked this conversation as resolved.
// Fix ParquetSource metadata_size_hint lost during serialization
let plan = fix_parquet_metadata_size_hint(plan)?;

Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
stage_id as usize,
plan,
&executor.work_dir,
task_context.session_config().options(),
)?;
dedicated_executor.spawn(async move {
use std::panic::AssertUnwindSafe;
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
stage_id,
plan,
&self.executor.work_dir,
task.session_config.options(),
)
.unwrap();

Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ tonic-prost-build = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }
tempfile = { workspace = true }
141 changes: 127 additions & 14 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,23 +169,51 @@ impl DefaultDistributedPlanner {
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(_sort_preserving_merge) = execution_plan
} else if let Some(sort_preserving_merge) = execution_plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>(
) {
let shuffle_writer = create_shuffle_writer_with_config(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
config,
)?;
let unresolved_shuffle = create_unresolved_shuffle(shuffle_writer.as_ref());
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
// For TopK queries (SortPreservingMergeExec with a small fetch/limit),
// skip the stage break and keep the merge in the same stage as its children.
// This avoids the overhead of shuffle write/read for a small number of rows,
// which dominates execution time for TopK queries in distributed mode.
//
// Note on parallelism: because SortPreservingMergeExec has an output
// partitioning of 1, the entire stage becomes a single task assigned to
// one executor (ShuffleWriterExec::input_partition_count() == 1).
// This does sacrifice cluster-level parallelism (no cross-executor
// distribution). However, within that executor the child partitions
// still execute as parallel async streams, so intra-executor parallelism
// is preserved. For small fetch values this trade-off is worthwhile as
// the shuffle coordination overhead far exceeds the merge cost.
const TOPK_FETCH_THRESHOLD: usize = 1000;
if sort_preserving_merge
.fetch()
.is_some_and(|f| f <= TOPK_FETCH_THRESHOLD)
{
Comment thread
peasee marked this conversation as resolved.
Comment thread
peasee marked this conversation as resolved.
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
} else {
let shuffle_writer = create_shuffle_writer_with_config(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
config,
)?;
let unresolved_shuffle =
create_unresolved_shuffle(shuffle_writer.as_ref());
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(
execution_plan,
vec![unresolved_shuffle],
)?,
stages,
))
}
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
Expand Down Expand Up @@ -940,6 +968,91 @@ order by
Ok(result_exec_plan)
}

/// Verifies that TopK queries (ORDER BY ... LIMIT N, where N is small)
/// do NOT create a stage break at SortPreservingMergeExec, avoiding
/// shuffle overhead for small result sets.
#[tokio::test]
async fn test_topk_avoids_stage_break() -> Result<(), BallistaError> {
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use std::io::Write;

let tmp_dir = tempfile::tempdir().unwrap();
let schema = "id,value\n";
for i in 0..4 {
let path = tmp_dir.path().join(format!("part{i:02}.csv"));
let mut f = std::fs::File::create(&path).unwrap();
write!(f, "{schema}").unwrap();
for j in 0..10 {
writeln!(f, "{},{}", i * 10 + j, (i * 10 + j) * 100).unwrap();
}
}

let config = SessionConfig::new().with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
ctx.register_csv(
"test_table",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new(),
)
.await?;

// TopK query with small LIMIT — should produce a single stage
let df = ctx
.sql("SELECT id, value FROM test_table ORDER BY value DESC LIMIT 10")
.await?;
let plan = df.into_optimized_plan()?;
let plan = ctx.state().create_physical_plan(&plan).await?;

let mut planner = DefaultDistributedPlanner::new();
let stages = planner.plan_query_stages(
"job-topk",
plan,
ctx.state().config().options(),
)?;

for (i, stage) in stages.iter().enumerate() {
println!(
"TopK Stage {i}:\n{}",
displayable(stage.as_ref()).indent(false)
);
}

// Should be a single stage (no shuffle for TopK with small limit)
assert_eq!(
1,
stages.len(),
"TopK with small LIMIT should produce 1 stage, got {}",
stages.len()
);

// The single stage should contain SortPreservingMergeExec
let root = stages[0].children()[0].clone();
let _merge = downcast_exec!(root, SortPreservingMergeExec);

// Without LIMIT, the same query should produce 2 stages (with shuffle)
let df_no_limit = ctx
.sql("SELECT id, value FROM test_table ORDER BY value DESC")
.await?;
let plan_no_limit = df_no_limit.into_optimized_plan()?;
let plan_no_limit = ctx.state().create_physical_plan(&plan_no_limit).await?;

let mut planner2 = DefaultDistributedPlanner::new();
let stages_no_limit = planner2.plan_query_stages(
"job-no-limit",
plan_no_limit,
ctx.state().config().options(),
)?;

assert_eq!(
2,
stages_no_limit.len(),
"ORDER BY without LIMIT should produce 2 stages, got {}",
stages_no_limit.len()
);

Ok(())
}

fn memory_exec(
schema: Arc<Schema>,
partition_count: usize,
Expand Down
Loading