31 changes: 15 additions & 16 deletions datafusion/core/src/optimizer_rule_reference.md
@@ -74,20 +74,19 @@ in multiple phases.
| 3 | `join_selection` | - | Chooses join implementation, build side, and partition mode from statistics and stream properties. |
| 4 | `LimitedDistinctAggregation` | - | Pushes limit hints into grouped distinct-style aggregations when only a small result is needed. |
| 5 | `FilterPushdown` | pre-optimization phase | Pushes supported physical filters down toward data sources before distribution and sorting are enforced. |
-| 6 | `EnforceDistribution` | - | Adds repartitioning only where needed to satisfy physical distribution requirements. |
+| 6 | `EnsureRequirements` | - | Enforces both distribution and sorting requirements in a single idempotent rule (replaces `EnforceDistribution` + `EnforceSorting`). |
| 7 | `CombinePartialFinalAggregate` | - | Collapses adjacent partial and final aggregates when the distributed shape makes them redundant. |
-| 8 | `EnforceSorting` | - | Adds or removes local sorts to satisfy required input orderings. |
-| 9 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
-| 10 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
-| 11 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
-| 12 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
-| 13 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
-| 14 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
-| 15 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
-| 16 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
-| 17 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
-| 18 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
-| 19 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
-| 20 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
-| 21 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
-| 22 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
+| 8 | `OptimizeAggregateOrder` | - | Updates aggregate expressions to use the best ordering once sort requirements are known. |
+| 9 | `WindowTopN` | - | Replaces eligible row-number window and filter patterns with per-partition TopK execution. |
+| 10 | `ProjectionPushdown` | early pass | Pushes projections toward inputs before later physical rewrites add more limit and TopK structure. |
+| 11 | `OutputRequirements` | remove phase | Removes the temporary output-requirement helper nodes after requirement-sensitive planning is done. |
+| 12 | `LimitAggregation` | - | Passes a limit hint into eligible aggregations so they can keep fewer accumulator buckets. |
+| 13 | `LimitPushPastWindows` | - | Pushes fetch limits through bounded window operators when doing so keeps the result correct. |
+| 14 | `HashJoinBuffering` | - | Adds buffering on the probe side of hash joins so probing can start before build completion. |
+| 15 | `LimitPushdown` | - | Moves physical limits into child operators or fetch-enabled variants to cut data early. |
+| 16 | `TopKRepartition` | - | Pushes TopK below hash repartition when the partition key is a prefix of the sort key. |
+| 17 | `ProjectionPushdown` | late pass | Runs projection pushdown again after limit and TopK rewrites expose new pruning opportunities. |
+| 18 | `PushdownSort` | - | Pushes sort requirements into data sources that can already return sorted output. |
+| 19 | `EnsureCooperative` | - | Wraps non-cooperative plan parts so long-running tasks yield fairly. |
+| 20 | `FilterPushdown(Post)` | post-optimization phase | Pushes dynamic filters at the end of optimization, after plan references stop moving. |
+| 21 | `SanityCheckPlan` | - | Validates that the final physical plan meets ordering, distribution, and infinite-input safety requirements. |
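Row 6 describes `EnsureRequirements` as a single idempotent rule. Idempotent here means that applying the rule a second time leaves the plan unchanged, which is what lets one pass replace the separate distribution and sorting passes without a re-run loop. A minimal sketch of that property, using toy string "plans" rather than DataFusion types (`is_idempotent` and `add_merge_if_needed` are hypothetical names):

```rust
// Check idempotence: applying the rule twice must equal applying it once.
fn is_idempotent(rule: impl Fn(&str) -> String, plan: &str) -> bool {
    let once = rule(plan);
    let twice = rule(&once);
    once == twice
}

// A toy "rule" that adds a merge node only when one is missing,
// so re-applying it is a no-op.
fn add_merge_if_needed(plan: &str) -> String {
    if plan.starts_with("Merge(") {
        plan.to_string()
    } else {
        format!("Merge({plan})")
    }
}
```

A non-idempotent rule (one that unconditionally wrapped the plan) would fail this check and would need the old two-pass structure.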
65 changes: 50 additions & 15 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -970,7 +970,10 @@ fn preserving_order_enables_streaming(
 ///
 /// Updated node with an execution plan, where the desired single distribution
 /// requirement is satisfied.
-fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
+fn add_merge_on_top(
+    input: DistributionContext,
+    fetch: Option<usize>,
+) -> DistributionContext {
     // Apply only when the partition count is larger than one.
     if input.plan.output_partitioning().partition_count() > 1 {
         // When there is an existing ordering, we preserve ordering
@@ -979,14 +982,20 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
         // - Preserving ordering is not helpful in terms of satisfying ordering requirements
         // - Usage of order preserving variants is not desirable
         //   (determined by flag `config.optimizer.prefer_existing_sort`)
-        let new_plan = if let Some(req) = input.plan.output_ordering() {
-            Arc::new(SortPreservingMergeExec::new(
-                req.clone(),
-                Arc::clone(&input.plan),
-            )) as _
+        let new_plan: Arc<dyn ExecutionPlan> = if let Some(req) =
+            input.plan.output_ordering()
+        {
+            let mut spm =
+                SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan));
+            if let Some(f) = fetch {
+                spm = spm.with_fetch(Some(f));
+            }
+            Arc::new(spm)
         } else {
             // If there is no input order, we can simply coalesce partitions:
-            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
+            Arc::new(
+                CoalescePartitionsExec::new(Arc::clone(&input.plan)).with_fetch(fetch),
+            )
         };
 
         DistributionContext::new(new_plan, true, vec![input])
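The branch above makes a three-way choice: multiple ordered partitions get a sort-preserving merge, unordered ones get a plain coalesce, and a single partition needs nothing, with the preserved `fetch` carried over in the first two cases. A toy model of that decision (the `MergeNode`/`choose_merge` names are illustrative, not DataFusion API):

```rust
// Model of the operator add_merge_on_top would insert to reach a
// single partition, including the fetch it propagates.
#[derive(Debug, PartialEq)]
enum MergeNode {
    SortPreservingMerge { fetch: Option<usize> },
    CoalescePartitions { fetch: Option<usize> },
    Unchanged,
}

fn choose_merge(partitions: usize, ordered: bool, fetch: Option<usize>) -> MergeNode {
    if partitions <= 1 {
        // Already a single partition: nothing to add.
        return MergeNode::Unchanged;
    }
    if ordered {
        // Merging sorted streams keeps the output ordering.
        MergeNode::SortPreservingMerge { fetch }
    } else {
        // No ordering to preserve: a plain coalesce is cheaper.
        MergeNode::CoalescePartitions { fetch }
    }
}
```

Carrying `fetch` into the merge operator is what keeps a limit that sat on a removed `SortPreservingMergeExec` from silently turning into a full-table merge.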
@@ -1012,20 +1021,41 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
 /// ```text
 /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
 /// ```
+/// Returned by [`remove_dist_changing_operators`] to carry the fetch value
+/// that may have been on a removed `SortPreservingMergeExec` or `CoalescePartitionsExec`.
+struct RemovedDistOps {
+    context: DistributionContext,
+    /// The fetch value from the removed SPM/Coalesce, if any.
+    /// Must be re-applied when distribution operators are re-inserted.
+    removed_fetch: Option<usize>,
+}

 fn remove_dist_changing_operators(
     mut distribution_context: DistributionContext,
-) -> Result<DistributionContext> {
+) -> Result<RemovedDistOps> {
+    let mut removed_fetch = None;
     while is_repartition(&distribution_context.plan)
         || is_coalesce_partitions(&distribution_context.plan)
         || is_sort_preserving_merge(&distribution_context.plan)
     {
+        // Preserve fetch from SPM or CoalescePartitions before removing (#14150).
+        if let Some(fetch) = distribution_context.plan.fetch() {
+            removed_fetch = Some(
+                removed_fetch
+                    .map(|existing: usize| existing.min(fetch))
+                    .unwrap_or(fetch),
+            );
+        }
         // All of the above operators have a single child, so the first child
         // is the only child.
         // Remove any distribution-changing operators at the beginning:
         distribution_context = distribution_context.children.swap_remove(0);
         // Note that they will be re-inserted later on if necessary or helpful.
     }
 
-    Ok(distribution_context)
+    Ok(RemovedDistOps {
+        context: distribution_context,
+        removed_fetch,
+    })
 }
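The fetch-combining step inside the loop can be isolated as a pure helper. When several removed operators each carried a fetch, the smallest bound wins, so `min` is the safe way to combine them. A sketch (the `combine_fetch` name is hypothetical; the function body mirrors the `map`/`unwrap_or` expression above):

```rust
// Combine an accumulated fetch with a newly observed one:
// a missing fetch imposes no bound, two fetches keep the smaller.
fn combine_fetch(acc: Option<usize>, fetch: Option<usize>) -> Option<usize> {
    match (acc, fetch) {
        (Some(a), Some(f)) => Some(a.min(f)),
        (Some(a), None) => Some(a),
        (None, f) => f,
    }
}
```

Using `min` is conservative: a plan that passed through `LIMIT 3` and then `LIMIT 5` can never produce more than 3 rows, so re-applying the smaller fetch preserves correctness.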

/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable.
@@ -1219,11 +1249,16 @@ pub fn ensure_distribution(
     let order_preserving_variants_desirable =
         unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;
 
-    // Remove unnecessary repartition from the physical plan if any
-    let DistributionContext {
-        mut plan,
-        data,
-        children,
+    // Remove unnecessary repartition from the physical plan if any.
+    // Preserve fetch from removed SPM/Coalesce (#14150).
+    let RemovedDistOps {
+        context:
+            DistributionContext {
+                mut plan,
+                data,
+                children,
+            },
+        removed_fetch,
     } = remove_dist_changing_operators(dist_context)?;

if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
@@ -1359,7 +1394,7 @@ pub fn ensure_distribution(
         // Satisfy the distribution requirement if it is unmet.
         match &requirement {
             Distribution::SinglePartition => {
-                child = add_merge_on_top(child);
+                child = add_merge_on_top(child, removed_fetch);
             }
             Distribution::HashPartitioned(exprs) => {
                 // See https://github.com/apache/datafusion/issues/18341#issuecomment-3503238325 for background
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -262,7 +262,7 @@ impl PhysicalOptimizerRule for EnforceSorting {
 /// If the plan is not a [`SortExec`] or its child is not unbounded, returns the original plan.
 /// Otherwise, it checks whether the sort requirement is already satisfied and
 /// searches for a replacement opportunity; if one exists, it replaces the
 /// [`SortExec`] plan with a [`PartialSortExec`].
-fn replace_with_partial_sort(
+pub fn replace_with_partial_sort(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let Some(sort_plan) = plan.downcast_ref::<SortExec>() else {
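The `let ... else` guard that opens `replace_with_partial_sort` is a common rewrite-rule shape: downcast to the node type the rule targets, and return the input untouched on any other node. A stand-in sketch using `std::any::Any` (the `SortStub`/`FilterStub` types are hypothetical):

```rust
use std::any::Any;

struct SortStub;
struct FilterStub;

// Rewrite only when the node is the targeted type; otherwise
// fall through and leave the plan as-is.
fn rewrite(plan: &dyn Any) -> &'static str {
    let Some(_sort) = plan.downcast_ref::<SortStub>() else {
        return "unchanged";
    };
    "replaced with PartialSort"
}
```

The early return keeps the happy path unindented, which is why the rule reads as a flat sequence of checks rather than nested `if let` blocks.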