Skip to content

Commit 8193217

Browse files
committed
Address PR review comments and fix clippy/test issues
- Fix redundant clone in handle_child_pushdown_result - Fix missing cannot_federate field in TestExecutor instantiation - Fix SqlRewriteTable::table_reference return type to MultiPartTableReference - Prefix unused sql_query_rewriters with _ in DisplayAs - Propagate errors from physical_plan.apply in rewriter test - Update rewrite_table_scan.rs execute signature to include filters param - Remove unused std::clone::Clone import in schema_cast/mod.rs - Remove unused sqlparser::ast import in executor.rs - Replace deprecated EliminateNestedUnion with OptimizeUnions - Update test expectations for DataFusion v52.3.0 SQL unparser behavior
1 parent 4361384 commit 8193217

5 files changed

Lines changed: 19 additions & 14 deletions

File tree

datafusion-federation/src/analyzer/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ mod scan_result;
33
use crate::{FederatedTableProviderAdaptor, FederatedTableSource, FederationProviderRef};
44
use crate::{FederationAnalyzerForLogicalPlan, FederationProvider};
55
use datafusion::logical_expr::{col, expr::InSubquery, LogicalPlanBuilder};
6-
use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
6+
use datafusion::optimizer::optimize_unions::OptimizeUnions;
77
use datafusion::optimizer::push_down_filter::PushDownFilter;
88
use datafusion::optimizer::{Optimizer, OptimizerContext, OptimizerRule};
99
use datafusion::{
@@ -74,7 +74,7 @@ impl FederationAnalyzerRule {
7474

7575
pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
7676
vec![
77-
Arc::new(EliminateNestedUnion::new()),
77+
Arc::new(OptimizeUnions::new()),
7878
Arc::new(PushDownFilter::new()),
7979
]
8080
}

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use datafusion::physical_plan::{
1313
};
1414
use futures::StreamExt;
1515
use std::any::Any;
16-
use std::clone::Clone;
1716
use std::fmt;
1817
use std::sync::Arc;
1918

datafusion-federation/src/sql/analyzer/rewrite_table_scan.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ mod tests {
490490
use async_trait::async_trait;
491491
use datafusion::arrow::datatypes::{Schema, SchemaRef};
492492
use datafusion::execution::SendableRecordBatchStream;
493+
use datafusion::physical_plan::PhysicalExpr;
493494
use datafusion::sql::unparser::dialect::Dialect;
494495
use datafusion::sql::unparser::plan_to_sql;
495496
use datafusion::{
@@ -520,7 +521,12 @@ mod tests {
520521
unimplemented!()
521522
}
522523

523-
fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
524+
fn execute(
525+
&self,
526+
_query: &str,
527+
_schema: SchemaRef,
528+
_filters: &[Arc<dyn PhysicalExpr>],
529+
) -> Result<SendableRecordBatchStream> {
524530
unimplemented!()
525531
}
526532

@@ -876,7 +882,7 @@ mod tests {
876882
),
877883
(
878884
"SELECT foo.df_table.a FROM foo.df_table",
879-
r#"SELECT a FROM "default".remote_table"#,
885+
r#"SELECT remote_table.a FROM "default".remote_table"#,
880886
),
881887
(
882888
"SELECT MIN(a) FROM foo.df_table",

datafusion-federation/src/sql/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use datafusion::{
66
error::Result,
77
logical_expr::LogicalPlan,
88
physical_plan::{metrics::MetricsSet, PhysicalExpr, SendableRecordBatchStream},
9-
sql::{sqlparser::ast, unparser::dialect::Dialect},
9+
sql::unparser::dialect::Dialect,
1010
};
1111
use std::sync::Arc;
1212

datafusion-federation/src/sql/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use datafusion::{
2222
error::{DataFusionError, Result},
2323
execution::{context::SessionState, TaskContext},
2424
logical_expr::{Extension, LogicalPlan},
25-
optimizer::{eliminate_nested_union::EliminateNestedUnion, Analyzer, AnalyzerRule, Optimizer},
25+
optimizer::{optimize_unions::OptimizeUnions, Analyzer, AnalyzerRule, Optimizer},
2626
physical_expr::EquivalenceProperties,
2727
physical_plan::{
2828
execution_plan::{Boundedness, EmissionType},
@@ -51,7 +51,7 @@ use crate::{
5151
/// Returns a federation analyzer rule that is optimized for SQL federation.
5252
pub fn federation_analyzer_rule() -> FederationAnalyzerRule {
5353
FederationAnalyzerRule::new().with_optimizer(Optimizer::with_rules(vec![
54-
Arc::new(EliminateNestedUnion::new()),
54+
Arc::new(OptimizeUnions::new()),
5555
Arc::new(PushDownFilterFederation::new()),
5656
Arc::new(OptimizeProjectionsFederation::new()),
5757
]))
@@ -323,7 +323,7 @@ impl DisplayAs for VirtualExecutionPlan {
323323
write!(f, " base_sql={statement}")?;
324324
}
325325

326-
let (logical_optimizers, ast_analyzers, sql_query_rewriters) = match gather_analyzers(&plan)
326+
let (logical_optimizers, ast_analyzers, _sql_query_rewriters) = match gather_analyzers(&plan)
327327
{
328328
Ok(analyzers) => analyzers,
329329
Err(_) => return Ok(()),
@@ -427,7 +427,6 @@ impl ExecutionPlan for VirtualExecutionPlan {
427427
_config: &ConfigOptions,
428428
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
429429
let parent_filters: Vec<_> = child_pushdown_result
430-
.clone()
431430
.parent_filters
432431
.into_iter()
433432
.map(|f| f.filter)
@@ -588,7 +587,7 @@ mod tests {
588587
self
589588
}
590589

591-
fn table_reference(&self) -> TableReference {
590+
fn table_reference(&self) -> MultiPartTableReference {
592591
self.table.table_reference().clone()
593592
}
594593

@@ -914,7 +913,7 @@ mod tests {
914913
});
915914

916915
let expected = vec![
917-
r#"SELECT a, b, c FROM "default"."table" UNION ALL SELECT a, b, c FROM "default"."Table"(1) AS Table"#,
916+
r#"SELECT "table".a, "table".b, "table".c FROM "default"."table" UNION ALL SELECT "Table".a, "Table".b, "Table".c FROM "default"."Table"(1) AS Table"#,
918917
];
919918

920919
assert_eq!(
@@ -929,6 +928,7 @@ mod tests {
929928
async fn sql_query_rewriter_hook_invoked_and_rewrites_sql() -> Result<(), DataFusionError> {
930929
let executor = TestExecutor {
931930
compute_context: "rewrite".into(),
931+
cannot_federate: None,
932932
};
933933
let rewrite_calls = Arc::new(AtomicUsize::new(0));
934934
let table_ref = "table_with_rewriter".to_string();
@@ -955,7 +955,7 @@ mod tests {
955955
let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;
956956

957957
let mut final_queries = vec![];
958-
let _ = physical_plan.apply(|node| {
958+
physical_plan.apply(|node| {
959959
if node.name() == "sql_federation_exec" {
960960
let node = node
961961
.as_any()
@@ -964,7 +964,7 @@ mod tests {
964964
final_queries.push(node.final_sql()?);
965965
}
966966
Ok(TreeNodeRecursion::Continue)
967-
});
967+
})?;
968968

969969
let [final_query] = final_queries.as_slice() else {
970970
panic!("expected a single federated SQL query");

0 commit comments

Comments
 (0)