Skip to content

Commit e67eef9

Browse files
authored
Merge branch 'spiceai' into lukim/skip-constraint-validation-on-overwrite-v2
2 parents 484994b + c9363be commit e67eef9

12 files changed

Lines changed: 1825 additions & 156 deletions

File tree

Cargo.lock

Lines changed: 10 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,50 @@ datafusion-table-providers = { path = "core" }
5050
duckdb = "1.3.2" # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488
5151
adbc_core = { version = "0.20.0" }
5252
adbc_driver_manager = { version = "0.20.0" }
53+
parquet = "56"
54+
rusqlite = "0.37"
55+
tokio-rusqlite = "0.7.0"
56+
async-stream = "0.3.5"
57+
async-trait = "0.1.80"
58+
bigdecimal = "0.4.5"
59+
byteorder = "1.5.0"
60+
chrono = "0.4"
61+
futures = "0.3.30"
62+
itertools = "0.14.0"
63+
num-bigint = "0.4"
64+
secrecy = "0.10.3"
65+
serde = { version = "1.0.209", features = ["derive"] }
66+
serde_json = "1.0.124"
67+
snafu = "0.8.3"
68+
time = "0.3.36"
69+
tokio = { version = "1.46", features = ["macros", "fs"] }
70+
tracing = "0.1.40"
71+
url = "2.5.1"
72+
rand = "0.9"
73+
regex = "1"
74+
uuid = "1.9.1"
75+
prost = "0.14.1"
76+
tonic = { version = "0.13", features = ["tls-native-roots", "tls-webpki-roots"] }
77+
bytes = "1.7.1"
78+
base64 = "0.22.1"
79+
dashmap = "6.1.0"
80+
fallible-iterator = "0.3.0"
81+
fundu = "2.0.1"
82+
geo-types = "0.7.13"
83+
sha2 = "0.10"
84+
hickory-resolver = "0.25.2"
85+
sea-query = { git = "https://github.com/spiceai/sea-query.git", rev = "213b6b876068f58159ebdd5852604a021afaebf9", features = [
86+
"backend-sqlite",
87+
"backend-postgres",
88+
"postgres-array",
89+
"with-rust_decimal",
90+
"with-bigdecimal",
91+
"with-time",
92+
"with-chrono",
93+
] }
5394

5495
[patch.crates-io]
55-
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "007c82d039220478f406d648417d28c0d40a7987" } # spiceai-50
96+
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "afcbd1ce99703f1322d176fc1b99745f647234d0" } # spiceai-50
5697
duckdb = { git = "https://github.com/spiceai/duckdb-rs.git", rev = "dd02045c3aa77895723e873222cbe30f5c8f77a9" } # spiceai-1.3.2
5798

5899
datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "41d08054e37e9cdca9b760c16fc2b97a21893af2" } # spiceai-50
@@ -71,3 +112,5 @@ arrow-ipc = { git = "https://github.com/spiceai/arrow-rs.git", rev = "9f9c372ff5
71112
arrow-ord = { git = "https://github.com/spiceai/arrow-rs.git", rev = "9f9c372ff5744488226462e3ea7d94ff47909833" } # spiceai-56
72113
arrow-schema = { git = "https://github.com/spiceai/arrow-rs.git", rev = "9f9c372ff5744488226462e3ea7d94ff47909833" } # spiceai-56
73114
parquet = { git = "https://github.com/spiceai/arrow-rs.git", rev = "9f9c372ff5744488226462e3ea7d94ff47909833" } # spiceai-56
115+
116+
rusqlite = { git = "https://github.com/spiceai/rusqlite.git", rev = "3d1f5f6f6d6d062676210d095df45eafa6e19fd8" } # spiceai-v0.37.0

core/Cargo.toml

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@ arrow-flight = { workspace = true, optional = true, features = [
1818
arrow-schema = { workspace = true, optional = true, features = ["serde"] }
1919
arrow-json = { workspace = true }
2020
arrow-odbc = { workspace = true, optional = true }
21-
async-stream = { version = "0.3.5", optional = true }
22-
async-trait = "0.1.80"
23-
base64 = { version = "0.22.1", optional = true }
21+
async-stream = { workspace = true, optional = true }
22+
async-trait = { workspace = true }
23+
base64 = { workspace = true, optional = true }
2424
bb8 = { version = "0.9.0", optional = true }
2525
bb8-postgres = { version = "0.9.0", optional = true }
26-
bigdecimal = "0.4.5"
27-
byteorder = "1.5.0"
28-
bytes = { version = "1.7.1", optional = true }
26+
bigdecimal = { workspace = true }
27+
byteorder = { workspace = true }
28+
bytes = { workspace = true, optional = true }
2929
byte-unit = { version = "5.1.4", optional = true }
30-
chrono = "0.4"
30+
chrono = { workspace = true }
3131
clickhouse = { version = "0.13.3", optional = true }
32-
dashmap = "6.1.0"
32+
dashmap = { workspace = true }
3333
datafusion = { workspace = true, default-features = false }
3434
datafusion-expr = { workspace = true, optional = true }
3535
datafusion-federation = { workspace = true, features = ["sql"], optional = true }
@@ -45,11 +45,11 @@ duckdb = { workspace = true, features = [
4545
], optional = true }
4646
libduckdb-sys = { version = "=1.3.0", optional = true }
4747
dyn-clone = { version = "1.0.17", optional = true }
48-
fallible-iterator = "0.3.0"
49-
fundu = "2.0.1"
50-
futures = "0.3.30"
51-
geo-types = "0.7.13"
52-
itertools = "0.14.0"
48+
fallible-iterator = { workspace = true }
49+
fundu = { workspace = true }
50+
futures = { workspace = true }
51+
geo-types = { workspace = true }
52+
itertools = { workspace = true }
5353
mongodb = { version = "3.2.2", features = ["openssl-tls"], optional = true }
5454
mysql_async = { version = "0.36", features = [
5555
"native-tls-tls",
@@ -58,49 +58,39 @@ mysql_async = { version = "0.36", features = [
5858
"bigdecimal",
5959
], optional = true }
6060
native-tls = { version = "0.2.11", optional = true }
61-
num-bigint = "0.4"
61+
num-bigint = { workspace = true }
6262
num-traits = { version = "0.2", optional = true }
6363
odbc-api = { version = "19.0", optional = true }
64+
once_cell = "1.21"
6465
pem = { version = "3.0.4", optional = true }
6566
postgres-native-tls = { version = "0.5.0", optional = true }
66-
prost = { version = "0.14.1", optional = true }
67-
rand = { version = "0.9" }
68-
regex = { version = "1" }
67+
prost = { workspace = true, optional = true }
68+
rand = { workspace = true }
69+
regex = { workspace = true }
6970
r2d2 = { version = "0.8", optional = true }
7071
r2d2_adbc = { version = "0.1.0", optional = true }
71-
rusqlite = { version = "0.37", optional = true }
72+
rusqlite = { workspace = true, optional = true }
7273
rust_decimal = { version = "1.38.0", features = ["db-postgres"], optional = true }
73-
sea-query = { version = "0.32", features = [
74-
"backend-sqlite",
75-
"backend-postgres",
76-
"postgres-array",
77-
"with-rust_decimal",
78-
"with-bigdecimal",
79-
"with-time",
80-
"with-chrono",
81-
] }
82-
secrecy = "0.10.3"
83-
serde = { version = "1.0.209", features = ["derive"] }
84-
serde_json = "1.0.124"
85-
sha2 = "0.10"
86-
snafu = "0.8.3"
87-
time = "0.3.36"
88-
tokio = { version = "1.46", features = ["macros", "fs"] }
74+
sea-query = { workspace = true }
75+
secrecy = { workspace = true }
76+
serde = { workspace = true }
77+
serde_json = { workspace = true }
78+
sha2 = { workspace = true }
79+
snafu = { workspace = true }
80+
time = { workspace = true }
81+
tokio = { workspace = true }
8982
tokio-postgres = { version = "0.7.10", features = [
9083
"with-chrono-0_4",
9184
"with-uuid-1",
9285
"with-serde_json-1",
9386
"with-geo-types-0_7",
9487
], optional = true }
95-
tokio-rusqlite = { version = "0.7.0", optional = true }
96-
tonic = { version = "0.13", optional = true, features = [
97-
"tls-native-roots",
98-
"tls-webpki-roots",
99-
] }
100-
tracing = "0.1.40"
101-
uuid = { version = "1.9.1", optional = true }
102-
hickory-resolver = "0.25.2"
103-
url = "2.5.1"
88+
tokio-rusqlite = { workspace = true, optional = true }
89+
tonic = { workspace = true, optional = true }
90+
tracing = { workspace = true }
91+
uuid = { workspace = true, optional = true }
92+
hickory-resolver = { workspace = true }
93+
url = { workspace = true }
10494
adbc_driver_manager = { workspace = true, optional = true }
10595
adbc_core = { workspace = true, optional = true }
10696

@@ -109,9 +99,9 @@ anyhow = "1.0"
10999
bollard = "0.19"
110100
geozero = { version = "0.14.0", features = ["with-wkb"] }
111101
insta = { version = "1.43.2", features = ["filters"] }
112-
parquet = "56"
102+
parquet = { workspace = true }
113103
prost = { version = "=0.13.5" }
114-
rand = "0.9"
104+
rand = { workspace = true }
115105
reqwest = "0.12"
116106
rstest = "0.26.1"
117107
test-log = { version = "0.2", features = ["trace"] }
@@ -130,6 +120,7 @@ duckdb = [
130120
"dep:async-stream",
131121
"dep:arrow-schema",
132122
"dep:byte-unit",
123+
"dep:datafusion-physical-expr",
133124
]
134125
duckdb-federation = ["duckdb", "federation"]
135126
federation = ["dep:datafusion-federation"]
@@ -152,6 +143,7 @@ mongodb = [
152143
"dep:async-stream",
153144
"dep:arrow-schema",
154145
"dep:num-traits",
146+
"dep:rust_decimal",
155147
]
156148
mysql = ["dep:mysql_async", "dep:async-stream"]
157149
mysql-federation = ["mysql", "federation"]
@@ -173,6 +165,7 @@ postgres-federation = ["postgres", "federation"]
173165
sqlite = ["dep:rusqlite", "dep:tokio-rusqlite", "dep:arrow-schema"]
174166
sqlite-federation = ["sqlite", "federation"]
175167
sqlite-bundled = ["sqlite", "rusqlite/bundled"]
168+
sqlite-bundled-decimal = ["sqlite", "rusqlite/bundled-decimal"]
176169
adbc = [
177170
"dep:adbc_driver_manager",
178171
"dep:adbc_core",

core/src/duckdb.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,10 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
435435
Mode::File => {
436436
let read_pool = pool.clone();
437437

438-
read_pool.set_attached_databases(&self.attach_databases(&options))
438+
read_pool
439+
.set_attached_databases(&self.attach_databases(&options))
440+
.context(DbConnectionPoolSnafu)
441+
.map_err(to_datafusion_error)?
439442
}
440443
Mode::Memory => pool.clone(),
441444
};

core/src/duckdb/sql_table.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use futures::TryStreamExt;
2626
use std::collections::HashMap;
2727
use std::fmt::Display;
2828
use std::{any::Any, fmt, sync::Arc};
29+
use datafusion_physical_expr::EquivalenceProperties;
2930

3031
pub struct DuckDBTable<T: 'static, P: 'static> {
3132
pub(crate) base_table: SqlTable<T, P>,
@@ -146,6 +147,7 @@ pub struct DuckSqlExec<T, P> {
146147
indexes: Vec<(ColumnReference, IndexType)>,
147148
optimized_sql: Option<String>,
148149
optimized_sql_schema: Option<SchemaRef>,
150+
optimized_sql_properties: Option<PlanProperties>,
149151
}
150152

151153
impl<T, P> Clone for DuckSqlExec<T, P> {
@@ -156,11 +158,12 @@ impl<T, P> Clone for DuckSqlExec<T, P> {
156158
indexes: self.indexes.clone(),
157159
optimized_sql: self.optimized_sql.clone(),
158160
optimized_sql_schema: self.optimized_sql_schema.clone(),
161+
optimized_sql_properties: self.optimized_sql_properties.clone(),
159162
}
160163
}
161164
}
162165

163-
impl<T, P> DuckSqlExec<T, P> {
166+
impl<T: 'static, P: 'static> DuckSqlExec<T, P> {
164167
#[allow(clippy::too_many_arguments)]
165168
fn new(
166169
projections: Option<&Vec<usize>>,
@@ -188,6 +191,7 @@ impl<T, P> DuckSqlExec<T, P> {
188191
indexes,
189192
optimized_sql: None,
190193
optimized_sql_schema: None,
194+
optimized_sql_properties: None,
191195
})
192196
}
193197

@@ -225,18 +229,26 @@ impl<T, P> DuckSqlExec<T, P> {
225229
) -> Self {
226230
self.optimized_sql = Some(sql.into());
227231
self.optimized_sql_schema = new_schema;
232+
233+
if let Some(schema) = self.optimized_sql_schema.as_ref() {
234+
let mut properties = self.base_exec.properties().clone();
235+
let eq_properties = EquivalenceProperties::new(Arc::clone(schema));
236+
properties.eq_properties = eq_properties;
237+
self.optimized_sql_properties = Some(properties);
238+
}
239+
228240
self
229241
}
230242
}
231243

232-
impl<T, P> std::fmt::Debug for DuckSqlExec<T, P> {
244+
impl<T: 'static, P: 'static> std::fmt::Debug for DuckSqlExec<T, P> {
233245
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
234246
let sql = self.sql().unwrap_or_default();
235247
write!(f, "DuckSqlExec sql={sql}")
236248
}
237249
}
238250

239-
impl<T, P> DisplayAs for DuckSqlExec<T, P> {
251+
impl<T: 'static, P: 'static> DisplayAs for DuckSqlExec<T, P> {
240252
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
241253
let sql = self.sql().unwrap_or_default();
242254
write!(f, "DuckSqlExec sql={sql}")
@@ -260,7 +272,7 @@ impl<T: 'static, P: 'static> ExecutionPlan for DuckSqlExec<T, P> {
260272
}
261273

262274
fn properties(&self) -> &PlanProperties {
263-
self.base_exec.properties()
275+
self.optimized_sql_properties.as_ref().unwrap_or(self.base_exec.properties())
264276
}
265277

266278
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {

0 commit comments

Comments
 (0)