Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricsSet;
use std::fmt::{Debug, Display};
Expand Down Expand Up @@ -136,6 +139,13 @@ impl ExecutionEngine for DefaultExecutionEngine {
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
) -> Result<Arc<dyn QueryStageExecutor>> {
// Re-run FilterPushdown(Post) to re-establish dynamic filter links
// (e.g., TopK → DataSourceExec) that are lost during protobuf
// serialization/deserialization between scheduler and executor.
let filter_pushdown = FilterPushdown::new_post_optimization();
let config = ConfigOptions::default();
let plan = filter_pushdown.optimize(plan, &config)?;
Comment thread
peasee marked this conversation as resolved.
Outdated
Comment thread
peasee marked this conversation as resolved.
Outdated

Comment thread
peasee marked this conversation as resolved.
// Fix ParquetSource metadata_size_hint lost during serialization
let plan = fix_parquet_metadata_size_hint(plan)?;

Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,4 @@ tonic-prost-build = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }
tempfile = { workspace = true }
141 changes: 127 additions & 14 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,23 +169,51 @@ impl DefaultDistributedPlanner {
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(_sort_preserving_merge) = execution_plan
} else if let Some(sort_preserving_merge) = execution_plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>(
) {
let shuffle_writer = create_shuffle_writer_with_config(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
config,
)?;
let unresolved_shuffle = create_unresolved_shuffle(shuffle_writer.as_ref());
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
// For TopK queries (SortPreservingMergeExec with a small fetch/limit),
// skip the stage break and keep the merge in the same stage as its children.
// This avoids the overhead of shuffle write/read for a small number of rows,
// which dominates execution time for TopK queries in distributed mode.
//
// Note on parallelism: because SortPreservingMergeExec has an output
// partitioning of 1, the entire stage becomes a single task assigned to
// one executor (ShuffleWriterExec::input_partition_count() == 1).
// This does sacrifice cluster-level parallelism (no cross-executor
// distribution). However, within that executor the child partitions
// still execute as parallel async streams, so intra-executor parallelism
// is preserved. For small fetch values this trade-off is worthwhile as
// the shuffle coordination overhead far exceeds the merge cost.
const TOPK_FETCH_THRESHOLD: usize = 1000;
if sort_preserving_merge
.fetch()
.is_some_and(|f| f <= TOPK_FETCH_THRESHOLD)
{
Comment thread
peasee marked this conversation as resolved.
Comment thread
peasee marked this conversation as resolved.
Ok((
with_new_children_if_necessary(execution_plan, children)?,
stages,
))
} else {
let shuffle_writer = create_shuffle_writer_with_config(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
config,
)?;
let unresolved_shuffle =
create_unresolved_shuffle(shuffle_writer.as_ref());
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(
execution_plan,
vec![unresolved_shuffle],
)?,
stages,
))
}
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
Expand Down Expand Up @@ -940,6 +968,91 @@ order by
Ok(result_exec_plan)
}

/// Verifies that TopK queries (ORDER BY ... LIMIT N, where N is small)
/// do NOT create a stage break at SortPreservingMergeExec, avoiding
/// shuffle overhead for small result sets.
#[tokio::test]
async fn test_topk_avoids_stage_break() -> Result<(), BallistaError> {
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use std::io::Write;

let tmp_dir = tempfile::tempdir().unwrap();
let schema = "id,value\n";
for i in 0..4 {
let path = tmp_dir.path().join(format!("part{i:02}.csv"));
let mut f = std::fs::File::create(&path).unwrap();
write!(f, "{schema}").unwrap();
for j in 0..10 {
writeln!(f, "{},{}", i * 10 + j, (i * 10 + j) * 100).unwrap();
}
}

let config = SessionConfig::new().with_target_partitions(4);
let ctx = SessionContext::new_with_config(config);
ctx.register_csv(
"test_table",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new(),
)
.await?;

// TopK query with small LIMIT — should produce a single stage
let df = ctx
.sql("SELECT id, value FROM test_table ORDER BY value DESC LIMIT 10")
.await?;
let plan = df.into_optimized_plan()?;
let plan = ctx.state().create_physical_plan(&plan).await?;

let mut planner = DefaultDistributedPlanner::new();
let stages = planner.plan_query_stages(
"job-topk",
plan,
ctx.state().config().options(),
)?;

for (i, stage) in stages.iter().enumerate() {
println!(
"TopK Stage {i}:\n{}",
displayable(stage.as_ref()).indent(false)
);
}

// Should be a single stage (no shuffle for TopK with small limit)
assert_eq!(
1,
stages.len(),
"TopK with small LIMIT should produce 1 stage, got {}",
stages.len()
);

// The single stage should contain SortPreservingMergeExec
let root = stages[0].children()[0].clone();
let _merge = downcast_exec!(root, SortPreservingMergeExec);

// Without LIMIT, the same query should produce 2 stages (with shuffle)
let df_no_limit = ctx
.sql("SELECT id, value FROM test_table ORDER BY value DESC")
.await?;
let plan_no_limit = df_no_limit.into_optimized_plan()?;
let plan_no_limit = ctx.state().create_physical_plan(&plan_no_limit).await?;

let mut planner2 = DefaultDistributedPlanner::new();
let stages_no_limit = planner2.plan_query_stages(
"job-no-limit",
plan_no_limit,
ctx.state().config().options(),
)?;

assert_eq!(
2,
stages_no_limit.len(),
"ORDER BY without LIMIT should produce 2 stages, got {}",
stages_no_limit.len()
);

Ok(())
}

fn memory_exec(
schema: Arc<Schema>,
partition_count: usize,
Expand Down
Loading