Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions ballista/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ impl ShuffleWriterExec {
) -> Result<Vec<ShuffleWritePartition>> {
match output_partitioning {
None => {
let timer = write_metrics.write_time.timer();
Comment thread
peasee marked this conversation as resolved.
path.push(format!("{input_partition}"));
std::fs::create_dir_all(&path)?;
path.push(format!("data.{file_ext}"));
Expand All @@ -429,7 +428,6 @@ impl ShuffleWriterExec {
write_metrics
.output_rows
.add(stats.num_rows.unwrap_or(0) as usize);
timer.done();

info!(
"Executed partition {} in {} seconds. Statistics: {}",
Expand Down
10 changes: 10 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::config::ConfigOptions;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricsSet;
use std::fmt::{Debug, Display};
Expand Down Expand Up @@ -95,6 +98,13 @@ impl ExecutionEngine for DefaultExecutionEngine {
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
) -> Result<Arc<dyn QueryStageExecutor>> {
// Re-run FilterPushdown(Post) to re-establish dynamic filter links
// (e.g., TopK → DataSourceExec) that are lost during protobuf
// serialization/deserialization between scheduler and executor.
let filter_pushdown = FilterPushdown::new_post_optimization();
let config = ConfigOptions::default();
let plan = filter_pushdown.optimize(plan, &config)?;
Comment thread
peasee marked this conversation as resolved.
Outdated
Comment thread
peasee marked this conversation as resolved.
Outdated

Comment thread
peasee marked this conversation as resolved.
// the query plan created by the scheduler always starts with a shuffle writer
// (either ShuffleWriterExec or SortShuffleWriterExec)
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ tonic-prost-build = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }
tempfile = { workspace = true }
134 changes: 120 additions & 14 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,44 @@ impl DefaultDistributedPlanner {
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(_sort_preserving_merge) = execution_plan
} else if let Some(sort_preserving_merge) = execution_plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>(
) {
let shuffle_writer = create_shuffle_writer_with_config(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
config,
)?;
let unresolved_shuffle = create_unresolved_shuffle(shuffle_writer.as_ref());
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
// For TopK queries (SortPreservingMergeExec with a small fetch/limit),
// skip the stage break and keep the merge in the same stage as its children.
// This avoids the overhead of shuffle write/read for a small number of rows,
// which dominates execution time for TopK queries in distributed mode.
// The merge will run as a single task on one executor with all partitions
// executing as parallel threads within that executor.
const TOPK_FETCH_THRESHOLD: usize = 1000;
if sort_preserving_merge
.fetch()
.is_some_and(|f| f <= TOPK_FETCH_THRESHOLD)
{
Comment thread
peasee marked this conversation as resolved.
Comment thread
peasee marked this conversation as resolved.
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
} else {
let shuffle_writer = create_shuffle_writer_with_config(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
config,
)?;
let unresolved_shuffle =
create_unresolved_shuffle(shuffle_writer.as_ref());
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(
execution_plan,
vec![unresolved_shuffle],
)?,
stages,
))
}
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
Expand Down Expand Up @@ -860,4 +881,89 @@ order by
(proto).try_into_physical_plan(ctx, codec.physical_extension_codec())?;
Ok(result_exec_plan)
}

/// Verifies that TopK queries (ORDER BY ... LIMIT N, where N is small)
/// do NOT create a stage break at SortPreservingMergeExec, avoiding
/// shuffle overhead for small result sets.
#[tokio::test]
async fn test_topk_avoids_stage_break() -> Result<(), BallistaError> {
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use std::io::Write;

let tmp_dir = tempfile::tempdir().unwrap();
let schema = "id,value\n";
for i in 0..4 {
let path = tmp_dir.path().join(format!("part{i:02}.csv"));
let mut f = std::fs::File::create(&path).unwrap();
write!(f, "{schema}").unwrap();
for j in 0..10 {
writeln!(f, "{},{}", i * 10 + j, (i * 10 + j) * 100).unwrap();
}
}

let config = SessionConfig::new().with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
ctx.register_csv(
"test_table",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new(),
)
.await?;

// TopK query with small LIMIT — should produce a single stage
let df = ctx
.sql("SELECT id, value FROM test_table ORDER BY value DESC LIMIT 10")
.await?;
let plan = df.into_optimized_plan()?;
let plan = ctx.state().create_physical_plan(&plan).await?;

let mut planner = DefaultDistributedPlanner::new();
let stages = planner.plan_query_stages(
"job-topk",
plan,
ctx.state().config().options(),
)?;

for (i, stage) in stages.iter().enumerate() {
println!(
"TopK Stage {i}:\n{}",
displayable(stage.as_ref()).indent(false)
);
}

// Should be a single stage (no shuffle for TopK with small limit)
assert_eq!(
1,
stages.len(),
"TopK with small LIMIT should produce 1 stage, got {}",
stages.len()
);

// The single stage should contain SortPreservingMergeExec
let root = stages[0].children()[0].clone();
let _merge = downcast_exec!(root, SortPreservingMergeExec);

// Without LIMIT, the same query should produce 2 stages (with shuffle)
let df_no_limit = ctx
.sql("SELECT id, value FROM test_table ORDER BY value DESC")
.await?;
let plan_no_limit = df_no_limit.into_optimized_plan()?;
let plan_no_limit = ctx.state().create_physical_plan(&plan_no_limit).await?;

let mut planner2 = DefaultDistributedPlanner::new();
let stages_no_limit = planner2.plan_query_stages(
"job-no-limit",
plan_no_limit,
ctx.state().config().options(),
)?;

assert_eq!(
2,
stages_no_limit.len(),
"ORDER BY without LIMIT should produce 2 stages, got {}",
stages_no_limit.len()
);

Ok(())
}
}
Loading