Skip to content

Commit e7f982d

Browse files
Downgrade to DF 45
1 parent 749a4f5 commit e7f982d

3 files changed

Lines changed: 47 additions & 68 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ repository = "https://github.com/datafusion-contrib/datafusion-federation"
1414
arrow-json = "54"
1515
async-stream = "0.3.5"
1616
async-trait = "0.1.83"
17-
datafusion = "46.0.1"
17+
datafusion = "45"
1818
datafusion-federation = { path = "./datafusion-federation", version = "0.3.7" }
1919
futures = "0.3.31"
2020
tokio = { version = "1.41", features = ["full"] }

datafusion-federation/src/sql/rewrite/ast.rs

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,13 @@ fn rewrite_multi_part_table_reference_in_expr(
431431
ast::Expr::Value(..) => {}
432432
ast::Expr::IntroducedString { .. } => {}
433433
ast::Expr::TypedString { .. } => {}
434+
ast::Expr::MapAccess { column, keys } => {
435+
rewrite_multi_part_table_reference_in_expr(&mut *column, known_rewrites);
436+
437+
for key in keys {
438+
rewrite_multi_part_table_reference_in_expr(&mut key.key, known_rewrites);
439+
}
440+
}
434441
ast::Expr::Exists { subquery, .. } => {
435442
rewrite_multi_part_table_reference_in_query(&mut *subquery, known_rewrites);
436443
}
@@ -473,6 +480,29 @@ fn rewrite_multi_part_table_reference_in_expr(
473480
rewrite_multi_part_table_reference_in_expr(&mut entry.value, known_rewrites);
474481
}
475482
}
483+
ast::Expr::Subscript { expr, subscript } => {
484+
rewrite_multi_part_table_reference_in_expr(&mut *expr, known_rewrites);
485+
match &mut **subscript {
486+
ast::Subscript::Index { index } => {
487+
rewrite_multi_part_table_reference_in_expr(index, known_rewrites);
488+
}
489+
ast::Subscript::Slice {
490+
lower_bound,
491+
upper_bound,
492+
stride,
493+
} => {
494+
if let Some(lower_bound) = lower_bound {
495+
rewrite_multi_part_table_reference_in_expr(lower_bound, known_rewrites);
496+
}
497+
if let Some(upper_bound) = upper_bound {
498+
rewrite_multi_part_table_reference_in_expr(upper_bound, known_rewrites);
499+
}
500+
if let Some(stride) = stride {
501+
rewrite_multi_part_table_reference_in_expr(stride, known_rewrites);
502+
}
503+
}
504+
}
505+
}
476506
ast::Expr::Array(array) => {
477507
for expr in array.elem.iter_mut() {
478508
rewrite_multi_part_table_reference_in_expr(expr, known_rewrites);
@@ -498,45 +528,6 @@ fn rewrite_multi_part_table_reference_in_expr(
498528
ast::Expr::Method(method) => {
499529
rewrite_multi_part_table_reference_in_expr(&mut method.expr, known_rewrites);
500530
}
501-
ast::Expr::CompoundFieldAccess { root, access_chain } => {
502-
rewrite_multi_part_table_reference_in_expr(root, known_rewrites);
503-
for access in access_chain {
504-
match access {
505-
ast::AccessExpr::Dot(expr) => {
506-
rewrite_multi_part_table_reference_in_expr(expr, known_rewrites);
507-
}
508-
ast::AccessExpr::Subscript(subscript) => match subscript {
509-
ast::Subscript::Slice {
510-
lower_bound,
511-
upper_bound,
512-
stride,
513-
} => {
514-
if let Some(lower_bound) = lower_bound {
515-
rewrite_multi_part_table_reference_in_expr(
516-
lower_bound,
517-
known_rewrites,
518-
);
519-
}
520-
if let Some(upper_bound) = upper_bound {
521-
rewrite_multi_part_table_reference_in_expr(
522-
upper_bound,
523-
known_rewrites,
524-
);
525-
}
526-
if let Some(stride) = stride {
527-
rewrite_multi_part_table_reference_in_expr(stride, known_rewrites);
528-
}
529-
}
530-
ast::Subscript::Index { index } => {
531-
rewrite_multi_part_table_reference_in_expr(index, known_rewrites);
532-
}
533-
},
534-
}
535-
}
536-
}
537-
ast::Expr::IsNormalized { expr, .. } => {
538-
rewrite_multi_part_table_reference_in_expr(expr, known_rewrites);
539-
}
540531
}
541532
}
542533

datafusion-federation/src/sql/rewrite/plan.rs

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use datafusion::{
77
logical_expr::{
88
self,
99
expr::{
10-
AggregateFunction, AggregateFunctionParams, Alias, Exists, InList, InSubquery,
11-
ScalarFunction, Sort, Unnest, WindowFunction, WindowFunctionParams,
10+
AggregateFunction, Alias, Exists, InList, InSubquery, ScalarFunction, Sort, Unnest,
11+
WindowFunction,
1212
},
1313
Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Limit, LogicalPlan,
1414
LogicalPlanBuilder, Projection, Subquery, TryCast,
@@ -105,27 +105,27 @@ fn collect_known_rewrites_from_expr(
105105
Ok(())
106106
}
107107
Expr::AggregateFunction(af) => {
108-
for arg in af.params.args {
108+
for arg in af.args {
109109
collect_known_rewrites_from_expr(arg, known_rewrites)?;
110110
}
111-
if let Some(filter) = af.params.filter {
111+
if let Some(filter) = af.filter {
112112
collect_known_rewrites_from_expr(*filter, known_rewrites)?;
113113
}
114-
if let Some(order_by) = af.params.order_by {
114+
if let Some(order_by) = af.order_by {
115115
for sort in order_by {
116116
collect_known_rewrites_from_expr(sort.expr, known_rewrites)?;
117117
}
118118
}
119119
Ok(())
120120
}
121121
Expr::WindowFunction(wf) => {
122-
for arg in wf.params.args {
122+
for arg in wf.args {
123123
collect_known_rewrites_from_expr(arg, known_rewrites)?;
124124
}
125-
for expr in wf.params.partition_by {
125+
for expr in wf.partition_by {
126126
collect_known_rewrites_from_expr(expr, known_rewrites)?;
127127
}
128-
for sort in wf.params.order_by {
128+
for sort in wf.order_by {
129129
collect_known_rewrites_from_expr(sort.expr, known_rewrites)?;
130130
}
131131
Ok(())
@@ -939,7 +939,6 @@ fn rewrite_table_scans_in_expr(
939939
}
940940
Expr::AggregateFunction(af) => {
941941
let args = af
942-
.params
943942
.args
944943
.into_iter()
945944
.map(|e| {
@@ -952,7 +951,6 @@ fn rewrite_table_scans_in_expr(
952951
})
953952
.collect::<Result<Vec<Expr>>>()?;
954953
let filter = af
955-
.params
956954
.filter
957955
.map(|e| {
958956
rewrite_table_scans_in_expr(
@@ -965,7 +963,6 @@ fn rewrite_table_scans_in_expr(
965963
.transpose()?
966964
.map(Box::new);
967965
let order_by = af
968-
.params
969966
.order_by
970967
.map(|e| {
971968
e.into_iter()
@@ -981,21 +978,17 @@ fn rewrite_table_scans_in_expr(
981978
.collect::<Result<Vec<Sort>>>()
982979
})
983980
.transpose()?;
984-
let params = AggregateFunctionParams {
981+
Ok(Expr::AggregateFunction(AggregateFunction {
982+
func: af.func,
985983
args,
986-
distinct: af.params.distinct,
984+
distinct: af.distinct,
987985
filter,
988986
order_by,
989-
null_treatment: af.params.null_treatment,
990-
};
991-
Ok(Expr::AggregateFunction(AggregateFunction {
992-
func: af.func,
993-
params,
987+
null_treatment: af.null_treatment,
994988
}))
995989
}
996990
Expr::WindowFunction(wf) => {
997991
let args = wf
998-
.params
999992
.args
1000993
.into_iter()
1001994
.map(|e| {
@@ -1008,7 +1001,6 @@ fn rewrite_table_scans_in_expr(
10081001
})
10091002
.collect::<Result<Vec<Expr>>>()?;
10101003
let partition_by = wf
1011-
.params
10121004
.partition_by
10131005
.into_iter()
10141006
.map(|e| {
@@ -1021,7 +1013,6 @@ fn rewrite_table_scans_in_expr(
10211013
})
10221014
.collect::<Result<Vec<Expr>>>()?;
10231015
let order_by = wf
1024-
.params
10251016
.order_by
10261017
.into_iter()
10271018
.map(|s| {
@@ -1034,16 +1025,13 @@ fn rewrite_table_scans_in_expr(
10341025
.map(|e| Sort::new(e, s.asc, s.nulls_first))
10351026
})
10361027
.collect::<Result<Vec<Sort>>>()?;
1037-
let params = WindowFunctionParams {
1028+
Ok(Expr::WindowFunction(WindowFunction {
1029+
fun: wf.fun,
10381030
args,
10391031
partition_by,
10401032
order_by,
1041-
window_frame: wf.params.window_frame,
1042-
null_treatment: wf.params.null_treatment,
1043-
};
1044-
Ok(Expr::WindowFunction(WindowFunction {
1045-
fun: wf.fun,
1046-
params,
1033+
window_frame: wf.window_frame,
1034+
null_treatment: wf.null_treatment,
10471035
}))
10481036
}
10491037
Expr::InList(il) => {

0 commit comments

Comments
 (0)