Skip to content

Commit 383e165

Browse files
authored
Merge pull request #33 from spiceai/peasee/260422-revive-offers-on-job-update
fix: Disable dynamic filters on HashJoinExec across broadcast joins
2 parents e6bcc66 + dcaf3f0 commit 383e165

4 files changed

Lines changed: 131 additions & 0 deletions

File tree

ballista/client/tests/context_checks.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,78 @@ mod supported {
11091109
Ok(())
11101110
}
11111111

1112+
/// Regression test: nested CollectLeft HashJoinExec with
1113+
/// CoalescePartitionsExec should not deadlock.
1114+
///
1115+
/// This reproduces the pattern from TPC-H Q2 where a chain of
1116+
/// small-table inner joins (region→nation→supplier) is broadcast-joined
1117+
/// against a large partitioned table (partsupp). The scheduler enables
1118+
/// CollectLeft for inner joins under the broadcast threshold, and each
1119+
/// executor task runs exactly ONE partition. If any cross-partition
1120+
/// synchronisation (e.g. a tokio Barrier) is used in the build-side
1121+
/// completion path, it will deadlock because only one partition
1122+
/// participates per task.
1123+
#[rstest]
1124+
#[case::standalone(standalone_context())]
1125+
#[tokio::test]
1126+
async fn nested_collect_left_should_not_deadlock(
1127+
#[future(awt)]
1128+
#[case]
1129+
ctx: SessionContext,
1130+
test_data: String,
1131+
) -> datafusion::error::Result<()> {
1132+
// Use alltypes_plain.parquet registered as 3 different tables
1133+
// to create a nested inner join query where the optimizer
1134+
// should choose CollectLeft for the small tables.
1135+
ctx.register_parquet(
1136+
"fact_table",
1137+
&format!("{test_data}/alltypes_plain.parquet"),
1138+
Default::default(),
1139+
)
1140+
.await?;
1141+
1142+
ctx.register_parquet(
1143+
"dim_a",
1144+
&format!("{test_data}/alltypes_plain.parquet"),
1145+
Default::default(),
1146+
)
1147+
.await?;
1148+
1149+
ctx.register_parquet(
1150+
"dim_b",
1151+
&format!("{test_data}/alltypes_plain.parquet"),
1152+
Default::default(),
1153+
)
1154+
.await?;
1155+
1156+
// Query with nested inner joins: dim_b → dim_a → fact_table
1157+
let result = tokio::time::timeout(
1158+
std::time::Duration::from_secs(120),
1159+
ctx.sql(
1160+
"SELECT f.id, a.int_col, b.string_col
1161+
FROM fact_table f
1162+
INNER JOIN dim_a a ON f.id = a.id
1163+
INNER JOIN dim_b b ON a.tinyint_col = b.tinyint_col
1164+
ORDER BY f.id
1165+
LIMIT 5",
1166+
)
1167+
.await?
1168+
.collect(),
1169+
)
1170+
.await
1171+
.expect("nested CollectLeft joins should complete within 120s, not deadlock");
1172+
1173+
let result = result?;
1174+
// Verify we got results
1175+
assert!(!result.is_empty(), "query should return results");
1176+
assert!(
1177+
result[0].num_rows() > 0,
1178+
"query should return at least one row"
1179+
);
1180+
1181+
Ok(())
1182+
}
1183+
11121184
#[rstest]
11131185
#[case::standalone(standalone_context())]
11141186
#[case::remote(remote_context())]

ballista/core/src/execution_plans/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,45 @@ pub use vortex_shuffle::{
4848
LocalVortexShuffleStream, VortexWriteTracker, vortex_file_extension,
4949
write_stream_to_disk_vortex,
5050
};
51+
52+
use datafusion::common::tree_node::Transformed;
53+
use datafusion::error::Result;
54+
use datafusion::physical_plan::ExecutionPlan;
55+
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
56+
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
57+
use std::sync::Arc;
58+
59+
/// Rebuild a `HashJoinExec` node via `try_new()` to strip any dynamic-filter
60+
/// accumulator (e.g. `SharedBuildAccumulator`). The accumulator uses a
61+
/// cross-partition `Barrier` that deadlocks in Ballista where each task runs a
62+
/// single partition. `try_new()` never attaches an accumulator, so this is
63+
/// always safe.
64+
///
65+
/// If `node` is not a `HashJoinExec`, returns `Transformed::no(node)`.
66+
pub fn rebuild_hash_join_without_accumulator(
67+
node: Arc<dyn ExecutionPlan>,
68+
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
69+
if let Some(hj) = node.as_any().downcast_ref::<HashJoinExec>() {
70+
let left = Arc::clone(hj.left());
71+
let left: Arc<dyn ExecutionPlan> = if *hj.partition_mode()
72+
== PartitionMode::CollectLeft
73+
&& left.properties().output_partitioning().partition_count() > 1
74+
{
75+
Arc::new(CoalescePartitionsExec::new(left))
76+
} else {
77+
left
78+
};
79+
let rebuilt: Arc<dyn ExecutionPlan> = Arc::new(HashJoinExec::try_new(
80+
left,
81+
Arc::clone(hj.right()),
82+
hj.on().to_vec(),
83+
hj.filter().cloned(),
84+
hj.join_type(),
85+
hj.projection.clone(),
86+
*hj.partition_mode(),
87+
hj.null_equality(),
88+
)?);
89+
return Ok(Transformed::yes(rebuilt));
90+
}
91+
Ok(Transformed::no(node))
92+
}

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use datafusion::physical_plan::metrics::{
5858
self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
5959
};
6060

61+
use datafusion::common::tree_node::TreeNode;
6162
use datafusion::physical_plan::{
6263
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
6364
SendableRecordBatchStream, Statistics, displayable,
@@ -339,6 +340,9 @@ impl ShuffleWriterExec {
339340

340341
async move {
341342
let now = Instant::now();
343+
let plan = plan
344+
.transform_down(&super::rebuild_hash_join_without_accumulator)?
345+
.data;
342346
let mut stream = plan.execute(input_partition, context)?;
343347

344348
if use_memory {

ballista/scheduler/src/state/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,15 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
593593
.set_u64(
594594
"datafusion.optimizer.hash_join_single_partition_threshold_rows",
595595
BROADCAST_THRESHOLD_ROWS,
596+
)
597+
// Dynamic filter pushdown for hash joins may use cross-partition
598+
// synchronisation (e.g. tokio::sync::Barrier) that expects ALL
599+
// probe-side partitions to report before any can proceed. In
600+
// Ballista each task runs a single partition, so the barrier
601+
// waits forever. Disable to prevent deadlocks.
602+
.set_bool(
603+
"datafusion.optimizer.enable_join_dynamic_filter_pushdown",
604+
false,
596605
);
597606

598607
// Use the adjusted config for both physical planning, stage resolution,
@@ -629,6 +638,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
629638
let explain_fmt = explain_format.unwrap_or(ExplainFormat::Indent);
630639

631640
let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
641+
let node = match ballista_core::execution_plans::rebuild_hash_join_without_accumulator(node)? {
642+
t if t.transformed => return Ok(t),
643+
t => t.data,
644+
};
632645
if node.output_partitioning().partition_count() == 0 {
633646
let empty: Arc<dyn ExecutionPlan> =
634647
Arc::new(EmptyExec::new(node.schema()));

0 commit comments

Comments
 (0)