Skip to content

Commit a37be8f

Browse files
Fixing merge issues + use spiceai_duckdb_fork
1 parent 23ab4b6 commit a37be8f

8 files changed

Lines changed: 43 additions & 66 deletions

File tree

Cargo.lock

Lines changed: 26 additions & 26 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,4 @@ datafusion-proto = { version = "46" }
4848
datafusion-physical-expr = { version = "46" }
4949
datafusion-physical-plan = { version = "46" }
5050
datafusion-table-providers = { path = "core" }
51-
duckdb = { version = "=1.2.1" }
51+
duckdb = { version = "=1.2.1", package = "spiceai_duckdb_fork" } # Forked to add support for duckdb_scan_arrow, pending: https://github.com/duckdb/duckdb-rs/pull/488

core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ rstest = "0.25.0"
109109
test-log = { version = "0.2", features = ["trace"] }
110110
tokio-stream = { version = "0.1", features = ["net"] }
111111
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
112+
tempfile = "3.19.1"
112113

113114
[features]
114115
duckdb = [

core/src/duckdb.rs

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::sql::sql_provider_datafusion;
2-
use crate::util::constraints::get_primary_keys_from_constraints;
32
use crate::util::{
43
self,
54
column_reference::{self, ColumnReference},
@@ -36,10 +35,7 @@ use datafusion::{
3635
use duckdb::{AccessMode, DuckdbConnectionManager};
3736
use itertools::Itertools;
3837
use snafu::prelude::*;
39-
use std::{
40-
collections::{HashMap, HashSet},
41-
sync::Arc,
42-
};
38+
use std::{collections::HashMap, sync::Arc};
4339
use tokio::sync::Mutex;
4440
use write::DuckDBTableWriterBuilder;
4541

@@ -404,29 +400,6 @@ impl TableProviderFactory for DuckDBTableProviderFactory {
404400
.with_pool(pool)
405401
.set_on_conflict(on_conflict);
406402

407-
// If the table is already created, we don't create it again and don't apply primary keys and remove previosly created indexes (if any).
408-
// Thus we verify that primary keys and indexes for the table created match the configuration.
409-
let mut table_schema_matches = true;
410-
table_schema_matches &= duckdb
411-
.verify_primary_keys_match()
412-
.await
413-
.map_err(to_datafusion_error)?;
414-
415-
table_schema_matches &= duckdb
416-
.verify_indexes_match(&indexes)
417-
.await
418-
.map_err(to_datafusion_error)?;
419-
420-
if !table_schema_matches {
421-
tracing::warn!(
422-
"Schema mismatch detected for table '{table_name}' in database '{db_path}'.\n\
423-
The local table definition does not match the expected schema.\n\
424-
To resolve this issue, drop the existing table. A new table with the correct schema will be created automatically on the next access.",
425-
db_path = duckdb.pool.db_path(),
426-
table_name = duckdb.table_name
427-
);
428-
}
429-
430403
let dyn_pool: Arc<DynDuckDbConnectionPool> = Arc::new(read_pool);
431404

432405
if let Some(memory_limit) = options.get(DUCKDB_SETTING_MEMORY_LIMIT) {

core/src/duckdb/creator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ pub(crate) mod tests {
840840
Arc::clone(&table_definition),
841841
*overwrite,
842842
None,
843+
table_definition.schema(),
843844
);
844845
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
845846
let rows_written = data_sink
@@ -953,6 +954,7 @@ pub(crate) mod tests {
953954
Arc::clone(&table_definition),
954955
*overwrite,
955956
None,
957+
table_definition.schema(),
956958
);
957959
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
958960
let rows_written = data_sink

core/src/duckdb/write.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ use arrow::ffi_stream::FFI_ArrowArrayStream;
1313
use arrow::{array::RecordBatch, datatypes::SchemaRef};
1414
use arrow_schema::ArrowError;
1515
use async_trait::async_trait;
16-
use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
1716
use datafusion::catalog::Session;
1817
use datafusion::common::{Constraints, SchemaExt};
1918
use datafusion::logical_expr::dml::InsertOp;
@@ -178,7 +177,7 @@ impl TableProvider for DuckDBTableWriter {
178177
Arc::new(DuckDBDataSink::new(
179178
Arc::clone(&self.pool),
180179
Arc::clone(&self.table_definition),
181-
overwrite,
180+
op,
182181
self.on_conflict.clone(),
183182
self.schema(),
184183
)),
@@ -649,7 +648,7 @@ impl RecordBatchReader for RecordBatchReaderFromStream {
649648
#[cfg(test)]
650649
mod test {
651650
use arrow::array::{Int64Array, StringArray};
652-
use datafusion_physical_plan::memory::MemoryStream;
651+
use datafusion::physical_plan::memory::MemoryStream;
653652

654653
use super::*;
655654
use crate::{
@@ -672,6 +671,7 @@ mod test {
672671
Arc::clone(&table_definition),
673672
InsertOp::Overwrite,
674673
None,
674+
table_definition.schema(),
675675
);
676676
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
677677

@@ -771,6 +771,7 @@ mod test {
771771
Arc::clone(&table_definition),
772772
InsertOp::Overwrite,
773773
None,
774+
table_definition.schema(),
774775
);
775776
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
776777

@@ -876,6 +877,7 @@ mod test {
876877
Arc::clone(&table_definition),
877878
InsertOp::Overwrite,
878879
None,
880+
table_definition.schema(),
879881
);
880882
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
881883

@@ -975,6 +977,7 @@ mod test {
975977
Arc::clone(&table_definition),
976978
InsertOp::Append,
977979
None,
980+
table_definition.schema(),
978981
);
979982
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
980983

@@ -1075,6 +1078,7 @@ mod test {
10751078
Arc::clone(&table_definition),
10761079
InsertOp::Append,
10771080
None,
1081+
table_definition.schema(),
10781082
);
10791083
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
10801084

core/src/mysql/sql_table.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,8 @@ impl MySQLTable {
6969
Ok(Arc::new(MySQLSQLExec::new(
7070
projections,
7171
schema,
72-
&self.base_table.table_reference,
7372
Arc::clone(&self.pool),
74-
filters,
75-
limit,
73+
sql,
7674
)?))
7775
}
7876
}
@@ -123,10 +121,8 @@ impl MySQLSQLExec {
123121
fn new(
124122
projections: Option<&Vec<usize>>,
125123
schema: &SchemaRef,
126-
table_reference: &TableReference,
127124
pool: Arc<MySQLConnectionPool>,
128-
filters: &[Expr],
129-
limit: Option<usize>,
125+
sql: String,
130126
) -> DataFusionResult<Self> {
131127
let base_exec = SqlExec::new(projections, schema, pool, sql)?;
132128

core/tests/mysql/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use datafusion::execution::context::SessionContext;
1+
use datafusion::{datasource::memory::MemorySourceConfig, execution::context::SessionContext};
22
use datafusion_table_providers::sql::{
33
db_connection_pool::DbConnectionPool, sql_provider_datafusion::SqlTable,
44
};
55
use mysql_async::prelude::ToValue;
6-
use rstest::rstest;
6+
use rstest::{fixture, rstest};
77
use std::sync::Arc;
88

99
use arrow::{
@@ -13,6 +13,7 @@ use arrow::{
1313

1414
use datafusion_table_providers::sql::db_connection_pool::dbconnection::AsyncDbConnection;
1515

16+
use crate::arrow_record_batch_gen::*;
1617
use crate::docker::RunningContainer;
1718
use datafusion::arrow::datatypes::SchemaRef;
1819
use datafusion::catalog::TableProviderFactory;

0 commit comments

Comments
 (0)