Skip to content

Commit 4b06546

Browse files
author
wiedld
committed
feat: example lexical range handling
1 parent 507f6b6 commit 4b06546

8 files changed

Lines changed: 2252 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/physical-optimizer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ recursive_protection = ["dep:recursive"]
4040
[dependencies]
4141
arrow = { workspace = true }
4242
datafusion-common = { workspace = true, default-features = true }
43+
datafusion-datasource = { workspace = true }
4344
datafusion-execution = { workspace = true }
4445
datafusion-expr = { workspace = true }
4546
datafusion-expr-common = { workspace = true, default-features = true }
@@ -52,5 +53,6 @@ recursive = { workspace = true, optional = true }
5253

5354
[dev-dependencies]
5455
datafusion-expr = { workspace = true }
56+
datafusion-datasource-parquet = { workspace = true }
5557
datafusion-functions-nested = { workspace = true }
5658
insta = { workspace = true }

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub mod limit_pushdown;
3434
pub mod limited_distinct_aggregation;
3535
pub mod optimizer;
3636
pub mod output_requirements;
37+
pub mod progressive_evaluation;
3738
pub mod projection_pushdown;
3839
pub mod pruning;
3940
pub mod sanity_checker;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! TODO: physical optimizer run to conditionally swap the SPM with the ProgressiveEvalExec
19+
20+
mod extract_ranges;
21+
mod lexical_ranges;
22+
mod statistics;
23+
mod util;
24+
25+
use itertools::Itertools;
26+
use std::sync::Arc;
27+
28+
use datafusion_common::{
29+
tree_node::{Transformed, TreeNode},
30+
Result,
31+
};
32+
use datafusion_physical_plan::{
33+
sorts::sort_preserving_merge::SortPreservingMergeExec, union::UnionExec,
34+
ExecutionPlan,
35+
};
36+
use extract_ranges::extract_disjoint_ranges_from_plan;
37+
use util::split_parquet_files;
38+
39+
use crate::PhysicalOptimizerRule;
40+
41+
#[allow(dead_code)]
42+
#[derive(Debug)]
43+
struct InsertProgressiveEval;
44+
45+
impl PhysicalOptimizerRule for InsertProgressiveEval {
46+
fn name(&self) -> &str {
47+
"TBD"
48+
}
49+
50+
fn schema_check(&self) -> bool {
51+
false
52+
}
53+
54+
fn optimize(
55+
&self,
56+
plan: Arc<dyn ExecutionPlan>,
57+
_config: &datafusion_common::config::ConfigOptions,
58+
) -> Result<Arc<dyn ExecutionPlan>> {
59+
plan.transform_up(|plan| {
60+
// Find SortPreservingMergeExec
61+
let Some(sort_preserving_merge_exec) =
62+
plan.as_any().downcast_ref::<SortPreservingMergeExec>()
63+
else {
64+
return Ok(Transformed::no(plan));
65+
};
66+
67+
// Split file groups to maximize potential disjoint ranges.
68+
let new_inputs: Vec<Arc<dyn ExecutionPlan>> = sort_preserving_merge_exec
69+
.children()
70+
.into_iter()
71+
.map(|spm_child| {
72+
Arc::clone(spm_child)
73+
.transform_down(|plan| {
74+
split_parquet_files(plan, sort_preserving_merge_exec.expr())
75+
})
76+
.map(|t| t.data)
77+
})
78+
.try_collect()?;
79+
let transformed_input_plan = Arc::new(UnionExec::new(new_inputs)) as _;
80+
81+
// try to extract the lexical ranges for the input partitions
82+
let Ok(Some(_lexical_ranges)) = extract_disjoint_ranges_from_plan(
83+
sort_preserving_merge_exec.expr(),
84+
&transformed_input_plan,
85+
) else {
86+
return Ok(Transformed::no(plan));
87+
};
88+
89+
// confirm we still have the ordering needed for the SPM
90+
assert!(transformed_input_plan
91+
.properties()
92+
.equivalence_properties()
93+
.ordering_satisfy(sort_preserving_merge_exec.expr()));
94+
95+
// Replace SortPreservingMergeExec with ProgressiveEvalExec
96+
// TODO: have the ProgressiveEvalExec perform that partition mapping
97+
// let progresive_eval_exec = Arc::new(ProgressiveEvalExec::new(
98+
// transformed_input_plan,
99+
// lexical_ranges,
100+
// sort_preserving_merge_exec.fetch(),
101+
// ));
102+
// Ok(Transformed::yes(progresive_eval_exec))
103+
104+
Ok(Transformed::no(plan))
105+
})
106+
.map(|t| t.data)
107+
}
108+
}

0 commit comments

Comments
 (0)