fix: Disable dynamic filters on HashJoinExec across broadcast joins#33
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses a distributed execution deadlock in Ballista caused by DataFusion hash-join dynamic filter state that relies on cross-partition synchronization (e.g., tokio::sync::Barrier) which cannot be satisfied when partitions execute in separate tasks/executors.
Changes:
- Disables DataFusion join dynamic filter pushdown in the scheduler’s adjusted session config to avoid generating dynamic filters for distributed planning.
- Rebuilds
HashJoinExecnodes viatry_new()during plan transforms (scheduler + executor shuffle writing) to strip any dynamic-filter accumulator state, and coalesces build-side partitions forCollectLeft. - Adds a regression test intended to ensure nested broadcast (
CollectLeft) joins do not deadlock.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
ballista/scheduler/src/state/mod.rs |
Disables join dynamic filter pushdown and rewrites HashJoinExec nodes in the planned physical plan before job submission. |
ballista/core/src/execution_plans/shuffle_writer.rs |
Rewrites HashJoinExec nodes before executing shuffle write to avoid deadlock-inducing dynamic filter state. |
ballista/client/tests/context_checks.rs |
Adds a regression test that times out if nested joins deadlock. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
HashJoinExecwould configure a dynamic filter that attempts to accumulate across multiple executors/tasks. Because the accumulator is synchronized with a Barrier, but the tasks aren't guaranteed to run on the same executor where the Barrier conditions could be met (all partitions finished), we need to remove the dynamic filter when re-creating joins on executors.