Skip to content

Commit ebdf355

Browse files
authored
update datafusion to 52 & update deps (#525)
* update datafusion to 52 & update deps * Update byte-unit to 5.2 * python: update FFI_TableProvider::new function * Downgrade adbc to 0.21 * upgrade datafusion-python * python: Fix an issue with RawTableProvider * python/pyproject: use compatible version of maturin * Use datafusion-python 52 * Fix segfault in RawTableProvider by keeping SessionContext alive The FFI_TableProvider stores a Weak reference to the TaskContextProvider, but the Arc was created locally and dropped immediately. Use a static OnceLock to keep it alive. Also update the __datafusion_table_provider__ signature for the v52 protocol and bump datafusion dependency to >=52.0.0. * core/tests/flight: add a workaround for df.count()
1 parent a0dfd18 commit ebdf355

8 files changed

Lines changed: 1780 additions & 1289 deletions

File tree

Cargo.lock

Lines changed: 1230 additions & 724 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,25 @@ license = "Apache-2.0"
2828
description = "Extend the capabilities of DataFusion to support additional data sources via implementations of the `TableProvider` trait."
2929

3030
[workspace.dependencies]
31-
arrow = "57.0.0"
32-
arrow-array = { version = "57.0.0" }
33-
arrow-flight = { version = "57.0.0", features = [
34-
"flight-sql-experimental",
35-
"tls-ring",
31+
arrow = "57.2"
32+
arrow-array = { version = "57.2" }
33+
arrow-flight = { version = "57.2", features = [
34+
"flight-sql-experimental",
35+
"tls-ring",
3636
] }
37-
arrow-ipc = { version = "57.0.0" }
38-
arrow-schema = { version = "57.2.0", features = ["serde"] }
39-
arrow-json = "57.0.0"
40-
arrow-odbc = { version = "21.0.0" }
41-
datafusion = { version = "51", default-features = false }
42-
datafusion-expr = { version = "51" }
43-
datafusion-federation = { version = "0.4.11" }
44-
datafusion-ffi = { version = "51" }
45-
datafusion-proto = { version = "51" }
46-
datafusion-physical-expr = { version = "51" }
47-
datafusion-physical-plan = { version = "51" }
37+
arrow-ipc = { version = "57.2" }
38+
arrow-schema = { version = "57.2", features = ["serde"] }
39+
arrow-json = "57.2"
40+
arrow-odbc = { version = "21.0" }
41+
datafusion = { version = "52.0", default-features = false }
42+
datafusion-expr = { version = "52.0" }
43+
datafusion-federation = { version = "0.4" }
44+
datafusion-ffi = { version = "52.0" }
45+
datafusion-proto = { version = "52.0" }
46+
datafusion-physical-expr = { version = "52.0" }
47+
datafusion-physical-plan = { version = "52.0" }
48+
datafusion-python = { version = "52.0" }
4849
datafusion-table-providers = { path = "core" }
49-
duckdb = { version = "=1.3.2", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
50-
adbc_core = { version = "0.21.0" }
51-
adbc_driver_manager = { version = "0.21.0" }
50+
duckdb = { version = "=1.3", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
51+
adbc_core = { version = "0.21" }
52+
adbc_driver_manager = { version = "0.21" }

core/Cargo.toml

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,7 @@ arrow-flight = { workspace = true, optional = true, features = [
1818
arrow-schema = { workspace = true, optional = true, features = ["serde"] }
1919
arrow-json = { workspace = true }
2020
arrow-odbc = { workspace = true, optional = true }
21-
async-stream = { version = "0.3", optional = true }
22-
async-trait = "0.1"
23-
base64 = { version = "0.22.1", optional = true }
24-
bb8 = { version = "0.9", optional = true }
25-
bb8-postgres = { version = "0.9", optional = true }
26-
bigdecimal = "0.4"
27-
byteorder = "1.5.0"
28-
bytes = { version = "1.11.0", optional = true }
29-
byte-unit = { version = "5.1.4", optional = true }
30-
chrono = "0.4"
31-
clickhouse = { version = "0.13.3", optional = true }
32-
dashmap = "6.1.0"
21+
3322
datafusion = { workspace = true, default-features = false, features = ["sql"]}
3423
datafusion-expr = { workspace = true, optional = true }
3524
datafusion-federation = { workspace = true, features = [
@@ -45,13 +34,28 @@ duckdb = { workspace = true, features = [
4534
"vtab-arrow",
4635
"appender-arrow",
4736
], optional = true }
48-
libduckdb-sys = { version = "=1.3.0", optional = true }
37+
adbc_driver_manager = { workspace = true, optional = true }
38+
adbc_core = { workspace = true, optional = true }
39+
40+
async-stream = { version = "0.3", optional = true }
41+
async-trait = "0.1"
42+
base64 = { version = "0.22", optional = true }
43+
bb8 = { version = "0.9", optional = true }
44+
bb8-postgres = { version = "0.9", optional = true }
45+
bigdecimal = "0.4"
46+
byteorder = "1.5"
47+
bytes = { version = "1.11", optional = true }
48+
byte-unit = { version = "5.2", optional = true }
49+
chrono = "0.4"
50+
clickhouse = { version = "0.13", optional = true }
51+
dashmap = "6.1"
52+
libduckdb-sys = { version = "=1.3", optional = true }
4953
dyn-clone = { version = "1.0", optional = true }
50-
fallible-iterator = "0.3.0"
51-
fundu = "2.0.1"
54+
fallible-iterator = "0.3"
55+
fundu = "2.0"
5256
futures = "0.3"
5357
geo-types = "0.7"
54-
itertools = "0.14.0"
58+
itertools = "0.14"
5559
mysql_async = { version = "0.36", features = [
5660
"native-tls-tls",
5761
"chrono",
@@ -61,13 +65,13 @@ mysql_async = { version = "0.36", features = [
6165
native-tls = { version = "0.2", optional = true }
6266
num-bigint = "0.4"
6367
odbc-api = { version = "19.0", optional = true }
64-
pem = { version = "3.0.4", optional = true }
68+
pem = { version = "3.0", optional = true }
6569
postgres-native-tls = { version = "0.5.0", optional = true }
66-
prost = { version = "=0.14.1", optional = true }
70+
prost = { version = "=0.14", optional = true }
6771
rand = { version = "0.9" }
6872
regex = { version = "1" }
6973
r2d2 = { version = "0.8", optional = true }
70-
rusqlite = { version = "0.37.0", optional = true }
74+
rusqlite = { version = "0.37", optional = true }
7175
sea-query = { version = "0.32", features = [
7276
"backend-sqlite",
7377
"backend-postgres",
@@ -77,7 +81,7 @@ sea-query = { version = "0.32", features = [
7781
"with-time",
7882
"with-chrono",
7983
] }
80-
secrecy = "0.10.3"
84+
secrecy = "0.10"
8185
serde = { version = "1.0", features = ["derive"] }
8286
serde_json = "1.0"
8387
sha2 = "0.10"
@@ -90,18 +94,17 @@ tokio-postgres = { version = "0.7", features = [
9094
"with-serde_json-1",
9195
"with-geo-types-0_7",
9296
], optional = true }
93-
tokio-rusqlite = { version = "0.7.0", optional = true }
97+
tokio-rusqlite = { version = "0.7", optional = true }
9498
tonic = { version = "0.14", optional = true, features = [
9599
"tls-native-roots",
96100
"tls-webpki-roots",
97101
] }
98102
tracing = "0.1"
99-
trust-dns-resolver = "0.23.2"
100-
url = "2.5.4"
103+
trust-dns-resolver = "0.23"
104+
url = "2.5"
101105
uuid = { version = "1.18", optional = true }
102-
rust_decimal = { version = "1.38.0", features = ["db-postgres"] }
103-
adbc_driver_manager = { workspace = true, optional = true }
104-
adbc_core = { workspace = true, optional = true }
106+
rust_decimal = { version = "1.38", features = ["db-postgres"] }
107+
105108
r2d2_adbc = { version = "0.2.0", optional = true }
106109

107110
[dev-dependencies]
@@ -203,4 +206,4 @@ required-features = ["sqlite"]
203206
[[example]]
204207
name = "adbc"
205208
path = "examples/adbc_duckdb.rs"
206-
required-features = ["adbc"]
209+
required-features = ["adbc"]

core/tests/flight/mod.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,15 @@ async fn test_flight_sql_data_source() -> datafusion::common::Result<()> {
213213
.await
214214
.unwrap();
215215
let df = ctx.sql("select col1 from fsql").await.unwrap();
216-
assert_eq!(
217-
df.count().await.unwrap(),
218-
rows_per_partition * num_partitions
219-
);
216+
217+
let row_count: usize = df
218+
.collect()
219+
.await
220+
.unwrap()
221+
.iter()
222+
.map(|b| b.num_rows())
223+
.sum();
224+
assert_eq!(row_count, rows_per_partition * num_partitions);
220225
let df = ctx.sql("select sum(col2) from fsql").await?;
221226
let rb = df
222227
.collect()

python/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,19 @@ doc = false
1616
[dependencies]
1717
arrow = { workspace = true }
1818
arrow-flight = { workspace = true, optional = true }
19-
datafusion = { workspace = true, features = ["pyarrow"] }
19+
datafusion = { workspace = true }
2020
datafusion-ffi = { workspace = true }
21+
datafusion-python = { workspace = true }
2122
datafusion-table-providers = { workspace = true }
22-
pyo3 = { version = "0.26.0" }
23+
duckdb = { workspace = true, optional = true }
24+
25+
pyo3 = { version = "0.26" }
2326
tokio = { version = "1.46", features = [
2427
"macros",
2528
"rt",
2629
"rt-multi-thread",
2730
"sync",
2831
] }
29-
duckdb = { workspace = true, optional = true }
3032

3133
[features]
3234
default = [

python/pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[build-system]
2-
requires = ["maturin>=1.5.1,<1.6.0"]
2+
requires = ["maturin>=1.5.1,<1.9.0"]
33
build-backend = "maturin"
44

55
[project]
@@ -8,7 +8,7 @@ version = "0.1.0"
88
description = "Build and run queries against data"
99
readme = "../README.md"
1010
license = { file = "../LICENSE" }
11-
requires-python = ">=3.9"
11+
requires-python = ">=3.10"
1212
keywords = ["datafusion", "dataframe", "rust", "query-engine"]
1313
classifier = [
1414
"Development Status :: 2 - Pre-Alpha",
@@ -24,7 +24,7 @@ classifier = [
2424
"Programming Language :: Python",
2525
"Programming Language :: Rust",
2626
]
27-
dependencies = ["datafusion>=45.0.0"]
27+
dependencies = ["datafusion>=52.0.0"]
2828

2929
[project.urls]
3030
repository = "https://github.com/datafusion-contrib/datafusion-table-providers"

python/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
sync::{Arc, OnceLock},
44
};
55

6-
use datafusion::catalog::TableProvider;
6+
use datafusion::{catalog::TableProvider, execution::TaskContextProvider, prelude::SessionContext};
77
use datafusion_ffi::table_provider::FFI_TableProvider;
88
use pyo3::{prelude::*, types::PyCapsule};
99

@@ -19,11 +19,20 @@ pub(crate) fn get_tokio_runtime() -> &'static tokio::runtime::Runtime {
1919
RUNTIME.get_or_init(|| tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"))
2020
}
2121

22+
/// Returns a static reference to a default SessionContext.
23+
/// This ensures the Arc backing the Weak reference inside
24+
/// FFI_TaskContextProvider is never dropped.
25+
fn get_default_session_context() -> &'static Arc<SessionContext> {
26+
static CTX: OnceLock<Arc<SessionContext>> = OnceLock::new();
27+
CTX.get_or_init(|| Arc::new(SessionContext::default()))
28+
}
29+
2230
#[pymethods]
2331
impl RawTableProvider {
2432
fn __datafusion_table_provider__<'py>(
2533
&self,
2634
py: Python<'py>,
35+
_session: Bound<'py, PyAny>,
2736
) -> PyResult<Bound<'py, PyCapsule>> {
2837
let name = CString::new("datafusion_table_provider").unwrap();
2938

@@ -33,10 +42,13 @@ impl RawTableProvider {
3342
None
3443
};
3544

45+
let ctx = Arc::clone(get_default_session_context()) as Arc<dyn TaskContextProvider>;
3646
let provider = FFI_TableProvider::new(
3747
Arc::clone(&self.table),
3848
self.supports_pushdown_filters,
3949
runtime,
50+
&ctx,
51+
None,
4052
);
4153

4254
PyCapsule::new(py, provider, Some(name.clone()))

0 commit comments

Comments
 (0)