Skip to content

Commit 062c147

Browse files
Fix for all TPCH queries
1 parent 955bef7 commit 062c147

10 files changed

Lines changed: 147 additions & 57 deletions

File tree

datafusion-federation/src/analyzer/mod.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProvi
55
use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder};
66
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
77
use datafusion::optimizer::push_down_filter::PushDownFilter;
8-
use datafusion::optimizer::{Optimizer, OptimizerContext};
8+
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
99
use datafusion::{
1010
common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
1111
config::ConfigOptions,
@@ -19,10 +19,6 @@ use scan_result::ScanResult;
1919
use std::collections::HashMap;
2020
use std::sync::Arc;
2121

22-
pub use optimize_projections::OptimizeProjections;
23-
24-
mod optimize_projections;
25-
2622
/// An analyzer rule to identifying sub-plans to federate
2723
///
2824
/// The analyzer logic walks over the plan, look for the largest subtrees that only have
@@ -65,11 +61,7 @@ impl AnalyzerRule for FederationAnalyzerRule {
6561
impl Default for FederationAnalyzerRule {
6662
fn default() -> Self {
6763
Self {
68-
optimizer: Optimizer::with_rules(vec![
69-
Arc::new(EliminateNestedUnion::new()),
70-
Arc::new(PushDownFilter::new()),
71-
Arc::new(OptimizeProjections::new()),
72-
]),
64+
optimizer: Optimizer::with_rules(Self::default_optimizer_rules()),
7365
}
7466
}
7567
}
@@ -79,6 +71,13 @@ impl FederationAnalyzerRule {
7971
Self::default()
8072
}
8173

74+
pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
75+
vec![
76+
Arc::new(EliminateNestedUnion::new()),
77+
Arc::new(PushDownFilter::new()),
78+
]
79+
}
80+
8281
/// Override the default optimizer with custom rules
8382
pub fn with_optimizer(mut self, optimizer: Optimizer) -> Self {
8483
self.optimizer = optimizer;

datafusion-federation/src/sql/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod analyzer;
22
pub mod ast_analyzer;
33
mod executor;
4+
pub mod optimizer;
45
mod schema;
56
mod table;
67
mod table_reference;
@@ -17,7 +18,7 @@ use datafusion::{
1718
error::{DataFusionError, Result},
1819
execution::{context::SessionState, TaskContext},
1920
logical_expr::{Extension, LogicalPlan},
20-
optimizer::{Analyzer, AnalyzerRule},
21+
optimizer::{eliminate_nested_union::EliminateNestedUnion, Analyzer, AnalyzerRule, Optimizer},
2122
physical_expr::EquivalenceProperties,
2223
physical_plan::{
2324
execution_plan::{Boundedness, EmissionType},
@@ -26,16 +27,27 @@ use datafusion::{
2627
},
2728
sql::{sqlparser::ast::Statement, unparser::Unparser},
2829
};
30+
use optimizer::{OptimizeProjectionsFederation, PushDownFilterFederation};
2931

3032
pub use executor::{LogicalOptimizer, SQLExecutor, SQLExecutorRef};
3133
pub use schema::{MultiSchemaProvider, SQLSchemaProvider};
3234
pub use table::{RemoteTable, SQLTableSource};
3335
pub use table_reference::RemoteTableRef;
3436

3537
use crate::{
36-
get_table_source, schema_cast, FederatedPlanNode, FederationPlanner, FederationProvider,
38+
get_table_source, schema_cast, FederatedPlanNode, FederationAnalyzerRule, FederationPlanner,
39+
FederationProvider,
3740
};
3841

42+
/// Returns a federation analyzer rule that is optimized for SQL federation.
43+
pub fn federation_analyzer_rule() -> FederationAnalyzerRule {
44+
FederationAnalyzerRule::new().with_optimizer(Optimizer::with_rules(vec![
45+
Arc::new(EliminateNestedUnion::new()),
46+
Arc::new(PushDownFilterFederation::new()),
47+
Arc::new(OptimizeProjectionsFederation::new()),
48+
]))
49+
}
50+
3951
// SQLFederationProvider provides federation to SQL DMBSs.
4052
#[derive(Debug)]
4153
pub struct SQLFederationProvider {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
//! A collection of optimizer rules that benefit SQL federation.
2+
3+
mod push_down_filter;
4+
pub use push_down_filter::PushDownFilterFederation;
5+
6+
mod optimize_projections;
7+
pub use optimize_projections::OptimizeProjectionsFederation;

datafusion-federation/src/analyzer/optimize_projections/mod.rs renamed to datafusion-federation/src/sql/optimizer/optimize_projections/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,16 @@ type Result<T, E = DataFusionError> = result::Result<T, E>;
4545
mod required_indices;
4646

4747
#[derive(Default, Debug)]
48-
pub struct OptimizeProjections {}
48+
pub struct OptimizeProjectionsFederation {}
4949

50-
impl OptimizeProjections {
50+
impl OptimizeProjectionsFederation {
5151
#[must_use]
5252
pub fn new() -> Self {
5353
Self {}
5454
}
5555
}
5656

57-
impl OptimizerRule for OptimizeProjections {
57+
impl OptimizerRule for OptimizeProjectionsFederation {
5858
fn name(&self) -> &str {
5959
"federation_optimize_projections"
6060
}
@@ -947,7 +947,11 @@ mod tests {
947947
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
948948

949949
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
950-
assert_optimized_plan_eq(Arc::new(super::OptimizeProjections::new()), plan, expected)
950+
assert_optimized_plan_eq(
951+
Arc::new(super::OptimizeProjectionsFederation::new()),
952+
plan,
953+
expected,
954+
)
951955
}
952956

953957
pub fn assert_optimized_plan_eq(
@@ -967,7 +971,8 @@ mod tests {
967971
}
968972

969973
fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
970-
let optimizer = Optimizer::with_rules(vec![Arc::new(super::OptimizeProjections::new())]);
974+
let optimizer =
975+
Optimizer::with_rules(vec![Arc::new(super::OptimizeProjectionsFederation::new())]);
971976
let optimized_plan = optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
972977
Ok(optimized_plan)
973978
}

datafusion-federation/src/analyzer/optimize_projections/required_indices.rs renamed to datafusion-federation/src/sql/optimizer/optimize_projections/required_indices.rs

File renamed without changes.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use datafusion::{
2+
common::{error::Result, tree_node::Transformed},
3+
logical_expr::LogicalPlan,
4+
optimizer::{push_down_filter::PushDownFilter, ApplyOrder, OptimizerConfig, OptimizerRule},
5+
};
6+
7+
/// A wrapper around DataFusion's [`PushDownFilter`](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_filter.rs) rule.
8+
///
9+
/// This wrapper prevents running the rule in a way that would break the SQL unparser, i.e. pushing a filter beyond a SubqueryAlias.
10+
#[derive(Default, Debug)]
11+
pub struct PushDownFilterFederation {
12+
inner: PushDownFilter,
13+
}
14+
15+
impl PushDownFilterFederation {
16+
#[must_use]
17+
pub fn new() -> Self {
18+
Self::default()
19+
}
20+
}
21+
22+
impl OptimizerRule for PushDownFilterFederation {
23+
fn name(&self) -> &str {
24+
"federation_sql_push_down_filter"
25+
}
26+
27+
fn apply_order(&self) -> Option<ApplyOrder> {
28+
self.inner.apply_order()
29+
}
30+
31+
fn supports_rewrite(&self) -> bool {
32+
self.inner.supports_rewrite()
33+
}
34+
35+
fn rewrite(
36+
&self,
37+
plan: LogicalPlan,
38+
config: &dyn OptimizerConfig,
39+
) -> Result<Transformed<LogicalPlan>> {
40+
if !should_run_rule_for_node(&plan) {
41+
return Ok(Transformed::no(plan));
42+
}
43+
44+
self.inner.rewrite(plan, config)
45+
}
46+
}
47+
48+
fn should_run_rule_for_node(node: &LogicalPlan) -> bool {
49+
if let LogicalPlan::Filter(x) = node {
50+
// Applying the `push_down_filter_rule` to certain nodes like `SubqueryAlias`, `Aggregate`, and `CrossJoin`
51+
// can cause issues during unparsing, thus the optimization is only applied to nodes that are currently supported.
52+
matches!(
53+
x.input.as_ref(),
54+
LogicalPlan::Join(_)
55+
| LogicalPlan::TableScan(_)
56+
| LogicalPlan::Projection(_)
57+
| LogicalPlan::Filter(_)
58+
| LogicalPlan::Distinct(_)
59+
| LogicalPlan::Sort(_)
60+
)
61+
} else {
62+
true
63+
}
64+
}

0 commit comments

Comments
 (0)