Skip to content

Commit 007c82d

Browse files
authored
Merge pull request #61 from spiceai/jeadie/25-11-12/avoid-filter-federate
Avoid 'federation_sql_push_down_filter' correctly
2 parents 1bffb14 + 0ae4021 commit 007c82d

3 files changed

Lines changed: 33 additions & 24 deletions

File tree

datafusion-federation/src/analyzer/mod.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod scan_result;
22

3-
use crate::FederationProvider;
43
use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef};
4+
use crate::{FederationAnalyzerForLogicalPlan, FederationProvider};
55
use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder};
66
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
77
use datafusion::optimizer::push_down_filter::PushDownFilter;
@@ -243,12 +243,12 @@ impl FederationAnalyzerRule {
243243
// The largest sub-plan is higher up.
244244
return Ok((None, ScanResult::Distinct(provider)));
245245
}
246-
(true, Some(analyzer)) => {
246+
(true, Some(FederationAnalyzerForLogicalPlan::With(analyzer))) => {
247247
// If this is the root plan node; federate the entire plan
248248
let optimized = analyzer.execute_and_check(plan.clone(), config, |_, _| {})?;
249249
return Ok((Some(optimized), ScanResult::None));
250250
}
251-
(_, None) => {
251+
(_, None | Some(FederationAnalyzerForLogicalPlan::Unable)) => {
252252
// Provider has no analyzer at all, or CAN'T federate this specific plan shape
253253
// Fall through to try federating children instead
254254
}
@@ -282,7 +282,9 @@ impl FederationAnalyzerRule {
282282
return Ok(original_input);
283283
};
284284

285-
let Some(analyzer) = provider.analyzer(&original_input) else {
285+
let Some(FederationAnalyzerForLogicalPlan::With(analyzer)) =
286+
provider.analyzer(&original_input)
287+
else {
286288
// Either the provider has no analyzer, or it cannot federate this [`LogicalPlan`].
287289
return Ok(original_input);
288290
};
@@ -413,26 +415,19 @@ impl FederationAnalyzerRule {
413415
#[derive(Debug)]
414416
pub(crate) struct NopFederationProvider {}
415417

416-
pub static NOP_NAME: &str = "nop";
417-
418418
impl FederationProvider for NopFederationProvider {
419419
fn name(&self) -> &str {
420-
NOP_NAME
420+
"nop"
421421
}
422422

423423
fn compute_context(&self) -> Option<String> {
424424
None
425425
}
426426

427-
fn analyzer(&self, _plan: &LogicalPlan) -> Option<Arc<datafusion::optimizer::Analyzer>> {
427+
fn analyzer(&self, _plan: &LogicalPlan) -> Option<FederationAnalyzerForLogicalPlan> {
428428
None
429429
}
430430
}
431-
impl NopFederationProvider {
432-
pub fn is_nop(provider: Arc<dyn FederationProvider>) -> bool {
433-
provider.name() == NOP_NAME
434-
}
435-
}
436431

437432
/// Recursively find the [`FederationProvider`] for all [`TableReference`] instances in the plan.
438433
/// This is used to resolve the federation providers for [`Expr::OuterReferenceColumn`].
@@ -474,7 +469,7 @@ fn wrap_projection(plan: LogicalPlan) -> Result<LogicalPlan> {
474469
fn contains_federated_table(plan: &LogicalPlan) -> Result<bool> {
475470
let federated_table_exists = plan.exists(|x| {
476471
if let (Some(provider), _) = get_leaf_provider(x)? {
477-
return Ok(!NopFederationProvider::is_nop(provider));
472+
return Ok(provider.analyzer(plan).is_some());
478473
}
479474
Ok(false)
480475
})?;

datafusion-federation/src/lib.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ pub fn default_analyzer_rules() -> Vec<Arc<dyn AnalyzerRule + Send + Sync>> {
5252

5353
pub type FederationProviderRef = Arc<dyn FederationProvider>;
5454

55+
impl From<Arc<Analyzer>> for FederationAnalyzerForLogicalPlan {
56+
fn from(value: Arc<Analyzer>) -> Self {
57+
Self::With(value)
58+
}
59+
}
60+
5561
pub trait FederationProvider: Send + Sync + std::fmt::Debug {
5662
// Returns the name of the provider, used for comparison.
5763
fn name(&self) -> &str;
@@ -62,10 +68,18 @@ pub trait FederationProvider: Send + Sync + std::fmt::Debug {
6268

6369
// Returns an analyzer that can cut out and federate part of the [`LogicalPlan`].
6470
//
65-
// Returns [`None`] if either the provider cannot federate any plan
66-
// (e.g. [`crate::analyzer::NopFederationProvider`]), or cannot federate
67-
// this specific [`LogicalPlan`].
68-
fn analyzer(&self, plan: &LogicalPlan) -> Option<Arc<Analyzer>>;
71+
// Returns:
72+
// - [`None`] if the provider cannot federate any plan (e.g. [`crate::analyzer::NopFederationProvider`]).
73+
// - `Some(FederationAnalyzerForLogicalPlan::Unable)` if the provider cannot federate this specific [`LogicalPlan`].
74+
fn analyzer(&self, plan: &LogicalPlan) -> Option<FederationAnalyzerForLogicalPlan>;
75+
}
76+
77+
/// [`LogicalPlan`] specific federation [`Analyzer`] from a [`FederationProvider`].
78+
#[derive(Debug)]
79+
pub enum FederationAnalyzerForLogicalPlan {
80+
/// The [`FederationProvider`] cannot federate the [`LogicalPlan`].
81+
Unable,
82+
With(Arc<Analyzer>),
6983
}
7084

7185
impl fmt::Display for dyn FederationProvider {

datafusion-federation/src/sql/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ pub use table::{RemoteTable, SQLTable, SQLTableSource};
3535
pub use table_reference::{MultiPartTableReference, RemoteTableRef};
3636

3737
use crate::{
38-
get_table_source, schema_cast, FederatedPlanNode, FederationAnalyzerRule, FederationPlanner,
39-
FederationProvider,
38+
get_table_source, schema_cast, FederatedPlanNode, FederationAnalyzerForLogicalPlan,
39+
FederationAnalyzerRule, FederationPlanner, FederationProvider,
4040
};
4141

4242
/// Returns a federation analyzer rule that is optimized for SQL federation.
@@ -75,11 +75,11 @@ impl FederationProvider for SQLFederationProvider {
7575
self.executor.compute_context()
7676
}
7777

78-
fn analyzer(&self, plan: &LogicalPlan) -> Option<Arc<Analyzer>> {
78+
fn analyzer(&self, plan: &LogicalPlan) -> Option<FederationAnalyzerForLogicalPlan> {
7979
if self.executor.can_execute_plan(plan) {
80-
Some(Arc::clone(&self.analyzer))
80+
Some(Arc::clone(&self.analyzer).into())
8181
} else {
82-
None
82+
Some(FederationAnalyzerForLogicalPlan::Unable)
8383
}
8484
}
8585
}
@@ -595,7 +595,7 @@ mod tests {
595595
// pushed down since it will be optimised into `Filter: table_a1.a > Int64(0)`.
596596
insta::assert_snapshot!(ctx
597597
.sql(
598-
r#"SELECT non_federate, b, c FROM (SELECT a AS 'non_federate', b, c FROM table_a1) WHERE non_federate > 0"#,
598+
r#"SELECT a as non_federate, b, c FROM (SELECT a, b, c FROM table_a1) WHERE a > 0"#,
599599
)
600600
.await?
601601
.into_optimized_plan()?

0 commit comments

Comments
 (0)