fix: Improve TopK filter pushdown#28
Conversation
There was a problem hiding this comment.
Pull request overview
This PR improves distributed TopK execution in Ballista by (1) ensuring TopK-related dynamic filter links survive scheduler→executor protobuf roundtrips, and (2) avoiding unnecessary stage breaks/shuffles for small TopK limits.
Changes:
- Skip the SortPreservingMergeExec stage break for “small” TopK fetch/limits to avoid shuffle overhead.
- Re-run DataFusion’s post-optimization filter pushdown on executors to re-establish dynamic filter linkages after deserialization.
- Add a regression test for TopK stage planning and introduce
tempfileas a scheduler dev-dependency.
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
ballista/scheduler/src/planner.rs |
Adds TopK stage-collapsing logic for small fetch on SortPreservingMergeExec and a new test validating stage counts. |
ballista/scheduler/Cargo.toml |
Adds tempfile for the new planner test. |
ballista/executor/src/execution_engine.rs |
Re-runs FilterPushdown(Post) on the executor side to restore dynamic filter wiring after serde. |
ballista/core/src/execution_plans/shuffle_writer.rs |
Removes manual write-time timer handling for the non-partitioned shuffle write path. |
Cargo.lock |
Records the added tempfile dependency. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Improves distributed TopK execution in Ballista by (1) ensuring dynamic filter pushdown is re-established after plan serialization/deserialization and (2) avoiding shuffle-stage overhead for small ORDER BY ... LIMIT N queries by collapsing certain stage breaks.
Changes:
- Skip creating a stage break at
SortPreservingMergeExecwhenfetch/limitis small to avoid shuffle overhead for TopK. - Re-run DataFusion’s post-optimization
FilterPushdownon the executor to restore dynamic filter links lost during protobuf round-trip. - Add a scheduler-side regression test for TopK stage-planning behavior and include
tempfilefor the test.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
ballista/scheduler/src/planner.rs |
Adds TopK-specific stage-planning heuristic and a regression test asserting stage count behavior. |
ballista/scheduler/Cargo.toml |
Adds tempfile as a dev-dependency for the new test. |
ballista/executor/src/execution_engine.rs |
Re-runs FilterPushdown after deserialization to restore dynamic filter pushdown links. |
Cargo.lock |
Locks tempfile inclusion. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Arc<T>was breaking when they were serialized/deserialized into executors. This resulted in each executor creating its own independent TopK filter, resulting in more rows scanned than necessaryLIMIT Nhas a smallN- if theNis small enough, there is no need to shuffle data as we can merge it all from the single stage. This avoids the overhead of 1) coordinating launching a new stage and 2) reading/writing from shuffle for only a handful of rows.