Skip to content

Commit a0a9090

Browse files
committed
fix: realign explain changes to spiceai-52
1 parent a8b4c1d commit a0a9090

26 files changed

Lines changed: 4079 additions & 664 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[workspace]
22
resolver = "2"
33

4-
members = ["datafusion-federation"]
4+
members = ["datafusion-federation", "integration-test"]
55

66
[workspace.package]
77
version = "0.4.2"

datafusion-federation/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ async-trait.workspace = true
2626
datafusion.workspace = true
2727
async-stream.workspace = true
2828
arrow-json.workspace = true
29+
tracing = "0.1"
2930

3031
[dev-dependencies]
3132
tokio.workspace = true
3233
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
33-
tracing = "0.1.40"
34+
insta = { version = "1.42.0", features = ["filters"] }
3435

3536
[[example]]
3637
name = "df-csv"

datafusion-federation/examples/df-csv-advanced.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ use datafusion::{
66
execution::{
77
context::SessionContext, options::CsvReadOptions, session_state::SessionStateBuilder,
88
},
9-
optimizer::Optimizer,
9+
optimizer::Analyzer,
1010
};
1111

1212
use datafusion_federation::{
1313
sql::{MultiSchemaProvider, SQLFederationProvider, SQLSchemaProvider},
14-
FederatedQueryPlanner, FederationOptimizerRule,
14+
FederatedQueryPlanner, FederationAnalyzerRule,
1515
};
1616

1717
use shared::{overwrite_default_schema, MockPostgresExecutor, MockSqliteExecutor};
@@ -115,15 +115,15 @@ async fn main() {
115115
/////////////////////
116116
// Main(local) DB
117117
/////////////////////
118-
// Get the default optimizer rules
119-
let mut rules = Optimizer::new().rules;
118+
// Get the default analyzer rules
119+
let mut rules = Analyzer::new().rules;
120120

121-
// Create a new federation optimizer rule and add it to the default rules
122-
rules.push(Arc::new(FederationOptimizerRule::new()));
121+
// Create a new federation analyzer rule and add it to the default rules
122+
rules.push(Arc::new(FederationAnalyzerRule::new()));
123123

124-
// Create a new SessionState with the optimizer rule we created above
124+
// Create a new SessionState with the analyzer rule we created above
125125
let state = SessionStateBuilder::new()
126-
.with_optimizer_rules(rules)
126+
.with_analyzer_rules(rules)
127127
.with_query_planner(Arc::new(FederatedQueryPlanner::new()))
128128
.build();
129129

datafusion-federation/src/optimizer/scan_result.rs renamed to datafusion-federation/src/analyzer/scan_result.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,37 @@
1-
use datafusion::common::tree_node::TreeNodeRecursion;
2-
31
use crate::FederationProviderRef;
2+
use datafusion::common::tree_node::TreeNodeRecursion;
3+
use datafusion::error::{DataFusionError, Result};
44

55
/// Used to track if all sources, including tableScan, plan inputs and
66
/// expressions, represent an unambiguous, none or a sole [`crate::FederationProvider`].
77
pub enum ScanResult {
8+
/// [`LogicalPlan`] subtree has no [`FederationProvider`].
89
None,
10+
11+
/// There is a single [`FederationProvider`] in the [`LogicalPlan`] subtree.
912
Distinct(FederationProviderRef),
13+
14+
/// The [`LogicalPlan`] subtree has several different [`FederationProvider`]s.
1015
Ambiguous,
1116
}
1217

1318
impl ScanResult {
14-
pub fn merge(&mut self, other: Self) {
19+
pub fn merge(&mut self, other: impl Into<Self>) {
20+
let other = other.into();
1521
match (&self, &other) {
1622
(_, ScanResult::None) => {}
1723
(ScanResult::None, _) => *self = other,
1824
(ScanResult::Ambiguous, _) | (_, ScanResult::Ambiguous) => {
19-
*self = ScanResult::Ambiguous
25+
*self = ScanResult::Ambiguous;
2026
}
2127
(ScanResult::Distinct(provider), ScanResult::Distinct(other_provider)) => {
2228
if provider != other_provider {
23-
*self = ScanResult::Ambiguous
29+
*self = ScanResult::Ambiguous;
2430
}
2531
}
2632
}
2733
}
2834

29-
pub fn add(&mut self, provider: Option<FederationProviderRef>) {
30-
self.merge(ScanResult::from(provider))
31-
}
32-
3335
pub fn is_ambiguous(&self) -> bool {
3436
matches!(self, ScanResult::Ambiguous)
3537
}
@@ -41,11 +43,13 @@ impl ScanResult {
4143
!self.is_none()
4244
}
4345

44-
pub fn unwrap(self) -> Option<FederationProviderRef> {
46+
pub fn unwrap(self) -> Result<Option<FederationProviderRef>> {
4547
match self {
46-
ScanResult::None => None,
47-
ScanResult::Distinct(provider) => Some(provider),
48-
ScanResult::Ambiguous => panic!("called `ScanResult::unwrap()` on a `Ambiguous` value"),
48+
ScanResult::None => Ok(None),
49+
ScanResult::Distinct(provider) => Ok(Some(provider)),
50+
ScanResult::Ambiguous => Err(DataFusionError::External(
51+
"called `ScanResult::unwrap()` on a `Ambiguous` value".into(),
52+
)),
4953
}
5054
}
5155

@@ -77,6 +81,19 @@ impl PartialEq<Option<FederationProviderRef>> for ScanResult {
7781
}
7882
}
7983

84+
impl PartialEq for ScanResult {
85+
fn eq(&self, other: &Self) -> bool {
86+
match (self, other) {
87+
(ScanResult::None, ScanResult::None) => true,
88+
(ScanResult::Distinct(provider1), ScanResult::Distinct(provider2)) => {
89+
provider1 == provider2
90+
}
91+
(ScanResult::Ambiguous, ScanResult::Ambiguous) => true,
92+
_ => false,
93+
}
94+
}
95+
}
96+
8097
impl std::fmt::Debug for ScanResult {
8198
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
8299
match self {

datafusion-federation/src/lib.rs

Lines changed: 69 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
mod optimizer;
1+
mod analyzer;
22
mod plan_node;
33
pub mod schema_cast;
44
#[cfg(feature = "sql")]
@@ -13,58 +13,73 @@ use std::{
1313

1414
use datafusion::{
1515
execution::session_state::{SessionState, SessionStateBuilder},
16-
optimizer::{optimizer::Optimizer, OptimizerRule},
16+
logical_expr::LogicalPlan,
17+
optimizer::{
18+
analyzer::{
19+
resolve_grouping_function::ResolveGroupingFunction, type_coercion::TypeCoercion,
20+
},
21+
Analyzer, AnalyzerRule,
22+
},
1723
};
1824

19-
pub use optimizer::{get_table_source, FederationOptimizerRule};
25+
pub use analyzer::{get_table_source, FederationAnalyzerRule};
2026
pub use plan_node::{
2127
FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederationPlanner,
2228
};
2329
pub use table_provider::{FederatedTableProviderAdaptor, FederatedTableSource};
2430

2531
pub fn default_session_state() -> SessionState {
26-
let rules = default_optimizer_rules();
32+
let rules = default_analyzer_rules();
2733
SessionStateBuilder::new()
28-
.with_optimizer_rules(rules)
34+
.with_analyzer_rules(rules)
2935
.with_query_planner(Arc::new(FederatedQueryPlanner::new()))
3036
.with_default_features()
3137
.build()
3238
}
3339

34-
pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
35-
// Get the default optimizer
36-
let df_default = Optimizer::new();
37-
let mut default_rules = df_default.rules;
38-
39-
// Insert the FederationOptimizerRule after the ScalarSubqueryToJoin.
40-
// This ensures ScalarSubquery are replaced before we try to federate.
41-
let Some(pos) = default_rules
42-
.iter()
43-
.position(|x| x.name() == "scalar_subquery_to_join")
44-
else {
45-
panic!("Could not locate ScalarSubqueryToJoin");
46-
};
47-
48-
// TODO: check if we should allow other optimizers to run before the federation rule.
40+
/// datafusion-federation customizes the order of the analyzer rules, since some of them are only relevant when `DataFusion` is executing the query,
41+
/// as opposed to when underlying federated query engines will execute the query.
42+
///
43+
/// This list should be kept in sync with the default rules in `Analyzer::new()`, but with the federation analyzer rule added.
44+
pub fn default_analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
45+
vec![
46+
Arc::new(FederationAnalyzerRule::new()),
47+
// The rest of these rules are run after the federation analyzer since they only affect internal DataFusion execution.
48+
Arc::new(ResolveGroupingFunction::new()),
49+
Arc::new(TypeCoercion::new()),
50+
]
51+
}
4952

50-
let federation_rule = Arc::new(FederationOptimizerRule::new());
51-
default_rules.insert(pos + 1, federation_rule);
53+
pub type FederationProviderRef = Arc<dyn FederationProvider>;
5254

53-
default_rules
55+
impl From<Arc<Analyzer>> for FederationAnalyzerForLogicalPlan {
56+
fn from(value: Arc<Analyzer>) -> Self {
57+
Self::With(value)
58+
}
5459
}
5560

56-
pub type FederationProviderRef = Arc<dyn FederationProvider>;
57-
pub trait FederationProvider: Send + Sync {
61+
pub trait FederationProvider: Send + Sync + std::fmt::Debug {
5862
// Returns the name of the provider, used for comparison.
5963
fn name(&self) -> &str;
6064

6165
// Returns the compute context in which this federation provider
6266
// will execute a query. For example: database instance & catalog.
6367
fn compute_context(&self) -> Option<String>;
6468

65-
// Returns an optimizer that can cut out part of the plan
66-
// to federate it.
67-
fn optimizer(&self) -> Option<Arc<Optimizer>>;
69+
// Returns an analyzer that can cut out, and federate part of the [`LogicalPlan`].
70+
//
71+
// Returns:
72+
// - [`None`] if the provider cannot federate any plan (e.g. [`crate::analyzer::NopFederationProvider`]).
73+
// - `Some(FederationAnalyzerForLogicalPlan::Unable)` if the provider cannot federate this specific [`LogicalPlan`].
74+
fn analyzer(&self, plan: &LogicalPlan) -> Option<FederationAnalyzerForLogicalPlan>;
75+
}
76+
77+
/// [`LogicalPlan`] specific federation [`Analyzer`] from a [`FederationProvider`].
78+
#[derive(Debug)]
79+
pub enum FederationAnalyzerForLogicalPlan {
80+
/// The [`FederationProvider`] cannot federate the [`LogicalPlan`].
81+
Unable,
82+
With(Arc<Analyzer>),
6883
}
6984

7085
impl fmt::Display for dyn FederationProvider {
@@ -88,3 +103,29 @@ impl Hash for dyn FederationProvider {
88103
}
89104

90105
impl Eq for dyn FederationProvider {}
106+
107+
#[cfg(test)]
108+
mod tests {
109+
use datafusion::optimizer::Analyzer;
110+
111+
/// Verifies that the default analyzer rules are in the expected order.
112+
///
113+
/// If this test fails, `DataFusion` has modified the default analyzer rules and `default_analyzer_rules()` should be updated.
114+
#[test]
115+
fn test_verify_default_analyzer_rules() {
116+
let default_rules = Analyzer::new().rules;
117+
assert_eq!(
118+
default_rules.len(),
119+
2,
120+
"Default analyzer rules have changed"
121+
);
122+
let expected_rule_names = vec!["resolve_grouping_function", "type_coercion"];
123+
for (rule, expected_name) in default_rules.iter().zip(expected_rule_names.into_iter()) {
124+
assert_eq!(
125+
expected_name,
126+
rule.name(),
127+
"Default analyzer rule order has changed"
128+
);
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)