diff --git a/Cargo.toml b/Cargo.toml index 73dc3fb..30ece0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,10 +11,10 @@ readme = "README.md" repository = "https://github.com/datafusion-contrib/datafusion-federation" [workspace.dependencies] -arrow-json = "57.2" +arrow-json = "58.0" async-stream = "0.3" async-trait = "0.1" -datafusion = "52.0" +datafusion = "53" datafusion-federation = { path = "./datafusion-federation", version = "0.5.1" } futures = "0.3" tokio = { version = "1.41", features = ["full"] } diff --git a/datafusion-federation/src/schema_cast/mod.rs b/datafusion-federation/src/schema_cast/mod.rs index 526886c..c39f324 100644 --- a/datafusion-federation/src/schema_cast/mod.rs +++ b/datafusion-federation/src/schema_cast/mod.rs @@ -27,7 +27,7 @@ mod struct_cast; pub struct SchemaCastScanExec { input: Arc, schema: SchemaRef, - properties: PlanProperties, + properties: Arc, metrics_set: ExecutionPlanMetricsSet, } @@ -36,12 +36,12 @@ impl SchemaCastScanExec { let eq_properties = input.equivalence_properties().clone(); let emission_type = input.pipeline_behavior(); let boundedness = input.boundedness(); - let properties = PlanProperties::new( + let properties = Arc::new(PlanProperties::new( eq_properties, input.output_partitioning().clone(), emission_type, boundedness, - ); + )); Self { input, schema, @@ -66,7 +66,7 @@ impl ExecutionPlan for SchemaCastScanExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion-federation/src/sql/analyzer.rs b/datafusion-federation/src/sql/analyzer.rs index 625af38..29a39c5 100644 --- a/datafusion-federation/src/sql/analyzer.rs +++ b/datafusion-federation/src/sql/analyzer.rs @@ -5,7 +5,7 @@ use datafusion::{ logical_expr::{ expr::{ AggregateFunction, AggregateFunctionParams, Alias, Exists, InList, InSubquery, - PlannedReplaceSelectItem, ScalarFunction, Sort, Unnest, WildcardOptions, + PlannedReplaceSelectItem, ScalarFunction, SetComparison, Sort, Unnest, WildcardOptions, WindowFunction, WindowFunctionParams, }, Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Limit, LogicalPlan, Subquery, @@ -563,6 +563,27 @@ fn rewrite_table_scans_in_expr( Ok(Expr::Unnest(Unnest::new(expr))) } Expr::ScalarVariable(_, _) | Expr::Literal(_, _) | Expr::Placeholder(_) => Ok(expr), + Expr::SetComparison(sc) => { + let expr = rewrite_table_scans_in_expr(*sc.expr, known_rewrites)?; + let subquery_plan = rewrite_table_scans(&sc.subquery.subquery, known_rewrites)?; + let outer_ref_columns = sc + .subquery + .outer_ref_columns + .into_iter() + .map(|e| rewrite_table_scans_in_expr(e, known_rewrites)) + .collect::>>()?; + let subquery = Subquery { + subquery: Arc::new(subquery_plan), + outer_ref_columns, + spans: Spans::new(), + }; + Ok(Expr::SetComparison(SetComparison::new( + Box::new(expr), + subquery, + sc.op, + sc.quantifier, + ))) + } } } diff --git a/datafusion-federation/src/sql/ast_analyzer.rs b/datafusion-federation/src/sql/ast_analyzer.rs index 5dee009..2fc3a49 100644 --- a/datafusion-federation/src/sql/ast_analyzer.rs +++ b/datafusion-federation/src/sql/ast_analyzer.rs @@ -97,6 +97,7 @@ impl VisitorMut for TableArgReplace { *args = Some(arg.clone()); if alias.is_none() { *alias = Some(TableAlias { + explicit: true, name: Ident::new(table.table()), columns: vec![], }) diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index 6e6814f..d5198c7 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -164,7 +164,7 @@ impl FederationPlanner for SQLFederationPlanner { pub struct VirtualExecutionPlan { plan: LogicalPlan, executor: Arc, - props: PlanProperties, + props: Arc, statistics: Statistics, filters: Vec>, } @@ -172,12 +172,12 @@ pub struct VirtualExecutionPlan { impl VirtualExecutionPlan { pub fn new(plan: LogicalPlan, executor: Arc, statistics: Statistics) -> Self { let schema: Schema = plan.schema().as_arrow().clone(); - let props = PlanProperties::new( + let props = Arc::new(PlanProperties::new( EquivalenceProperties::new(Arc::new(schema)), Partitioning::UnknownPartitioning(1), EmissionType::Incremental, Boundedness::Bounded, - ); + )); Self { plan, executor, @@ -401,7 +401,7 @@ impl ExecutionPlan for VirtualExecutionPlan { .execute(&self.final_sql()?, self.schema(), &self.filters) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.props }