Skip to content

Commit 00d1b28

Browse files
authored
Upgrade to DataFusion 53 (#559)
* Upgrade to DataFusion 53
* Update dependencies
* Fix merge
* Update dependencies
* Update dependencies
* Fix clippy
* Update Rust toolchain
* Fix MongoDBExec
* Fix merge
* Update pyproject.toml

---------

Signed-off-by: Nuno Faria <nunofpfaria@gmail.com>
1 parent 285b317 commit 00d1b28

22 files changed

Lines changed: 332 additions & 304 deletions

Cargo.lock

Lines changed: 272 additions & 260 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 15 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -28,25 +28,25 @@ license = "Apache-2.0"
2828
description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."
2929

3030
[workspace.dependencies]
31-
arrow = "57.2"
32-
arrow-array = { version = "57.2" }
33-
arrow-flight = { version = "57.2", features = [
31+
arrow = "58.0"
32+
arrow-array = { version = "58.0" }
33+
arrow-flight = { version = "58.0", features = [
3434
"flight-sql-experimental",
3535
"tls-ring",
3636
] }
37-
arrow-ipc = { version = "57.2" }
38-
arrow-schema = { version = "57.2", features = ["serde"] }
39-
arrow-json = "57.2"
37+
arrow-ipc = { version = "58.0" }
38+
arrow-schema = { version = "58.0", features = ["serde"] }
39+
arrow-json = "58.0"
4040
arrow-odbc = { version = "23.1" }
41-
datafusion = { version = "52.0", default-features = false }
42-
datafusion-expr = { version = "52.0" }
43-
datafusion-federation = { version = "0.4" }
44-
datafusion-ffi = { version = "52.0" }
45-
datafusion-proto = { version = "52.0" }
46-
datafusion-physical-expr = { version = "52.0" }
47-
datafusion-physical-plan = { version = "52.0" }
48-
datafusion-python = { version = "52.0" }
41+
datafusion = { version = "53.0", default-features = false }
42+
datafusion-expr = { version = "53.0" }
43+
datafusion-federation = { version = "0.5.3" }
44+
datafusion-ffi = { version = "53.0" }
45+
datafusion-proto = { version = "53.0" }
46+
datafusion-physical-expr = { version = "53.0" }
47+
datafusion-physical-plan = { version = "53.0" }
48+
datafusion-python = { version = "53.0" }
4949
datafusion-table-providers = { path = "core" }
50-
duckdb = { version = "=1.3", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
50+
duckdb = { version = "=1.4.4", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
5151
adbc_core = { version = "0.23" }
5252
adbc_driver_manager = { version = "0.23" }

core/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -46,10 +46,10 @@ bigdecimal = "0.4"
4646
byteorder = "1.5"
4747
bytes = { version = "1.11", optional = true }
4848
byte-unit = { version = "5.2", optional = true }
49-
chrono = "0.4"
49+
chrono = "0.4.44"
5050
clickhouse = { version = "0.14", optional = true }
5151
dashmap = "6.1"
52-
libduckdb-sys = { version = "=1.3", optional = true }
52+
libduckdb-sys = { version = "=1.4.4", optional = true }
5353
dyn-clone = { version = "1.0", optional = true }
5454
fallible-iterator = "0.3"
5555
fundu = "2.0"
@@ -69,7 +69,7 @@ num-traits = { version = "0.2", optional = true }
6969
odbc-api = { version = "24.1", optional = true }
7070
pem = { version = "3.0", optional = true }
7171
postgres-native-tls = { version = "0.5.0", optional = true }
72-
prost = { version = "=0.14", optional = true }
72+
prost = { version = "=0.14.3", optional = true }
7373
rand = { version = "0.9" }
7474
regex = { version = "1" }
7575
r2d2 = { version = "0.8", optional = true }

core/src/adbc/sql_table.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -163,7 +163,7 @@ impl<T: 'static, P: 'static> ExecutionPlan for AdbcSqlExec<T, P> {
163163
self.base_exec.schema()
164164
}
165165

166-
fn properties(&self) -> &PlanProperties {
166+
fn properties(&self) -> &Arc<PlanProperties> {
167167
self.base_exec.properties()
168168
}
169169

core/src/clickhouse/federation.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ use crate::sql::db_connection_pool::{DbConnectionPool, JoinPushDown};
33
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
44
use arrow::datatypes::SchemaRef;
55
use async_trait::async_trait;
6+
use datafusion::physical_expr::PhysicalExpr;
67
use datafusion::sql::unparser::dialect::Dialect;
78
use datafusion_federation::sql::{
89
AstAnalyzer, RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
@@ -71,6 +72,7 @@ impl SQLExecutor for ClickHouseTable {
7172
&self,
7273
query: &str,
7374
schema: SchemaRef,
75+
_filters: &[Arc<dyn PhysicalExpr>],
7476
) -> DataFusionResult<SendableRecordBatchStream> {
7577
let fut = get_stream(self.pool.clone(), query.to_string(), Arc::clone(&schema));
7678
let stream = futures::stream::once(fut).try_flatten();

core/src/duckdb/federation.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError};
22
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
33
use datafusion::arrow::datatypes::SchemaRef;
4+
use datafusion::physical_expr::PhysicalExpr;
45
use datafusion::sql::unparser::dialect::Dialect;
56
use datafusion_federation::sql::{
67
RemoteTableRef, SQLExecutor, SQLFederationProvider, SQLTableSource,
@@ -64,6 +65,7 @@ impl<T, P> SQLExecutor for DuckDBTable<T, P> {
6465
&self,
6566
query: &str,
6667
schema: SchemaRef,
68+
_filters: &[Arc<dyn PhysicalExpr>],
6769
) -> DataFusionResult<SendableRecordBatchStream> {
6870
let fut = get_stream(
6971
self.base_table.clone_pool(),

core/src/duckdb/sql_table.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -175,7 +175,7 @@ impl<T: 'static, P: 'static> ExecutionPlan for DuckSqlExec<T, P> {
175175
self.base_exec.schema()
176176
}
177177

178-
fn properties(&self) -> &PlanProperties {
178+
fn properties(&self) -> &Arc<PlanProperties> {
179179
self.base_exec.properties()
180180
}
181181

core/src/flight/exec.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@ use tonic::metadata::{AsciiMetadataKey, MetadataMap};
4848
#[derive(Clone, Debug)]
4949
pub(crate) struct FlightExec {
5050
config: FlightConfig,
51-
plan_properties: PlanProperties,
51+
plan_properties: Arc<PlanProperties>,
5252
metadata_map: Arc<MetadataMap>,
5353
}
5454

@@ -90,12 +90,12 @@ impl From<FlightConfig> for FlightExec {
9090
} else {
9191
Boundedness::Bounded
9292
};
93-
let plan_properties = PlanProperties::new(
93+
let plan_properties = Arc::new(PlanProperties::new(
9494
EquivalenceProperties::new(config.schema.clone()),
9595
Partitioning::UnknownPartitioning(config.partitions.len()),
9696
EmissionType::Incremental,
9797
exec_mode,
98-
);
98+
));
9999
let mut mm = MetadataMap::new();
100100
for (k, v) in config.properties.grpc_headers.iter() {
101101
let key = AsciiMetadataKey::from_str(k.as_str()).expect("invalid header name");
@@ -292,7 +292,7 @@ impl ExecutionPlan for FlightExec {
292292
self
293293
}
294294

295-
fn properties(&self) -> &PlanProperties {
295+
fn properties(&self) -> &Arc<PlanProperties> {
296296
&self.plan_properties
297297
}
298298

core/src/mongodb/table.rs

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ struct MongoDBExec {
106106
filters_doc: Document,
107107
sort_doc: Document,
108108
limit: Option<i32>,
109-
properties: PlanProperties,
109+
properties: Arc<PlanProperties>,
110110
}
111111

112112
impl MongoDBExec {
@@ -159,12 +159,12 @@ impl MongoDBExec {
159159
filters_doc: mongo_filters_doc,
160160
sort_doc: Document::new(),
161161
limit,
162-
properties: PlanProperties::new(
162+
properties: Arc::new(PlanProperties::new(
163163
EquivalenceProperties::new(projected_schema),
164164
Partitioning::UnknownPartitioning(1),
165165
EmissionType::Final,
166166
Boundedness::Bounded,
167-
),
167+
)),
168168
})
169169
}
170170
}
@@ -213,7 +213,7 @@ impl ExecutionPlan for MongoDBExec {
213213
Arc::clone(&self.projected_schema)
214214
}
215215

216-
fn properties(&self) -> &PlanProperties {
216+
fn properties(&self) -> &Arc<PlanProperties> {
217217
&self.properties
218218
}
219219

@@ -259,7 +259,8 @@ impl ExecutionPlan for MongoDBExec {
259259
Arc::clone(&self.projected_schema),
260260
vec![order.to_vec()],
261261
);
262-
new_exec.properties = new_exec.properties.with_eq_properties(eq_properties);
262+
new_exec.properties =
263+
Arc::new(PlanProperties::clone(&new_exec.properties).with_eq_properties(eq_properties));
263264

264265
// Return Inexact rather than Exact so DataFusion keeps the SortExec wrapper
265266
// above us. Exact would replace the SortExec with `inner`, which loses the

core/src/mysql/federation.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ use crate::sql::db_connection_pool::dbconnection::{get_schema, Error as DbError}
22
use crate::sql::sql_provider_datafusion::{get_stream, to_execution_error};
33
use arrow::datatypes::SchemaRef;
44
use async_trait::async_trait;
5+
use datafusion::physical_expr::PhysicalExpr;
56
use datafusion::sql::sqlparser::ast::{self, VisitMut};
67
use datafusion::sql::unparser::dialect::Dialect;
78
use datafusion_federation::sql::{
@@ -84,6 +85,7 @@ impl SQLExecutor for MySQLTable {
8485
&self,
8586
query: &str,
8687
schema: SchemaRef,
88+
_filters: &[Arc<dyn PhysicalExpr>],
8789
) -> DataFusionResult<SendableRecordBatchStream> {
8890
let fut = get_stream(
8991
self.base_table.clone_pool(),

0 commit comments

Comments
 (0)