Skip to content

Commit d3440d0

Browse files
Upgrade DF to 45 and Arrow to 54
1 parent 7fd0a14 commit d3440d0

10 files changed

Lines changed: 92 additions & 41 deletions

File tree

Cargo.toml

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ edition = "2021"
66
repository = "https://github.com/datafusion-contrib/datafusion-table-providers"
77

88
[dependencies]
9-
arrow = "53"
10-
arrow-array = { version = "53", optional = true }
11-
arrow-flight = { version = "53", optional = true, features = ["flight-sql-experimental", "tls"] }
12-
arrow-schema = { version = "53", optional = true, features = ["serde"] }
13-
arrow-json = "53"
9+
arrow = "54"
10+
arrow-array = { version = "54", optional = true }
11+
arrow-flight = { version = "54", optional = true, features = ["flight-sql-experimental", "tls"] }
12+
arrow-schema = { version = "54", optional = true, features = ["serde"] }
13+
arrow-json = "54"
1414
async-stream = { version = "0.3.5", optional = true }
1515
async-trait = "0.1.80"
1616
num-bigint = "0.4.4"
@@ -21,11 +21,11 @@ bigdecimal = "0.4.5"
2121
byteorder = "1.5.0"
2222
# pending next arrow-rs release which resolves chrono's quarter() conflict https://github.com/apache/arrow-rs/commit/2fddf85afcd20110ce783ed5b4cdeb82293da30b
2323
chrono = "=0.4.39"
24-
datafusion = "43"
25-
datafusion-expr = { version = "43", optional = true }
26-
datafusion-physical-expr = { version = "43", optional = true }
27-
datafusion-physical-plan = { version = "43", optional = true }
28-
datafusion-proto = { version = "43", optional = true }
24+
datafusion = "45"
25+
datafusion-expr = { version = "45", optional = true }
26+
datafusion-physical-expr = { version = "45", optional = true }
27+
datafusion-physical-plan = { version = "45", optional = true }
28+
datafusion-proto = { version = "45", optional = true }
2929
duckdb = { version = "1", features = [
3030
"bundled",
3131
"r2d2",
@@ -60,7 +60,7 @@ pem = { version = "3.0.4", optional = true }
6060
tokio-rusqlite = { version = "0.5.1", optional = true }
6161
tonic = { version = "0.12.2", optional = true }
6262
datafusion-federation = "0.1"
63-
datafusion-federation-sql = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "8b373f95e3fa0eaa4f13b93435275acc4096bf2f" }
63+
datafusion-federation-sql = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "1c814422cfbdeb7f7bd2399e3369656710413aa8" }
6464
itertools = "0.13.0"
6565
dyn-clone = { version = "1.0.17", optional = true }
6666
geo-types = "0.7.13"
@@ -78,7 +78,7 @@ rstest = "0.22.0"
7878
geozero = { version = "0.13.0", features = ["with-wkb"] }
7979
tokio-stream = { version = "0.1.15", features = ["net"] }
8080
insta = { version = "1.40.0", features = ["filters"] }
81-
datafusion-physical-plan = { version = "43" }
81+
datafusion-physical-plan = { version = "45" }
8282
tempfile = "3.8.1"
8383

8484
[features]
@@ -104,11 +104,11 @@ sqlite-federation = ["sqlite"]
104104
postgres-federation = ["postgres"]
105105

106106
[patch.crates-io]
107-
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "8b373f95e3fa0eaa4f13b93435275acc4096bf2f" }
108-
duckdb = { git = "https://github.com/spiceai/duckdb-rs.git", rev = "bf6d04933ad9d69108567418303b60aa227d93cc" } # spiceai-1.1.3-backported
107+
datafusion-federation = { git = "https://github.com/spiceai/datafusion-federation.git", rev = "1c814422cfbdeb7f7bd2399e3369656710413aa8" } # spiceai-45
108+
duckdb = { git = "https://github.com/spiceai/duckdb-rs.git", rev = "2e24b958e44ec7419290249e27a15f1a19703fff" } # spiceai-1.1.3-backported-arrow-54
109109

110-
datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "37f0f144650db9e07a09c02fdbb69179438316be"}
111-
datafusion-expr = { git = "https://github.com/spiceai/datafusion.git", rev = "37f0f144650db9e07a09c02fdbb69179438316be"}
112-
datafusion-physical-expr = { git = "https://github.com/spiceai/datafusion.git", rev = "37f0f144650db9e07a09c02fdbb69179438316be"}
113-
datafusion-physical-plan = { git = "https://github.com/spiceai/datafusion.git", rev = "37f0f144650db9e07a09c02fdbb69179438316be"}
114-
datafusion-proto = { git = "https://github.com/spiceai/datafusion.git", rev = "37f0f144650db9e07a09c02fdbb69179438316be"}
110+
datafusion = { git = "https://github.com/spiceai/datafusion.git", rev = "e0e423e9e29a711d076892865d51747b06429d1b"} # spiceai-45
111+
datafusion-expr = { git = "https://github.com/spiceai/datafusion.git", rev = "e0e423e9e29a711d076892865d51747b06429d1b"} # spiceai-45
112+
datafusion-physical-expr = { git = "https://github.com/spiceai/datafusion.git", rev = "e0e423e9e29a711d076892865d51747b06429d1b"} # spiceai-45
113+
datafusion-physical-plan = { git = "https://github.com/spiceai/datafusion.git", rev = "e0e423e9e29a711d076892865d51747b06429d1b"} # spiceai-45
114+
datafusion-proto = { git = "https://github.com/spiceai/datafusion.git", rev = "e0e423e9e29a711d076892865d51747b06429d1b"} # spiceai-45

src/duckdb/creator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ pub(crate) mod tests {
840840
Arc::clone(&table_definition),
841841
*overwrite,
842842
None,
843+
table_definition.schema(),
843844
);
844845
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
845846
let rows_written = data_sink
@@ -953,6 +954,7 @@ pub(crate) mod tests {
953954
Arc::clone(&table_definition),
954955
*overwrite,
955956
None,
957+
table_definition.schema(),
956958
);
957959
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
958960
let rows_written = data_sink

src/duckdb/write.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,8 @@ impl TableProvider for DuckDBTableWriter {
173173
Arc::clone(&self.table_definition),
174174
overwrite,
175175
self.on_conflict.clone(),
176+
self.schema(),
176177
)),
177-
self.schema(),
178178
None,
179179
)) as _)
180180
}
@@ -186,6 +186,7 @@ pub(crate) struct DuckDBDataSink {
186186
table_definition: Arc<TableDefinition>,
187187
overwrite: InsertOp,
188188
on_conflict: Option<OnConflict>,
189+
schema: SchemaRef,
189190
}
190191

191192
#[async_trait]
@@ -198,6 +199,10 @@ impl DataSink for DuckDBDataSink {
198199
None
199200
}
200201

202+
fn schema(&self) -> &SchemaRef {
203+
&self.schema
204+
}
205+
201206
async fn write_all(
202207
&self,
203208
mut data: SendableRecordBatchStream,
@@ -295,12 +300,14 @@ impl DuckDBDataSink {
295300
table_definition: Arc<TableDefinition>,
296301
overwrite: InsertOp,
297302
on_conflict: Option<OnConflict>,
303+
schema: SchemaRef,
298304
) -> Self {
299305
Self {
300306
pool,
301307
table_definition,
302308
overwrite,
303309
on_conflict,
310+
schema,
304311
}
305312
}
306313
}
@@ -658,6 +665,7 @@ mod test {
658665
Arc::clone(&table_definition),
659666
InsertOp::Overwrite,
660667
None,
668+
table_definition.schema(),
661669
);
662670
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
663671

@@ -757,6 +765,7 @@ mod test {
757765
Arc::clone(&table_definition),
758766
InsertOp::Overwrite,
759767
None,
768+
table_definition.schema(),
760769
);
761770
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
762771

@@ -862,6 +871,7 @@ mod test {
862871
Arc::clone(&table_definition),
863872
InsertOp::Overwrite,
864873
None,
874+
table_definition.schema(),
865875
);
866876
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
867877

@@ -961,6 +971,7 @@ mod test {
961971
Arc::clone(&table_definition),
962972
InsertOp::Append,
963973
None,
974+
table_definition.schema(),
964975
);
965976
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
966977

@@ -1061,6 +1072,7 @@ mod test {
10611072
Arc::clone(&table_definition),
10621073
InsertOp::Append,
10631074
None,
1075+
table_definition.schema(),
10641076
);
10651077
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
10661078

src/flight/exec.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ use datafusion::common::Result;
3333
use datafusion::common::{project_schema, DataFusionError};
3434
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
3535
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
36+
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
3637
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
37-
use datafusion_physical_plan::{
38-
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
39-
};
38+
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
4039
use futures::{StreamExt, TryStreamExt};
4140
use serde::{Deserialize, Serialize};
4241
use tonic::metadata::{AsciiMetadataKey, MetadataMap};
@@ -81,15 +80,18 @@ impl FlightExec {
8180

8281
impl From<FlightConfig> for FlightExec {
8382
fn from(config: FlightConfig) -> Self {
84-
let exec_mode = if config.properties.unbounded_stream {
85-
ExecutionMode::Unbounded
83+
let boundedness = if config.properties.unbounded_stream {
84+
Boundedness::Unbounded {
85+
requires_infinite_memory: false,
86+
}
8687
} else {
87-
ExecutionMode::Bounded
88+
Boundedness::Bounded
8889
};
8990
let plan_properties = PlanProperties::new(
9091
EquivalenceProperties::new(config.schema.clone()),
9192
Partitioning::UnknownPartitioning(config.partitions.len()),
92-
exec_mode,
93+
EmissionType::Incremental,
94+
boundedness,
9395
);
9496
let mut mm = MetadataMap::new();
9597
for (k, v) in config.properties.grpc_headers.iter() {

src/mysql/mysql_window.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ impl MySQLWindowVisitor {
7474
mod test {
7575
use datafusion::sql::sqlparser::{
7676
self,
77-
ast::{self, ObjectName, WindowFrame},
77+
ast::{self, helpers::attached_token::AttachedToken, ObjectName, WindowFrame},
78+
tokenizer::Span,
7879
};
7980

8081
use super::*;
@@ -85,13 +86,14 @@ mod test {
8586
name: ObjectName(vec![Ident {
8687
value: "RANK".to_string(),
8788
quote_style: None,
89+
span: Span::empty(),
8890
}]),
8991
args: ast::FunctionArguments::None,
9092
over: Some(WindowType::WindowSpec(ast::WindowSpec {
9193
window_name: None,
9294
partition_by: vec![],
9395
order_by: vec![sqlparser::ast::OrderByExpr {
94-
expr: sqlparser::ast::Expr::Wildcard,
96+
expr: sqlparser::ast::Expr::Wildcard(AttachedToken::empty()),
9597
asc: None,
9698
nulls_first: Some(true),
9799
with_fill: None,
@@ -106,13 +108,14 @@ mod test {
106108
null_treatment: None,
107109
filter: None,
108110
within_group: vec![],
111+
uses_odbc_syntax: false,
109112
};
110113

111114
let expected = Some(WindowType::WindowSpec(sqlparser::ast::WindowSpec {
112115
window_name: None,
113116
partition_by: vec![],
114117
order_by: vec![sqlparser::ast::OrderByExpr {
115-
expr: sqlparser::ast::Expr::Wildcard,
118+
expr: sqlparser::ast::Expr::Wildcard(AttachedToken::empty()),
116119
asc: None,
117120
nulls_first: Some(true),
118121
with_fill: None,
@@ -131,13 +134,14 @@ mod test {
131134
name: ObjectName(vec![Ident {
132135
value: "RANK".to_string(),
133136
quote_style: None,
137+
span: Span::empty(),
134138
}]),
135139
args: sqlparser::ast::FunctionArguments::None,
136140
over: Some(WindowType::WindowSpec(sqlparser::ast::WindowSpec {
137141
window_name: None,
138142
partition_by: vec![],
139143
order_by: vec![sqlparser::ast::OrderByExpr {
140-
expr: sqlparser::ast::Expr::Wildcard,
144+
expr: sqlparser::ast::Expr::Wildcard(AttachedToken::empty()),
141145
asc: None,
142146
nulls_first: Some(true),
143147
with_fill: None,
@@ -152,13 +156,14 @@ mod test {
152156
null_treatment: None,
153157
filter: None,
154158
within_group: vec![],
159+
uses_odbc_syntax: false,
155160
};
156161

157162
let expected = Some(WindowType::WindowSpec(sqlparser::ast::WindowSpec {
158163
window_name: None,
159164
partition_by: vec![],
160165
order_by: vec![sqlparser::ast::OrderByExpr {
161-
expr: sqlparser::ast::Expr::Wildcard,
166+
expr: sqlparser::ast::Expr::Wildcard(AttachedToken::empty()),
162167
asc: None,
163168
nulls_first: None,
164169
with_fill: None,

src/postgres/write.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ impl TableProvider for PostgresTableWriter {
9191
Arc::clone(&self.postgres),
9292
overwrite,
9393
self.on_conflict.clone(),
94+
self.schema(),
9495
)),
95-
self.schema(),
9696
None,
9797
)) as _)
9898
}
@@ -103,6 +103,7 @@ struct PostgresDataSink {
103103
postgres: Arc<Postgres>,
104104
overwrite: InsertOp,
105105
on_conflict: Option<OnConflict>,
106+
schema: SchemaRef,
106107
}
107108

108109
#[async_trait]
@@ -115,6 +116,10 @@ impl DataSink for PostgresDataSink {
115116
None
116117
}
117118

119+
fn schema(&self) -> &SchemaRef {
120+
&self.schema
121+
}
122+
118123
async fn write_all(
119124
&self,
120125
mut data: SendableRecordBatchStream,
@@ -221,11 +226,17 @@ impl DataSink for PostgresDataSink {
221226
}
222227

223228
impl PostgresDataSink {
224-
fn new(postgres: Arc<Postgres>, overwrite: InsertOp, on_conflict: Option<OnConflict>) -> Self {
229+
fn new(
230+
postgres: Arc<Postgres>,
231+
overwrite: InsertOp,
232+
on_conflict: Option<OnConflict>,
233+
schema: SchemaRef,
234+
) -> Self {
225235
Self {
226236
postgres,
227237
overwrite,
228238
on_conflict,
239+
schema,
229240
}
230241
}
231242
}

src/sql/sql_provider_datafusion/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ use datafusion::{
3131
logical_expr::{Expr, TableProviderFilterPushDown, TableType},
3232
physical_expr::EquivalenceProperties,
3333
physical_plan::{
34-
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode,
35-
ExecutionPlan, Partitioning, PlanProperties, SendableRecordBatchStream,
34+
execution_plan::{Boundedness, EmissionType},
35+
stream::RecordBatchStreamAdapter,
36+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
37+
SendableRecordBatchStream,
3638
},
3739
sql::TableReference,
3840
};
@@ -260,7 +262,8 @@ impl<T, P> SqlExec<T, P> {
260262
properties: PlanProperties::new(
261263
EquivalenceProperties::new(projected_schema),
262264
Partitioning::UnknownPartitioning(1),
263-
ExecutionMode::Bounded,
265+
EmissionType::Incremental,
266+
Boundedness::Bounded,
264267
),
265268
engine,
266269
})

src/sqlite/sqlite_interval.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ impl SQLiteIntervalVisitor {
240240
over: None,
241241
within_group: Vec::new(),
242242
parameters: ast::FunctionArguments::None,
243+
uses_odbc_syntax: false,
243244
});
244245

245246
Expr::Cast {
@@ -409,6 +410,7 @@ mod test {
409410
over: None,
410411
within_group: Vec::new(),
411412
parameters: ast::FunctionArguments::None,
413+
uses_odbc_syntax: false,
412414
})),
413415
data_type: ast::DataType::Text,
414416
format: None,
@@ -458,6 +460,7 @@ mod test {
458460
over: None,
459461
within_group: Vec::new(),
460462
parameters: ast::FunctionArguments::None,
463+
uses_odbc_syntax: false,
461464
})),
462465
data_type: ast::DataType::Text,
463466
format: None,

0 commit comments

Comments
 (0)