Skip to content

Commit dc11bd6

Browse files
committed
Upgrade to DataFusion 47 and Arrow 55
1 parent 924096a commit dc11bd6

5 files changed

Lines changed: 52 additions & 45 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ readme = "README.md"
1818
async-trait = "0.1.77"
1919
async-stream = "0.3.5"
2020
futures = "0.3.30"
21-
datafusion = "46"
22-
datafusion-substrait = "46"
23-
arrow-json = "54"
21+
datafusion = "47"
22+
datafusion-substrait = "47"
23+
arrow-json = "55"
2424

2525
[patch.crates-io]
2626
datafusion-federation = { path = "./datafusion-federation" }

datafusion-federation/src/optimize.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,5 @@ fn optimize_plan_node(
119119
return Ok(Transformed::no(plan));
120120
}
121121

122-
if rule.supports_rewrite() {
123-
return rule.rewrite(plan, config);
124-
}
125-
126-
#[allow(deprecated)]
127-
rule.try_optimize(&plan, config).map(|maybe_plan| {
128-
match maybe_plan {
129-
Some(new_plan) => {
130-
// if the node was rewritten by the optimizer, replace the node
131-
Transformed::yes(new_plan)
132-
}
133-
None => Transformed::no(plan),
134-
}
135-
})
122+
rule.rewrite(plan, config)
136123
}

datafusion-federation/src/optimize/optimize_projections/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,8 +464,16 @@ fn merge_consecutive_projections(proj: Projection) -> Result<Transformed<Project
464464
expr,
465465
relation,
466466
name,
467+
metadata,
467468
}) => rewrite_expr(*expr, &prev_projection).map(|result| {
468-
result.update_data(|expr| Expr::Alias(Alias::new(expr, relation, name)))
469+
result.update_data(|expr| {
470+
Expr::Alias(Alias {
471+
expr: Box::new(expr),
472+
relation,
473+
name,
474+
metadata,
475+
})
476+
})
469477
}),
470478
e => rewrite_expr(e, &prev_projection),
471479
}

sources/sql/src/rewrite/ast.rs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use datafusion::{
44
common::HashMap,
55
sql::{
66
sqlparser::ast::{
7-
self, Ident, ObjectName, Query, SelectItem, SetExpr, TableFactor, TableWithJoins,
7+
self, Ident, ObjectName, ObjectNamePart, Query, SelectItem, SetExpr, TableFactor,
8+
TableWithJoins,
89
},
910
TableReference,
1011
},
@@ -48,6 +49,9 @@ fn rewrite_multi_part_table_with_joins(
4849
| ast::JoinOperator::RightOuter(join_constraint)
4950
| ast::JoinOperator::Inner(join_constraint)
5051
| ast::JoinOperator::LeftOuter(join_constraint)
52+
| ast::JoinOperator::Join(join_constraint)
53+
| ast::JoinOperator::Left(join_constraint)
54+
| ast::JoinOperator::Right(join_constraint)
5155
| ast::JoinOperator::Semi(join_constraint)
5256
| ast::JoinOperator::Anti(join_constraint) => {
5357
if let ast::JoinConstraint::On(expr) = join_constraint {
@@ -81,7 +85,7 @@ fn rewrite_object_name(
8185
rewrite
8286
.parts
8387
.iter()
84-
.map(|p| Ident::new(p.to_string()))
88+
.map(|p| ObjectNamePart::Identifier(Ident::new(p.to_string())))
8589
.collect(),
8690
);
8791
*object_name = new_name;
@@ -182,7 +186,13 @@ fn rewrite_multi_part_table_reference_in_expr(
182186

183187
// Get the column name (last identifier) and table name (all other identifiers)
184188
let column_name = idents.last().cloned();
185-
let obj_name = ObjectName(idents[..idents.len() - 1].to_vec());
189+
let obj_name = ObjectName(
190+
idents[..idents.len() - 1]
191+
.iter()
192+
.cloned()
193+
.map(ObjectNamePart::Identifier)
194+
.collect(),
195+
);
186196

187197
if let Some(rewrite) = known_rewrites.get(&obj_name) {
188198
// Rewrite the table parts
@@ -240,18 +250,18 @@ fn rewrite_multi_part_table_reference_in_expr(
240250
ast::Expr::Case {
241251
operand,
242252
conditions,
243-
results,
244253
else_result,
245254
..
246255
} => {
247256
if let Some(op) = operand {
248257
rewrite_multi_part_table_reference_in_expr(op, known_rewrites);
249258
}
250259
for condition in conditions {
251-
rewrite_multi_part_table_reference_in_expr(condition, known_rewrites);
252-
}
253-
for result in results {
254-
rewrite_multi_part_table_reference_in_expr(result, known_rewrites);
260+
rewrite_multi_part_table_reference_in_expr(
261+
&mut condition.condition,
262+
known_rewrites,
263+
);
264+
rewrite_multi_part_table_reference_in_expr(&mut condition.result, known_rewrites);
255265
}
256266
if let Some(else_res) = else_result {
257267
rewrite_multi_part_table_reference_in_expr(else_res, known_rewrites);
@@ -264,9 +274,6 @@ fn rewrite_multi_part_table_reference_in_expr(
264274
ast::Expr::JsonAccess { value, .. } => {
265275
rewrite_multi_part_table_reference_in_expr(&mut *value, known_rewrites);
266276
}
267-
ast::Expr::CompositeAccess { expr, .. } => {
268-
rewrite_multi_part_table_reference_in_expr(&mut *expr, known_rewrites);
269-
}
270277
ast::Expr::IsFalse(expr) => {
271278
rewrite_multi_part_table_reference_in_expr(&mut *expr, known_rewrites);
272279
}
@@ -497,9 +504,6 @@ fn rewrite_multi_part_table_reference_in_expr(
497504
ast::Expr::Lambda(lambda_function) => {
498505
rewrite_multi_part_table_reference_in_expr(&mut lambda_function.body, known_rewrites);
499506
}
500-
ast::Expr::Method(method) => {
501-
rewrite_multi_part_table_reference_in_expr(&mut method.expr, known_rewrites);
502-
}
503507
ast::Expr::CompoundFieldAccess { root, access_chain } => {
504508
rewrite_multi_part_table_reference_in_expr(&mut *root, known_rewrites);
505509
for access in access_chain {
@@ -543,20 +547,23 @@ fn rewrite_multi_part_table_reference_in_expr(
543547
}
544548

545549
fn table_reference_to_object_name(table_reference: &TableReference) -> ObjectName {
550+
use datafusion::sql::sqlparser::ast::{Ident, ObjectNamePart};
546551
match table_reference {
547-
TableReference::Bare { table } => ObjectName(vec![Ident::new(table.to_string())]),
552+
TableReference::Bare { table } => ObjectName(vec![ObjectNamePart::Identifier(Ident::new(
553+
table.to_string(),
554+
))]),
548555
TableReference::Partial { schema, table } => ObjectName(vec![
549-
Ident::new(schema.to_string()),
550-
Ident::new(table.to_string()),
556+
ObjectNamePart::Identifier(Ident::new(schema.to_string())),
557+
ObjectNamePart::Identifier(Ident::new(table.to_string())),
551558
]),
552559
TableReference::Full {
553560
catalog,
554561
schema,
555562
table,
556563
} => ObjectName(vec![
557-
Ident::new(catalog.to_string()),
558-
Ident::new(schema.to_string()),
559-
Ident::new(table.to_string()),
564+
ObjectNamePart::Identifier(Ident::new(catalog.to_string())),
565+
ObjectNamePart::Identifier(Ident::new(schema.to_string())),
566+
ObjectNamePart::Identifier(Ident::new(table.to_string())),
560567
]),
561568
}
562569
}

sources/sql/src/rewrite/plan.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,7 @@ fn rewrite_table_scans_in_expr(
597597
Ok(Expr::ScalarSubquery(Subquery {
598598
subquery: Arc::new(new_subquery),
599599
outer_ref_columns,
600+
spans: subquery.spans,
600601
}))
601602
}
602603
Expr::BinaryExpr(binary_expr) => {
@@ -1098,6 +1099,7 @@ fn rewrite_table_scans_in_expr(
10981099
let subquery = Subquery {
10991100
subquery: Arc::new(subquery_plan),
11001101
outer_ref_columns,
1102+
spans: exists.subquery.spans,
11011103
};
11021104
Ok(Expr::Exists(Exists::new(subquery, exists.negated)))
11031105
}
@@ -1140,6 +1142,7 @@ fn rewrite_table_scans_in_expr(
11401142
let subquery = Subquery {
11411143
subquery: Arc::new(subquery_plan),
11421144
outer_ref_columns,
1145+
spans: is.subquery.spans,
11431146
};
11441147
Ok(Expr::InSubquery(InSubquery::new(
11451148
Box::new(expr),
@@ -1471,7 +1474,7 @@ mod tests {
14711474
// different tables in single aggregation expression
14721475
(
14731476
"SELECT COUNT(CASE WHEN appt.a > 0 THEN appt.a ELSE dft.a END) FROM app_table as appt, foo.df_table as dft",
1474-
"SELECT count(CASE WHEN (appt.a > 0) THEN appt.a ELSE dft.a END) FROM remote_table AS appt JOIN remote_table AS dft"
1477+
"SELECT count(CASE WHEN (appt.a > 0) THEN appt.a ELSE dft.a END) FROM remote_table AS appt CROSS JOIN remote_table AS dft"
14751478
),
14761479
];
14771480

@@ -1517,7 +1520,7 @@ mod tests {
15171520
let tests = vec![
15181521
(
15191522
"SELECT UNNEST([1, 2, 2, 5, NULL]), b, c from app_table where a > 10 order by b limit 10;",
1520-
r#"SELECT UNNEST(make_array(1, 2, 2, 5, NULL)) AS "UNNEST(make_array(Int64(1),Int64(2),Int64(2),Int64(5),NULL))", remote_table.b, remote_table.c FROM remote_table WHERE (remote_table.a > 10) ORDER BY remote_table.b ASC NULLS LAST LIMIT 10"#,
1523+
r#"SELECT UNNEST([1, 2, 2, 5, NULL]) AS "UNNEST(make_array(Int64(1),Int64(2),Int64(2),Int64(5),NULL))", remote_table.b, remote_table.c FROM remote_table WHERE (remote_table.a > 10) ORDER BY remote_table.b ASC NULLS LAST LIMIT 10"#,
15211524
),
15221525
(
15231526
"SELECT UNNEST(app_table.d), b, c from app_table where a > 10 order by b limit 10;",
@@ -1543,12 +1546,12 @@ mod tests {
15431546
let tests = vec![
15441547
(
15451548
"SELECT a FROM bar where a IN (SELECT a FROM bar)",
1546-
r#"SELECT remote_db.remote_schema.remote_table.a FROM remote_db.remote_schema.remote_table WHERE remote_db.remote_schema.remote_table.a IN (SELECT a FROM remote_db.remote_schema.remote_table)"#,
1549+
r#"SELECT remote_table.a FROM remote_db.remote_schema.remote_table WHERE remote_table.a IN (SELECT remote_table.a FROM remote_db.remote_schema.remote_table)"#,
15471550
true,
15481551
),
15491552
(
15501553
"SELECT a FROM bar where a IN (SELECT a FROM bar)",
1551-
r#"SELECT remote_db.remote_schema.remote_table.a FROM remote_db.remote_schema.remote_table WHERE remote_db.remote_schema.remote_table.a IN (SELECT remote_db.remote_schema.remote_table.a FROM remote_db.remote_schema.remote_table)"#,
1554+
r#"SELECT remote_table.a FROM remote_db.remote_schema.remote_table WHERE remote_table.a IN (SELECT remote_table.a FROM remote_db.remote_schema.remote_table)"#,
15521555
false,
15531556
),
15541557
];
@@ -1565,7 +1568,7 @@ mod tests {
15651568
let ctx = get_test_df_context();
15661569
let tests = vec![(
15671570
"SELECT foo.df_table.a FROM bar JOIN foo.df_table ON foo.df_table.a = (SELECT bar.a FROM bar WHERE bar.a > foo.df_table.a)",
1568-
r#"SELECT remote_table.a FROM remote_db.remote_schema.remote_table JOIN remote_table ON (remote_table.a = (SELECT a FROM remote_db.remote_schema.remote_table WHERE (remote_table.a > remote_table.a)))"#,
1571+
r#"SELECT remote_table.a FROM remote_db.remote_schema.remote_table INNER JOIN remote_table ON (remote_table.a = (SELECT remote_table.a FROM remote_db.remote_schema.remote_table WHERE (remote_table.a > remote_table.a)))"#,
15691572
true,
15701573
)];
15711574
for test in tests {
@@ -1588,7 +1591,7 @@ mod tests {
15881591
),
15891592
(
15901593
"SELECT a - 1, COUNT(*) AS c FROM app_table GROUP BY a - 1;",
1591-
r#"SELECT (remote_table.a - 1), count(*) AS c FROM remote_table GROUP BY (remote_table.a - 1)"#,
1594+
r#"SELECT (remote_table.a - 1), count(1) AS c FROM remote_table GROUP BY (remote_table.a - 1)"#,
15921595
),
15931596
];
15941597

@@ -1683,7 +1686,7 @@ mod collect_rewrites_tests {
16831686
use async_trait::async_trait;
16841687
use datafusion::{
16851688
arrow::datatypes::{DataType, Field, Schema, SchemaRef},
1686-
common::DFSchema,
1689+
common::{DFSchema, Spans},
16871690
datasource::DefaultTableSource,
16881691
execution::SendableRecordBatchStream,
16891692
sql::unparser::dialect::{DefaultDialect, Dialect},
@@ -1781,6 +1784,7 @@ mod collect_rewrites_tests {
17811784
let subquery = Expr::ScalarSubquery(Subquery {
17821785
subquery: Arc::new(table_scan),
17831786
outer_ref_columns: vec![],
1787+
spans: Spans::default(),
17841788
});
17851789

17861790
let mut known_rewrites = HashMap::new();
@@ -1843,6 +1847,7 @@ mod collect_rewrites_tests {
18431847
Subquery {
18441848
subquery: Arc::new(table_scan),
18451849
outer_ref_columns: vec![],
1850+
spans: Spans::default(),
18461851
},
18471852
false,
18481853
));

0 commit comments

Comments
 (0)