Skip to content

Commit ee26f55

Browse files
committed
Upgrade to DataFusion 53
1 parent 375c446 commit ee26f55

5 files changed

Lines changed: 35 additions & 11 deletions

File tree

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ readme = "README.md"
1111
repository = "https://github.com/datafusion-contrib/datafusion-federation"
1212

1313
[workspace.dependencies]
14-
arrow-json = "57.2"
14+
arrow-json = "58.0"
1515
async-stream = "0.3"
1616
async-trait = "0.1"
17-
datafusion = "52.0"
17+
# TODO
18+
#datafusion = "52.0"
19+
datafusion = { git = "https://github.com/apache/datafusion", branch = "branch-53" }
1820
datafusion-federation = { path = "./datafusion-federation", version = "0.5.0" }
1921
futures = "0.3"
2022
tokio = { version = "1.41", features = ["full"] }

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ mod struct_cast;
2727
pub struct SchemaCastScanExec {
2828
input: Arc<dyn ExecutionPlan>,
2929
schema: SchemaRef,
30-
properties: PlanProperties,
30+
properties: Arc<PlanProperties>,
3131
metrics_set: ExecutionPlanMetricsSet,
3232
}
3333

@@ -36,12 +36,12 @@ impl SchemaCastScanExec {
3636
let eq_properties = input.equivalence_properties().clone();
3737
let emission_type = input.pipeline_behavior();
3838
let boundedness = input.boundedness();
39-
let properties = PlanProperties::new(
39+
let properties = Arc::new(PlanProperties::new(
4040
eq_properties,
4141
input.output_partitioning().clone(),
4242
emission_type,
4343
boundedness,
44-
);
44+
));
4545
Self {
4646
input,
4747
schema,
@@ -66,7 +66,7 @@ impl ExecutionPlan for SchemaCastScanExec {
6666
self
6767
}
6868

69-
fn properties(&self) -> &PlanProperties {
69+
fn properties(&self) -> &Arc<PlanProperties> {
7070
&self.properties
7171
}
7272

datafusion-federation/src/sql/analyzer.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::{
55
logical_expr::{
66
expr::{
77
AggregateFunction, AggregateFunctionParams, Alias, Exists, InList, InSubquery,
8-
PlannedReplaceSelectItem, ScalarFunction, Sort, Unnest, WildcardOptions,
8+
PlannedReplaceSelectItem, ScalarFunction, SetComparison, Sort, Unnest, WildcardOptions,
99
WindowFunction, WindowFunctionParams,
1010
},
1111
Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Limit, LogicalPlan, Subquery,
@@ -563,6 +563,27 @@ fn rewrite_table_scans_in_expr(
563563
Ok(Expr::Unnest(Unnest::new(expr)))
564564
}
565565
Expr::ScalarVariable(_, _) | Expr::Literal(_, _) | Expr::Placeholder(_) => Ok(expr),
566+
Expr::SetComparison(sc) => {
567+
let expr = rewrite_table_scans_in_expr(*sc.expr, known_rewrites)?;
568+
let subquery_plan = rewrite_table_scans(&sc.subquery.subquery, known_rewrites)?;
569+
let outer_ref_columns = sc
570+
.subquery
571+
.outer_ref_columns
572+
.into_iter()
573+
.map(|e| rewrite_table_scans_in_expr(e, known_rewrites))
574+
.collect::<Result<Vec<Expr>>>()?;
575+
let subquery = Subquery {
576+
subquery: Arc::new(subquery_plan),
577+
outer_ref_columns,
578+
spans: Spans::new(),
579+
};
580+
Ok(Expr::SetComparison(SetComparison::new(
581+
Box::new(expr),
582+
subquery,
583+
sc.op,
584+
sc.quantifier,
585+
)))
586+
}
566587
}
567588
}
568589

datafusion-federation/src/sql/ast_analyzer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ impl VisitorMut for TableArgReplace {
9797
*args = Some(arg.clone());
9898
if alias.is_none() {
9999
*alias = Some(TableAlias {
100+
explicit: true,
100101
name: Ident::new(table.table()),
101102
columns: vec![],
102103
})

datafusion-federation/src/sql/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,20 +164,20 @@ impl FederationPlanner for SQLFederationPlanner {
164164
pub struct VirtualExecutionPlan {
165165
plan: LogicalPlan,
166166
executor: Arc<dyn SQLExecutor>,
167-
props: PlanProperties,
167+
props: Arc<PlanProperties>,
168168
statistics: Statistics,
169169
filters: Vec<Arc<dyn PhysicalExpr>>,
170170
}
171171

172172
impl VirtualExecutionPlan {
173173
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>, statistics: Statistics) -> Self {
174174
let schema: Schema = plan.schema().as_arrow().clone();
175-
let props = PlanProperties::new(
175+
let props = Arc::new(PlanProperties::new(
176176
EquivalenceProperties::new(Arc::new(schema)),
177177
Partitioning::UnknownPartitioning(1),
178178
EmissionType::Incremental,
179179
Boundedness::Bounded,
180-
);
180+
));
181181
Self {
182182
plan,
183183
executor,
@@ -371,7 +371,7 @@ impl ExecutionPlan for VirtualExecutionPlan {
371371
.execute(&self.final_sql()?, self.schema(), &self.filters)
372372
}
373373

374-
fn properties(&self) -> &PlanProperties {
374+
fn properties(&self) -> &Arc<PlanProperties> {
375375
&self.props
376376
}
377377

0 commit comments

Comments
 (0)