Skip to content

Commit 0888b06

Browse files
Upgrade datafusion to 47 and datafusion-federation to 0.4.2 (#324)
* Upgrade datafusion and datafusion-federation
* Use RemoteTableRef::from instead of RemoteTableRef::try_from
* Upgrade DuckDB to 1.3.0
* Add Rust toolchain to pin rustc/clippy version

---------

Co-authored-by: Phillip LeBlanc <phillip@leblanc.tech>
1 parent 469959d commit 0888b06

21 files changed

Lines changed: 376 additions & 238 deletions

File tree

Cargo.lock

Lines changed: 265 additions & 138 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -31,21 +31,21 @@ license = "Apache-2.0"
3131
description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."
3232

3333
[workspace.dependencies]
34-
arrow = "54.2.1"
35-
arrow-array = { version = "54.2.1" }
36-
arrow-flight = { version = "54.2.1", features = [
34+
arrow = "55.0.0"
35+
arrow-array = { version = "55.0.0" }
36+
arrow-flight = { version = "55.0.0", features = [
3737
"flight-sql-experimental",
3838
"tls",
3939
] }
40-
arrow-schema = { version = "54.2.1", features = ["serde"] }
41-
arrow-json = "54.2.1"
42-
arrow-odbc = { version = "16" }
43-
datafusion = { version = "46", default-features = false }
44-
datafusion-expr = { version = "46" }
45-
datafusion-federation = { version = "=0.3.7" }
46-
datafusion-ffi = { version = "46" }
47-
datafusion-proto = { version = "46" }
48-
datafusion-physical-expr = { version = "46" }
49-
datafusion-physical-plan = { version = "46" }
40+
arrow-schema = { version = "55.0.0", features = ["serde"] }
41+
arrow-json = "55.0.0"
42+
arrow-odbc = { version = "16.0.1" }
43+
datafusion = { version = "47", default-features = false }
44+
datafusion-expr = { version = "47" }
45+
datafusion-federation = { version = "=0.4.2" }
46+
datafusion-ffi = { version = "47" }
47+
datafusion-proto = { version = "47" }
48+
datafusion-physical-expr = { version = "47" }
49+
datafusion-physical-plan = { version = "47" }
5050
datafusion-table-providers = { path = "core" }
51-
duckdb = { version = "=1.2.1", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
51+
duckdb = { version = "=1.3.0", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488

core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ duckdb = { workspace = true, features = [
4343
"vtab-arrow",
4444
"appender-arrow",
4545
], optional = true }
46-
libduckdb-sys = { version = "=1.2.1", optional = true }
46+
libduckdb-sys = { version = "=1.3.0", optional = true }
4747
dyn-clone = { version = "1.0", optional = true }
4848
fallible-iterator = "0.3.0"
4949
fundu = "2.0.1"

core/src/duckdb/creator.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -731,13 +731,13 @@ pub(crate) mod tests {
731731
dbconnection::duckdbconn::DuckDbConnection, duckdbpool::DuckDbConnectionPool,
732732
},
733733
};
734-
use datafusion::arrow::array::RecordBatch;
734+
use datafusion::{arrow::array::RecordBatch, datasource::sink::DataSink};
735735
use datafusion::{
736736
common::SchemaExt,
737737
execution::{SendableRecordBatchStream, TaskContext},
738738
logical_expr::dml::InsertOp,
739739
parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
740-
physical_plan::{insert::DataSink, memory::MemoryStream},
740+
physical_plan::memory::MemoryStream,
741741
};
742742
use tracing::subscriber::DefaultGuard;
743743
use tracing_subscriber::EnvFilter;

core/src/duckdb/federation.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,9 @@ use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError}
22
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
33
use datafusion::arrow::datatypes::SchemaRef;
44
use datafusion::sql::unparser::dialect::Dialect;
5-
use datafusion_federation::sql::{SQLExecutor, SQLFederationProvider, SQLTableSource};
5+
use datafusion_federation::sql::{
6+
RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
7+
};
68
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
79
use futures::TryStreamExt;
810
use snafu::ResultExt;
@@ -23,14 +25,14 @@ impl<T, P> DuckDBTable<T, P> {
2325
fn create_federated_table_source(
2426
self: Arc<Self>,
2527
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
26-
let table_name = self.base_table.table_reference.to_quoted_string();
28+
let table_reference = self.base_table.table_reference.clone();
2729
let schema = Arc::clone(&Arc::clone(&self).base_table.schema());
2830
let fed_provider = Arc::new(SQLFederationProvider::new(self));
2931
Ok(Arc::new(SQLTableSource::new_with_schema(
3032
fed_provider,
31-
table_name,
33+
RemoteTableRef::from(table_reference),
3234
schema,
33-
)?))
35+
)))
3436
}
3537

3638
pub fn create_federated_table_provider(

core/src/duckdb/write.rs

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,17 +15,14 @@ use arrow_schema::ArrowError;
1515
use async_trait::async_trait;
1616
use datafusion::catalog::Session;
1717
use datafusion::common::{Constraints, SchemaExt};
18+
use datafusion::datasource::sink::{DataSink, DataSinkExec};
1819
use datafusion::logical_expr::dml::InsertOp;
1920
use datafusion::{
2021
datasource::{TableProvider, TableType},
2122
error::DataFusionError,
2223
execution::{SendableRecordBatchStream, TaskContext},
2324
logical_expr::Expr,
24-
physical_plan::{
25-
insert::{DataSink, DataSinkExec},
26-
metrics::MetricsSet,
27-
DisplayAs, DisplayFormatType, ExecutionPlan,
28-
},
25+
physical_plan::{metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan},
2926
};
3027
use duckdb::Transaction;
3128
use futures::StreamExt;

core/src/flight/exec.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -268,7 +268,7 @@ fn find_matching_column(
268268
impl DisplayAs for FlightExec {
269269
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
270270
match t {
271-
DisplayFormatType::Default => write!(
271+
DisplayFormatType::Default | DisplayFormatType::TreeRender => write!(
272272
f,
273273
"FlightExec: origin={}, streams={}",
274274
self.config.origin,

core/src/mysql/federation.rs

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,9 @@ use arrow::datatypes::SchemaRef;
44
use async_trait::async_trait;
55
use datafusion::sql::sqlparser::ast::{self, VisitMut};
66
use datafusion::sql::unparser::dialect::Dialect;
7-
use datafusion_federation::sql::{AstAnalyzer, SQLExecutor, SQLFederationProvider, SQLTableSource};
7+
use datafusion_federation::sql::{
8+
AstAnalyzer, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
9+
};
810
use datafusion_federation::{FederatedTableProviderAdaptor, FederatedTableSource};
911
use futures::TryStreamExt;
1012
use snafu::ResultExt;
@@ -24,14 +26,14 @@ impl MySQLTable {
2426
fn create_federated_table_source(
2527
self: Arc<Self>,
2628
) -> DataFusionResult<Arc<dyn FederatedTableSource>> {
27-
let table_name = self.base_table.table_reference.to_quoted_string();
29+
let table_reference = self.base_table.table_reference.clone();
2830
let schema = Arc::clone(&Arc::clone(&self).base_table.schema());
2931
let fed_provider = Arc::new(SQLFederationProvider::new(self));
3032
Ok(Arc::new(SQLTableSource::new_with_schema(
3133
fed_provider,
32-
table_name,
34+
RemoteTableRef::from(table_reference),
3335
schema,
34-
)?))
36+
)))
3537
}
3638

3739
pub fn create_federated_table_provider(

core/src/mysql/mysql_window.rs

Lines changed: 25 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,6 @@
1-
use datafusion::sql::sqlparser::ast::{Expr, Function, Ident, VisitorMut, WindowType};
1+
use datafusion::sql::sqlparser::ast::{
2+
Expr, Function, Ident, ObjectNamePart, VisitorMut, WindowType,
3+
};
24
use std::ops::ControlFlow;
35

46
#[derive(PartialEq, Eq)]
@@ -41,7 +43,7 @@ impl VisitorMut for MySQLWindowVisitor {
4143

4244
fn pre_visit_expr(&mut self, expr: &mut Expr) -> ControlFlow<Self::Break> {
4345
if let Expr::Function(func) = expr {
44-
if let Some(Ident { value, .. }) = func.name.0.first() {
46+
if let Some(ObjectNamePart::Identifier(Ident { value, .. })) = func.name.0.first() {
4547
// match for some scalars that support window functions
4648
// all of them need to remove nulls first/last, but only rank removes the frame clause
4749
if let Some(func_type) = FunctionType::from_str(value) {
@@ -58,7 +60,7 @@ impl MySQLWindowVisitor {
5860
pub fn remove_nulls_first_last(func: &mut Function) {
5961
if let Some(WindowType::WindowSpec(spec)) = func.over.as_mut() {
6062
for order_by in &mut spec.order_by {
61-
order_by.nulls_first = None; // nulls first/last are not supported in MySQL
63+
order_by.options.nulls_first = None; // nulls first/last are not supported in MySQL
6264
}
6365
}
6466
}
@@ -83,11 +85,11 @@ mod test {
8385
#[test]
8486
fn test_remove_frame_clause() {
8587
let mut func = Function {
86-
name: ObjectName(vec![Ident {
88+
name: ObjectName(vec![ObjectNamePart::Identifier(Ident {
8789
value: "RANK".to_string(),
8890
quote_style: None,
8991
span: Span::empty(),
90-
}]),
92+
})]),
9193
args: ast::FunctionArguments::None,
9294
over: Some(WindowType::WindowSpec(ast::WindowSpec {
9395
window_name: None,
@@ -96,8 +98,10 @@ mod test {
9698
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
9799
Token::Char('*'),
98100
))),
99-
asc: None,
100-
nulls_first: Some(true),
101+
options: sqlparser::ast::OrderByOptions {
102+
asc: None,
103+
nulls_first: Some(true),
104+
},
101105
with_fill: None,
102106
}],
103107
window_frame: Some(WindowFrame {
@@ -120,8 +124,10 @@ mod test {
120124
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
121125
Token::Char('*'),
122126
))),
123-
asc: None,
124-
nulls_first: Some(true),
127+
options: sqlparser::ast::OrderByOptions {
128+
asc: None,
129+
nulls_first: Some(true),
130+
},
125131
with_fill: None,
126132
}],
127133
window_frame: None,
@@ -135,11 +141,11 @@ mod test {
135141
#[test]
136142
fn test_remove_nulls_first_last() {
137143
let mut func = Function {
138-
name: ObjectName(vec![Ident {
144+
name: ObjectName(vec![ObjectNamePart::Identifier(Ident {
139145
value: "RANK".to_string(),
140146
quote_style: None,
141147
span: Span::empty(),
142-
}]),
148+
})]),
143149
args: sqlparser::ast::FunctionArguments::None,
144150
over: Some(WindowType::WindowSpec(sqlparser::ast::WindowSpec {
145151
window_name: None,
@@ -148,8 +154,10 @@ mod test {
148154
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
149155
Token::Char('*'),
150156
))),
151-
asc: None,
152-
nulls_first: Some(true),
157+
options: sqlparser::ast::OrderByOptions {
158+
asc: None,
159+
nulls_first: Some(true),
160+
},
153161
with_fill: None,
154162
}],
155163
window_frame: Some(WindowFrame {
@@ -172,8 +180,10 @@ mod test {
172180
expr: sqlparser::ast::Expr::Wildcard(AttachedToken(TokenWithSpan::wrap(
173181
Token::Char('*'),
174182
))),
175-
asc: None,
176-
nulls_first: None,
183+
options: sqlparser::ast::OrderByOptions {
184+
asc: None,
185+
nulls_first: None,
186+
},
177187
with_fill: None,
178188
}],
179189
window_frame: Some(WindowFrame {

core/src/mysql/write.rs

Lines changed: 2 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -4,16 +4,13 @@ use crate::util::retriable_error::check_and_mark_retriable_error;
44
use crate::util::{constraints, to_datafusion_error};
55
use async_trait::async_trait;
66
use datafusion::arrow::datatypes::SchemaRef;
7+
use datafusion::datasource::sink::{DataSink, DataSinkExec};
78
use datafusion::{
89
catalog::Session,
910
datasource::{TableProvider, TableType},
1011
execution::{SendableRecordBatchStream, TaskContext},
1112
logical_expr::{dml::InsertOp, Expr},
12-
physical_plan::{
13-
insert::{DataSink, DataSinkExec},
14-
metrics::MetricsSet,
15-
DisplayAs, DisplayFormatType, ExecutionPlan,
16-
},
13+
physical_plan::{metrics::MetricsSet, DisplayAs, DisplayFormatType, ExecutionPlan},
1714
};
1815
use futures::StreamExt;
1916
use mysql_async::TxOpts;

0 commit comments

Comments
 (0)