Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions ballista/client/tests/context_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,78 @@ mod supported {
Ok(())
}

/// Regression test: nested CollectLeft HashJoinExec with
/// CoalescePartitionsExec should not deadlock.
///
/// This reproduces the pattern from TPC-H Q2 where a chain of
/// small-table inner joins (region→nation→supplier) is broadcast-joined
/// against a large partitioned table (partsupp). The scheduler enables
/// CollectLeft for inner joins under the broadcast threshold, and each
/// executor task runs exactly ONE partition. If any cross-partition
/// synchronisation (e.g. a tokio Barrier) is used in the build-side
/// completion path, it will deadlock because only one partition
/// participates per task.
///
/// The query is wrapped in a 120-second timeout so that a deadlock shows up
/// as a test failure (panic) instead of a test run that never finishes.
/// Only the standalone context case is exercised by this test.
#[rstest]
#[case::standalone(standalone_context())]
Comment thread
peasee marked this conversation as resolved.
#[tokio::test]
async fn nested_collect_left_should_not_deadlock(
#[future(awt)]
#[case]
ctx: SessionContext,
test_data: String,
Comment thread
peasee marked this conversation as resolved.
) -> datafusion::error::Result<()> {
// Use alltypes_plain.parquet registered as 3 different tables
// to create a nested inner join query where the optimizer
// should choose CollectLeft for the small tables.
// All three registrations below point at the same parquet file; only the
// table names differ.
ctx.register_parquet(
"fact_table",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await?;

ctx.register_parquet(
"dim_a",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await?;

ctx.register_parquet(
"dim_b",
&format!("{test_data}/alltypes_plain.parquet"),
Default::default(),
)
.await?;

// Query with nested inner joins: dim_b → dim_a → fact_table
// Planning (`ctx.sql(..).await?`) happens while the argument is built, so
// only the `collect()` future is actually guarded by the timeout.
let result = tokio::time::timeout(
std::time::Duration::from_secs(120),
ctx.sql(
"SELECT f.id, a.int_col, b.string_col
FROM fact_table f
Comment thread
peasee marked this conversation as resolved.
INNER JOIN dim_a a ON f.id = a.id
INNER JOIN dim_b b ON a.tinyint_col = b.tinyint_col
ORDER BY f.id
LIMIT 5",
)
.await?
.collect(),
)
Comment thread
peasee marked this conversation as resolved.
.await
// An elapsed timeout is turned into a panic carrying this message.
.expect("nested CollectLeft joins should complete within 120s, not deadlock");
Comment thread
peasee marked this conversation as resolved.

// The timeout did not fire; now surface any error from the query itself.
let result = result?;
// Verify we got results
assert!(!result.is_empty(), "query should return results");
// Only the first returned batch is inspected for rows.
assert!(
result[0].num_rows() > 0,
"query should return at least one row"
);

Ok(())
}

#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
Expand Down
42 changes: 42 additions & 0 deletions ballista/core/src/execution_plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,45 @@ pub use vortex_shuffle::{
LocalVortexShuffleStream, VortexWriteTracker, vortex_file_extension,
write_stream_to_disk_vortex,
};

use datafusion::common::tree_node::Transformed;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use std::sync::Arc;

/// Re-creates a `HashJoinExec` through `HashJoinExec::try_new()` so that the
/// resulting node carries no dynamic-filter accumulator (e.g.
/// `SharedBuildAccumulator`).
///
/// The accumulator relies on a cross-partition `Barrier`, which deadlocks in
/// Ballista because each task runs a single partition. A join produced by
/// `try_new()` never has an accumulator attached, so rebuilding is always safe.
///
/// When the join uses `PartitionMode::CollectLeft` and its left (build) input
/// reports more than one output partition, that input is wrapped in a
/// `CoalescePartitionsExec` before the join is rebuilt.
///
/// # Returns
///
/// * `Transformed::yes(rebuilt_join)` for every `HashJoinExec`, whether or not
///   the left input had to be wrapped.
/// * `Transformed::no(node)` when `node` is not a `HashJoinExec`.
///
/// # Errors
///
/// Propagates any error returned by `HashJoinExec::try_new()`.
pub fn rebuild_hash_join_without_accumulator(
    node: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    // Anything that is not a hash join is handed back untouched.
    let Some(join) = node.as_any().downcast_ref::<HashJoinExec>() else {
        return Ok(Transformed::no(node));
    };

    let mode = *join.partition_mode();
    let build_side = Arc::clone(join.left());

    // CollectLeft with a multi-partition build input: merge the partitions
    // into one before handing the input to the rebuilt join.
    let needs_coalesce = mode == PartitionMode::CollectLeft
        && build_side.properties().output_partitioning().partition_count() > 1;
    let build_side: Arc<dyn ExecutionPlan> = if needs_coalesce {
        Arc::new(CoalescePartitionsExec::new(build_side))
    } else {
        build_side
    };

    // Copy every other join setting across unchanged.
    let fresh_join: Arc<dyn ExecutionPlan> = Arc::new(HashJoinExec::try_new(
        build_side,
        Arc::clone(join.right()),
        join.on().to_vec(),
        join.filter().cloned(),
        join.join_type(),
        join.projection.clone(),
        mode,
        join.null_equality(),
    )?);

    Ok(Transformed::yes(fresh_join))
}
4 changes: 4 additions & 0 deletions ballista/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ use datafusion::physical_plan::metrics::{
self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};

use datafusion::common::tree_node::TreeNode;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics, displayable,
Expand Down Expand Up @@ -339,6 +340,9 @@ impl ShuffleWriterExec {

async move {
let now = Instant::now();
let plan = plan
.transform_down(&super::rebuild_hash_join_without_accumulator)?
.data;
Comment thread
peasee marked this conversation as resolved.
let mut stream = plan.execute(input_partition, context)?;

if use_memory {
Expand Down
13 changes: 13 additions & 0 deletions ballista/scheduler/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
.set_u64(
"datafusion.optimizer.hash_join_single_partition_threshold_rows",
BROADCAST_THRESHOLD_ROWS,
)
// Dynamic filter pushdown for hash joins may use cross-partition
// synchronisation (e.g. tokio::sync::Barrier) that expects ALL
// probe-side partitions to report before any can proceed. In
// Ballista each task runs a single partition, so the barrier
// waits forever. Disable to prevent deadlocks.
.set_bool(
"datafusion.optimizer.enable_join_dynamic_filter_pushdown",
false,
);

// Use the adjusted config for both physical planning, stage resolution,
Expand Down Expand Up @@ -629,6 +638,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
let explain_fmt = explain_format.unwrap_or(ExplainFormat::Indent);

let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
let node = match ballista_core::execution_plans::rebuild_hash_join_without_accumulator(node)? {
t if t.transformed => return Ok(t),
t => t.data,
};
Comment thread
peasee marked this conversation as resolved.
if node.output_partitioning().partition_count() == 0 {
let empty: Arc<dyn ExecutionPlan> =
Arc::new(EmptyExec::new(node.schema()));
Expand Down