Skip to content

Commit f7d9156

Browse files
committed
feat: reorder row groups by statistics for TopK queries
When a parquet file has multiple row groups with out-of-order or overlapping statistics, TopK queries benefit from reading the "best" row groups first so the dynamic filter threshold tightens quickly.

This PR adds:

1. `reorder_by_statistics`: sorts row groups by min values (ASC) based on parquet column statistics. Direction (DESC) is handled by the existing `reverse()` applied after reorder. The two steps compose:
   - Sorted data: reorder is a no-op, reverse gives perfect DESC order
   - Unsorted data: reorder fixes the order, reverse flips for DESC
2. `AccessPlanOptimizer` trait: extensible interface for row group access plan optimizations (reorder, reverse) applied after pruning.
3. `DynamicFilterPhysicalExpr.sort_options/fetch`: SortExec now passes sort direction and fetch limit to the dynamic filter, enabling the parquet reader to determine reorder direction for any TopK query.
4. `FileSource::reorder_files`: file-level reordering in the shared work queue so multi-file TopK reads the most promising files first.
5. Fix `SortExec::with_fetch` ordering: fetch must be set before `create_filter()` so the DynamicFilter gets the correct K value.
1 parent 0144570 commit f7d9156

11 files changed

Lines changed: 1028 additions & 18 deletions

File tree

datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,23 @@ async fn test_fuzz_topk_filter_pushdown() {
317317
.map(|col| orders.get(**col).unwrap())
318318
.multi_cartesian_product()
319319
{
320+
// Add remaining columns as tiebreakers (ASC NULLS LAST)
321+
// to ensure deterministic results when RG reorder changes
322+
// the read order of rows with equal sort key values.
323+
let tiebreakers: Vec<String> = ["id", "name", "department"]
324+
.iter()
325+
.filter(|c| {
326+
!order_columns
327+
.iter()
328+
.take(num_order_by_columns)
329+
.any(|oc| **oc == **c)
330+
})
331+
.map(|c| format!("{c} ASC NULLS LAST"))
332+
.collect();
333+
let all_orderings =
334+
orderings.into_iter().chain(tiebreakers.iter()).join(", ");
320335
let query = format!(
321-
"SELECT * FROM test_table ORDER BY {} LIMIT {}",
322-
orderings.into_iter().join(", "),
323-
limit
336+
"SELECT * FROM test_table ORDER BY {all_orderings} LIMIT {limit}",
324337
);
325338
queries.push(query);
326339
}

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
// under the License.
1717

1818
use crate::sort::reverse_row_selection;
19+
use arrow::datatypes::Schema;
1920
use datafusion_common::{Result, assert_eq_or_internal_err};
21+
use datafusion_physical_expr::expressions::Column;
22+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
23+
use log::debug;
24+
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
2025
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
2126
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
2227

@@ -377,6 +382,98 @@ impl PreparedAccessPlan {
377382
})
378383
}
379384

385+
/// Reorder row groups by their min statistics for the given sort order.
386+
///
387+
/// This helps TopK queries find optimal values first. Row groups are
388+
/// always sorted by min values in ASC order — direction (DESC) is
389+
/// handled separately by `reverse()` which is applied after reorder.
390+
///
391+
/// Gracefully skips reordering when:
392+
/// - There is a row_selection (too complex to remap)
393+
/// - 0 or 1 row groups (nothing to reorder)
394+
/// - Sort expression is not a simple column reference
395+
/// - Statistics are unavailable
396+
pub(crate) fn reorder_by_statistics(
397+
mut self,
398+
sort_order: &LexOrdering,
399+
file_metadata: &ParquetMetaData,
400+
arrow_schema: &Schema,
401+
) -> Result<Self> {
402+
// Skip if row_selection present (too complex to remap)
403+
if self.row_selection.is_some() {
404+
debug!("Skipping RG reorder: row_selection present");
405+
return Ok(self);
406+
}
407+
408+
// Nothing to reorder
409+
if self.row_group_indexes.len() <= 1 {
410+
return Ok(self);
411+
}
412+
413+
let first_sort_expr = sort_order.first();
414+
415+
// Extract column name from sort expression
416+
let column: &Column = match first_sort_expr.expr.downcast_ref::<Column>() {
417+
Some(col) => col,
418+
None => {
419+
debug!("Skipping RG reorder: sort expr is not a simple column");
420+
return Ok(self);
421+
}
422+
};
423+
424+
// Build statistics converter for this column
425+
let converter = match StatisticsConverter::try_new(
426+
column.name(),
427+
arrow_schema,
428+
file_metadata.file_metadata().schema_descr(),
429+
) {
430+
Ok(c) => c,
431+
Err(e) => {
432+
debug!("Skipping RG reorder: cannot create stats converter: {e}");
433+
return Ok(self);
434+
}
435+
};
436+
437+
// Always sort ASC by min values — direction is handled by reverse
438+
let rg_metadata: Vec<&RowGroupMetaData> = self
439+
.row_group_indexes
440+
.iter()
441+
.map(|&idx| file_metadata.row_group(idx))
442+
.collect();
443+
444+
let stat_values = match converter.row_group_mins(rg_metadata.iter().copied()) {
445+
Ok(vals) => vals,
446+
Err(e) => {
447+
debug!("Skipping RG reorder: cannot get min values: {e}");
448+
return Ok(self);
449+
}
450+
};
451+
452+
let sort_options = arrow::compute::SortOptions {
453+
descending: false,
454+
nulls_first: first_sort_expr.options.nulls_first,
455+
};
456+
let sorted_indices =
457+
match arrow::compute::sort_to_indices(&stat_values, Some(sort_options), None)
458+
{
459+
Ok(indices) => indices,
460+
Err(e) => {
461+
debug!("Skipping RG reorder: sort failed: {e}");
462+
return Ok(self);
463+
}
464+
};
465+
466+
// Apply the reordering
467+
let original_indexes = self.row_group_indexes.clone();
468+
self.row_group_indexes = sorted_indices
469+
.values()
470+
.iter()
471+
.map(|&i| original_indexes[i as usize])
472+
.collect();
473+
474+
Ok(self)
475+
}
476+
380477
/// Reverse the access plan for reverse scanning
381478
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
382479
// Get the row group indexes before reversing
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! [`AccessPlanOptimizer`] trait and implementations for optimizing
19+
//! row group access order during parquet scans.
20+
//!
21+
//! Applied after row group pruning but before building the decoder,
22+
//! these optimizers reorder (or reverse) the row groups to improve
23+
//! query performance — e.g., placing the "best" row groups first
24+
//! so TopK's dynamic filter threshold tightens quickly.
25+
26+
use crate::access_plan::PreparedAccessPlan;
27+
use arrow::datatypes::Schema;
28+
use datafusion_common::Result;
29+
use datafusion_physical_expr_common::sort_expr::LexOrdering;
30+
use parquet::file::metadata::ParquetMetaData;
31+
use std::fmt::Debug;
32+
33+
/// Optimizes the row group access order for a prepared access plan.
34+
///
35+
/// Implementations can reorder, reverse, or otherwise transform the
36+
/// row group read order to improve scan performance. The optimizer
37+
/// is applied once per file, after all pruning passes are complete.
38+
///
39+
/// # Examples
40+
///
41+
/// - [`ReverseRowGroups`]: simple O(n) reversal for DESC on ASC-sorted data
42+
/// - [`ReorderByStatistics`]: sort row groups by min/max statistics
43+
/// so TopK queries find optimal values first
44+
pub(crate) trait AccessPlanOptimizer: Send + Sync + Debug {
45+
/// Transform the prepared access plan.
46+
///
47+
/// Implementations should return the plan unchanged if they cannot
48+
/// apply their optimization (e.g., missing statistics).
49+
fn optimize(
50+
&self,
51+
plan: PreparedAccessPlan,
52+
file_metadata: &ParquetMetaData,
53+
arrow_schema: &Schema,
54+
) -> Result<PreparedAccessPlan>;
55+
}
56+
57+
/// Reverse the row group order — simple O(n) reversal.
58+
///
59+
/// Used as a fallback when the sort column has no statistics available.
60+
/// For ASC-sorted files with a DESC query, reversing row groups places
61+
/// the highest-value row groups first.
62+
#[derive(Debug)]
63+
pub(crate) struct ReverseRowGroups;
64+
65+
impl AccessPlanOptimizer for ReverseRowGroups {
66+
fn optimize(
67+
&self,
68+
plan: PreparedAccessPlan,
69+
file_metadata: &ParquetMetaData,
70+
_arrow_schema: &Schema,
71+
) -> Result<PreparedAccessPlan> {
72+
plan.reverse(file_metadata)
73+
}
74+
}
75+
76+
/// Reorder row groups by min/max statistics of the sort column.
77+
///
78+
/// Row groups are always sorted by min values in ASC order. Direction
79+
/// (DESC) is handled separately by [`ReverseRowGroups`] applied after.
80+
///
81+
/// This is more effective than [`ReverseRowGroups`] alone when row groups
82+
/// are out of order (e.g., append-heavy workloads), because it uses
83+
/// actual statistics rather than assuming the original order is sorted.
84+
///
85+
/// Gracefully falls back to the original order when statistics are
86+
/// unavailable, the sort expression is not a simple column, etc.
87+
#[derive(Debug)]
88+
pub(crate) struct ReorderByStatistics {
89+
sort_order: LexOrdering,
90+
}
91+
92+
impl ReorderByStatistics {
93+
pub(crate) fn new(sort_order: LexOrdering) -> Self {
94+
Self { sort_order }
95+
}
96+
}
97+
98+
impl AccessPlanOptimizer for ReorderByStatistics {
99+
fn optimize(
100+
&self,
101+
plan: PreparedAccessPlan,
102+
file_metadata: &ParquetMetaData,
103+
arrow_schema: &Schema,
104+
) -> Result<PreparedAccessPlan> {
105+
plan.reorder_by_statistics(&self.sort_order, file_metadata, arrow_schema)
106+
}
107+
}

datafusion/datasource-parquet/src/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]
2626

2727
pub mod access_plan;
28+
mod access_plan_optimizer;
2829
pub mod file_format;
2930
pub mod metadata;
3031
mod metrics;

0 commit comments

Comments
 (0)