
Commit dfb1043

Add EnsureRequirements: idempotent merged EnforceDistribution + EnforceSorting
## Summary

Replace the separate `EnforceDistribution` and `EnforceSorting` optimizer rules with a single `EnsureRequirements` rule in the default optimizer chain. This makes the composition idempotent by fixing distribution-awareness in `pushdown_sorts` and fetch preservation in `EnforceDistribution`.

## Problem

`EnforceDistribution` and `EnforceSorting` are coupled through `SortExec.preserve_partitioning` but run as independent rules. This caused:

1. **Production 502 errors**: `pushdown_sorts` set `preserve_partitioning=true` without inserting a `SortPreservingMergeExec`, violating the `SinglePartition` requirement from `GlobalLimitExec` → `SanityCheckPlan` failure.
2. **Non-idempotent composition**: Running the rules multiple times produced different (sometimes invalid) plans.
3. **Lost fetch values** (#14150): `EnforceDistribution` dropped `fetch` from `SortPreservingMergeExec` when stripping and re-adding distribution operators.

DataFusion was the only major engine with separate rules; Spark (`EnsureRequirements`) and Presto (`AddExchanges`) each use a single rule.

## Changes

### `EnsureRequirements` rule (new)

- Composes `EnforceDistribution::optimize()` + `EnforceSorting::optimize()`
- Replaces both rules in the default optimizer chain
- 53 comprehensive tests, including idempotency verification

### Distribution-aware `pushdown_sorts` (fix)

- Add a `distribution_requirement` field to `ParentRequirements`
- New `add_sort_above_with_distribution()` inserts a `SortPreservingMergeExec` when the parent requires `SinglePartition` and the input has multiple partitions
- Propagate distribution through the recursion with `stronger_distribution()`
- Reset distribution below partition-merging nodes (SPM, single-partition outputs)

### Fix `EnforceDistribution` fetch preservation (#14150)

- `remove_dist_changing_operators()` now saves the fetch from removed SPM/Coalesce operators
- `add_merge_on_top()` re-applies the saved fetch to the newly inserted operators

## Testing

| Suite | Result |
|-------|--------|
| EnsureRequirements (new) | 53 passed |
| enforce_sorting (existing) | 124 passed, 0 regressions |
| enforce_distribution (existing) | 66 passed, 0 regressions |
| SLT (465 files) | 1 pre-existing failure only |
| **Total** | **243 unit + 464 SLT, 0 new failures** |

Idempotency verified across:

- All partition counts 1-64
- Triple and 10x consecutive optimization passes
- SortMergeJoin, HashJoin, Window, and Aggregate topologies
- PR #53/#54 regression scenarios
- #14150 fetch preservation across passes

Closes: #14150
Part of: #21973
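The idempotency property this commit tests for can be illustrated in isolation. Below is a minimal sketch, not DataFusion code: the `Rule` trait, `Plan` type, and `DedupOps` rule are hypothetical stand-ins (DataFusion's real types are `PhysicalOptimizerRule` and `Arc<dyn ExecutionPlan>`, and its rules are fallible). The point is only the shape of the check: a composed rule is idempotent when a second pass is a no-op.

```rust
// Hypothetical stand-ins for illustration only; not DataFusion's API.
#[derive(Clone, PartialEq, Debug)]
struct Plan(Vec<&'static str>);

trait Rule {
    fn optimize(&self, plan: Plan) -> Plan;
}

// Composes two rules in sequence, the way `EnsureRequirements` composes
// `EnforceDistribution::optimize()` and `EnforceSorting::optimize()`.
struct Composed<A: Rule, B: Rule>(A, B);

impl<A: Rule, B: Rule> Rule for Composed<A, B> {
    fn optimize(&self, plan: Plan) -> Plan {
        self.1.optimize(self.0.optimize(plan))
    }
}

// A rule is idempotent when applying it a second time changes nothing.
fn is_idempotent<R: Rule>(rule: &R, plan: Plan) -> bool {
    let once = rule.optimize(plan);
    once == rule.optimize(once.clone())
}

// Toy rule that removes adjacent duplicate operators (e.g. a redundant sort).
struct DedupOps;
impl Rule for DedupOps {
    fn optimize(&self, mut plan: Plan) -> Plan {
        plan.0.dedup();
        plan
    }
}

fn main() {
    let rule = Composed(DedupOps, DedupOps);
    assert!(is_idempotent(&rule, Plan(vec!["sort", "sort", "scan"])));
}
```

The commit's actual tests apply the same fixpoint check to real plans: optimize three and ten times consecutively and assert the plan no longer changes.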
1 parent bb86364 commit dfb1043

9 files changed

Lines changed: 2797 additions & 52 deletions


datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 50 additions & 15 deletions
```diff
@@ -970,7 +970,10 @@ fn preserving_order_enables_streaming(
 ///
 /// Updated node with an execution plan, where the desired single distribution
 /// requirement is satisfied.
-fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+fn add_merge_on_top(
+    input: DistributionContext,
+    fetch: Option<usize>,
+) -> DistributionContext {
     // Apply only when the partition count is larger than one.
     if input.plan.output_partitioning().partition_count() > 1 {
         // When there is an existing ordering, we preserve ordering
@@ -979,14 +982,20 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
         //   (determined by flag `config.optimizer.prefer_existing_sort`)
-        let new_plan = if let Some(req) = input.plan.output_ordering() {
-            Arc::new(SortPreservingMergeExec::new(
-                req.clone(),
-                Arc::clone(&input.plan),
-            )) as _
+        let new_plan: Arc<dyn ExecutionPlan> = if let Some(req) =
+            input.plan.output_ordering()
+        {
+            let mut spm =
+                SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan));
+            if let Some(f) = fetch {
+                spm = spm.with_fetch(Some(f));
+            }
+            Arc::new(spm)
         } else {
             // If there is no input order, we can simply coalesce partitions:
-            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
+            Arc::new(
+                CoalescePartitionsExec::new(Arc::clone(&input.plan)).with_fetch(fetch),
+            )
         };
 
         DistributionContext::new(new_plan, true, vec![input])
@@ -1012,20 +1021,41 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
 /// ```text
 /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
 /// ```
+/// Returned by [`remove_dist_changing_operators`] to carry the fetch value
+/// that may have been on a removed `SortPreservingMergeExec` or `CoalescePartitionsExec`.
+struct RemovedDistOps {
+    context: DistributionContext,
+    /// The fetch value from the removed SPM/Coalesce, if any.
+    /// Must be re-applied when distribution operators are re-inserted.
+    removed_fetch: Option<usize>,
+}
+
 fn remove_dist_changing_operators(
     mut distribution_context: DistributionContext,
-) -> Result<DistributionContext> {
+) -> Result<RemovedDistOps> {
+    let mut removed_fetch = None;
     while is_repartition(&distribution_context.plan)
         || is_coalesce_partitions(&distribution_context.plan)
         || is_sort_preserving_merge(&distribution_context.plan)
     {
+        // Preserve fetch from SPM or CoalescePartitions before removing (#14150).
+        if let Some(fetch) = distribution_context.plan.fetch() {
+            removed_fetch = Some(
+                removed_fetch
+                    .map(|existing: usize| existing.min(fetch))
+                    .unwrap_or(fetch),
+            );
+        }
         // All of above operators have a single child. First child is only child.
         // Remove any distribution changing operators at the beginning:
         distribution_context = distribution_context.children.swap_remove(0);
         // Note that they will be re-inserted later on if necessary or helpful.
     }
 
-    Ok(distribution_context)
+    Ok(RemovedDistOps {
+        context: distribution_context,
+        removed_fetch,
+    })
 }
 
 /// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1219,11 +1249,16 @@ pub fn ensure_distribution(
     let order_preserving_variants_desirable =
         unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
 
-    // Remove unnecessary repartition from the physical plan if any
-    let DistributionContext {
-        mut plan,
-        data,
-        children,
+    // Remove unnecessary repartition from the physical plan if any.
+    // Preserve fetch from removed SPM/Coalesce (#14150).
+    let RemovedDistOps {
+        context:
+            DistributionContext {
+                mut plan,
+                data,
+                children,
+            },
+        removed_fetch,
     } = remove_dist_changing_operators(dist_context)?;
 
     if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
@@ -1359,7 +1394,7 @@ pub fn ensure_distribution(
             // Satisfy the distribution requirement if it is unmet.
             match &requirement {
                 Distribution::SinglePartition => {
-                    child = add_merge_on_top(child);
+                    child = add_merge_on_top(child, removed_fetch);
                 }
                 Distribution::HashPartitioned(exprs) => {
                     // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
```
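The fetch-preservation logic above folds the fetch of every removed operator into one accumulator, keeping the minimum, so the most restrictive limit survives when several removed operators carried one. A self-contained sketch of that fold (the helper name `merge_removed_fetch` is ours, extracted for illustration; the patch inlines this expression in the `while` loop):

```rust
/// Fold a newly seen fetch into the accumulated one, keeping the minimum.
/// Mirrors the accumulation inside `remove_dist_changing_operators`:
///   removed_fetch.map(|existing| existing.min(fetch)).unwrap_or(fetch)
fn merge_removed_fetch(acc: Option<usize>, fetch: Option<usize>) -> Option<usize> {
    match fetch {
        Some(f) => Some(acc.map(|existing| existing.min(f)).unwrap_or(f)),
        None => acc,
    }
}

fn main() {
    // Two removed operators carried fetches 10 and 5, one carried none:
    // the stricter limit (5) survives and is re-applied by `add_merge_on_top`.
    let mut removed = None;
    for f in [Some(10), Some(5), None] {
        removed = merge_removed_fetch(removed, f);
    }
    assert_eq!(removed, Some(5));
}
```

Taking the minimum is safe because a plan limited to 5 rows and then to 10 rows produces at most 5 rows either way; keeping the larger fetch could return too many rows.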

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -262,7 +262,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
 /// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan.
 /// Otherwise, by checking the requirement satisfaction searches for a replacement chance.
 /// If there's one replaces the [`SortExec`] plan with a [`PartialSortExec`]
-fn replace_with_partial_sort(
+pub fn replace_with_partial_sort(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let Some(sort_plan) = plan.downcast_ref::<SortExec>() else {
```

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 98 additions & 14 deletions
```diff
@@ -19,7 +19,8 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use crate::utils::{
-    add_sort_above, is_sort, is_sort_preserving_merge, is_union, is_window,
+    add_sort_above_with_distribution, is_sort, is_sort_preserving_merge, is_union,
+    is_window,
 };
 
 use arrow::datatypes::SchemaRef;
@@ -29,7 +30,7 @@ use datafusion_expr::JoinType;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
-    EquivalenceProperties, add_offset_to_physical_sort_exprs,
+    Distribution, EquivalenceProperties, add_offset_to_physical_sort_exprs,
 };
 use datafusion_physical_expr_common::sort_expr::{
     LexOrdering, LexRequirement, OrderingRequirements, PhysicalSortExpr,
@@ -55,23 +56,42 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 /// of the parent node as its data.
 ///
 /// [`EnforceSorting`]: crate::enforce_sorting::EnforceSorting
-#[derive(Default, Clone, Debug)]
+#[derive(Clone, Debug)]
 pub struct ParentRequirements {
     ordering_requirement: Option<OrderingRequirements>,
     fetch: Option<usize>,
+    /// The distribution required by the consumer above any SortExec we insert.
+    /// When this is `SinglePartition` and the input has multiple partitions,
+    /// `add_sort_above_with_distribution` wraps the sort in `SortPreservingMergeExec`.
+    distribution_requirement: Distribution,
+}
+
+impl Default for ParentRequirements {
+    fn default() -> Self {
+        Self {
+            ordering_requirement: None,
+            fetch: None,
+            distribution_requirement: Distribution::UnspecifiedDistribution,
+        }
+    }
 }
 
 pub type SortPushDown = PlanContext<ParentRequirements>;
 
 /// Assigns the ordering requirement of the root node to the its children.
 pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
     let reqs = sort_push_down.plan.required_input_ordering();
-    for (child, requirement) in sort_push_down.children.iter_mut().zip(reqs) {
+    let dists = sort_push_down.plan.required_input_distribution();
+    for (idx, (child, requirement)) in
+        sort_push_down.children.iter_mut().zip(reqs).enumerate()
+    {
         child.data = ParentRequirements {
             ordering_requirement: requirement,
-            // If the parent has a fetch value, assign it to the children
-            // Or use the fetch value of the child.
             fetch: child.plan.fetch(),
+            distribution_requirement: dists
+                .get(idx)
+                .cloned()
+                .unwrap_or(Distribution::UnspecifiedDistribution),
         };
     }
 }
@@ -92,11 +112,25 @@ fn min_fetch(f1: Option<usize>, f2: Option<usize>) -> Option<usize> {
     }
 }
 
+/// Returns the stricter of two distribution requirements.
+/// `SinglePartition` is the strictest.
+fn stronger_distribution(a: &Distribution, b: &Distribution) -> Distribution {
+    match (a, b) {
+        (Distribution::SinglePartition, _) | (_, Distribution::SinglePartition) => {
+            Distribution::SinglePartition
+        }
+        (Distribution::HashPartitioned(_), _) => a.clone(),
+        (_, Distribution::HashPartitioned(_)) => b.clone(),
+        _ => Distribution::UnspecifiedDistribution,
+    }
+}
+
 fn pushdown_sorts_helper(
     mut sort_push_down: SortPushDown,
 ) -> Result<Transformed<SortPushDown>> {
     let plan = sort_push_down.plan;
     let parent_fetch = sort_push_down.data.fetch;
+    let parent_distribution = sort_push_down.data.distribution_requirement.clone();
 
     let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone()
     else {
@@ -121,6 +155,14 @@ fn pushdown_sorts_helper(
             return pushdown_sorts_helper(sort_push_down);
         }
         sort_push_down.plan = plan;
+        // No ordering is being pushed; use each child's own distribution requirement
+        let dists = sort_push_down.plan.required_input_distribution();
+        for (idx, child) in sort_push_down.children.iter_mut().enumerate() {
+            child.data.distribution_requirement = dists
+                .get(idx)
+                .cloned()
+                .unwrap_or(Distribution::UnspecifiedDistribution);
+        }
         return Ok(Transformed::no(sort_push_down));
     };
 
@@ -149,22 +191,29 @@ fn pushdown_sorts_helper(
         // The sort was imposing a different ordering than the one being
         // pushed down. Replace it with a sort that matches the pushed-down
        // ordering, and continue the pushdown.
-        // Add back the sort:
-        sort_push_down = add_sort_above(
+        // Add back the sort (distribution-aware):
+        sort_push_down = add_sort_above_with_distribution(
            sort_push_down,
            parent_requirement.into_single(),
            parent_fetch,
+            &parent_distribution,
        );
         // Update pushdown requirements:
         sort_push_down.children[0].data = ParentRequirements {
            ordering_requirement: Some(OrderingRequirements::from(sort_ordering)),
            fetch: sort_fetch,
+            distribution_requirement: Distribution::UnspecifiedDistribution,
        };
         return Ok(Transformed::yes(sort_push_down));
     } else {
         // Sort was unnecessary, just propagate the stricter fetch and
-        // ordering requirements:
+        // ordering requirements. Reset distribution to Unspecified
+        // because the sort we're removing may have been below a
+        // partition-merging node (like SortPreservingMergeExec) that
+        // already satisfies SinglePartition.
         sort_push_down.data.fetch = min_fetch(sort_fetch, parent_fetch);
+        sort_push_down.data.distribution_requirement =
+            Distribution::UnspecifiedDistribution;
         let current_is_stricter = eqp.requirements_compatible(
            sort_ordering.clone().into(),
            parent_requirement.first().clone(),
@@ -184,30 +233,65 @@ fn pushdown_sorts_helper(
     if satisfy_parent {
         // For non-sort operators which satisfy ordering:
         let reqs = sort_push_down.plan.required_input_ordering();
+        let dists = sort_push_down.plan.required_input_distribution();
+
+        // If this node already outputs single partition, don't push SinglePartition
+        // requirement to children (they're below the merge point).
+        let effective_parent_dist =
+            if sort_push_down.plan.output_partitioning().partition_count() == 1 {
+                Distribution::UnspecifiedDistribution
+            } else {
+                parent_distribution.clone()
+            };
 
-        for (child, order) in sort_push_down.children.iter_mut().zip(reqs) {
+        for (idx, (child, order)) in
+            sort_push_down.children.iter_mut().zip(reqs).enumerate()
+        {
             child.data.ordering_requirement = order;
             child.data.fetch = min_fetch(parent_fetch, child.data.fetch);
+            child.data.distribution_requirement = stronger_distribution(
+                &effective_parent_dist,
+                dists
+                    .get(idx)
+                    .unwrap_or(&Distribution::UnspecifiedDistribution),
+            );
         }
     } else if let Some(adjusted) = pushdown_requirement_to_children(
         &sort_push_down.plan,
         parent_requirement.clone(),
         parent_fetch,
     )? {
         // For operators that can take a sort pushdown, continue with updated
-        // requirements:
+        // requirements. If this node already outputs single partition (e.g. SPM),
+        // don't push SinglePartition to children.
         let current_fetch = sort_push_down.plan.fetch();
-        for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
+        let dists = sort_push_down.plan.required_input_distribution();
+        let effective_dist =
+            if sort_push_down.plan.output_partitioning().partition_count() == 1 {
+                Distribution::UnspecifiedDistribution
+            } else {
+                parent_distribution.clone()
+            };
+        for (idx, (child, order)) in
+            sort_push_down.children.iter_mut().zip(adjusted).enumerate()
+        {
             child.data.ordering_requirement = order;
             child.data.fetch = min_fetch(current_fetch, parent_fetch);
+            child.data.distribution_requirement = stronger_distribution(
+                &effective_dist,
+                dists
+                    .get(idx)
+                    .unwrap_or(&Distribution::UnspecifiedDistribution),
+            );
         }
         sort_push_down.data.ordering_requirement = None;
     } else {
-        // Can not push down requirements, add new `SortExec`:
-        sort_push_down = add_sort_above(
+        // Can not push down requirements, add new `SortExec` (distribution-aware):
+        sort_push_down = add_sort_above_with_distribution(
            sort_push_down,
            parent_requirement.into_single(),
            parent_fetch,
+            &parent_distribution,
        );
         assign_initial_requirements(&mut sort_push_down);
     }
```