Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions core/src/duckdb/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,28 @@ impl TableManager {
Ok(())
}

/// Creates the internal staging table without PRIMARY KEY constraints.
/// This is used for overwrite operations where we're loading data into a temporary table
/// before swapping with the main table. The incoming data may have duplicates that would
/// violate PRIMARY KEY constraints.
#[tracing::instrument(level = "debug", skip_all)]
pub fn create_table_without_constraints(
&self,
pool: Arc<DuckDbConnectionPool>,
tx: &Transaction<'_>,
) -> super::Result<()> {
let mut db_conn = pool.connect_sync().context(super::DbConnectionPoolSnafu)?;
let duckdb_conn = DuckDB::duckdb_conn(&mut db_conn)?;

let create_stmt = self.get_table_create_statement(duckdb_conn)?;
tracing::debug!("{create_stmt}");

tx.execute(&create_stmt, [])
.context(super::UnableToCreateDuckDBTableSnafu)?;

Ok(())
}

/// Drops indexes from the table, then drops the table itself.
#[tracing::instrument(level = "debug", skip_all)]
pub fn delete_table(&self, tx: &Transaction<'_>) -> super::Result<()> {
Expand Down
158 changes: 136 additions & 22 deletions core/src/duckdb/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,19 @@ impl DataSink for DuckDBDataSink {
while let Some(batch) = data.next().await {
let batch = batch.map_err(check_and_mark_retriable_error)?;

if let Some(constraints) = self.table_definition.constraints() {
constraints::validate_batch_with_constraints(
vec![batch.clone()],
constraints,
&crate::util::constraints::UpsertOptions::default(),
)
.await
.context(super::ConstraintViolationSnafu)
.map_err(to_datafusion_error)?;
// Skip constraint validation for Overwrite operations since we're replacing all data
// and uniqueness constraints don't apply to the incoming data in isolation.
if self.overwrite != InsertOp::Overwrite {
if let Some(constraints) = self.table_definition.constraints() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

constraints should still be applied on the incoming data if the final table will have constraints, otherwise you could end up with a table that has violated its constraints.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last write wins should still allow the validation to proceed if that is configured.

constraints::validate_batch_with_constraints(
vec![batch.clone()],
constraints,
&crate::util::constraints::UpsertOptions::default(),
)
.await
.context(super::ConstraintViolationSnafu)
.map_err(to_datafusion_error)?;
}
}

if let Err(send_error) = batch_tx.send(batch).await {
Expand Down Expand Up @@ -571,7 +575,7 @@ fn insert_overwrite(
.map_err(to_retriable_data_write_error)?;

new_table
.create_table(cloned_pool, &tx)
.create_table_without_constraints(cloned_pool, &tx)
.map_err(to_retriable_data_write_error)?;

let existing_tables = new_table
Expand Down Expand Up @@ -618,22 +622,15 @@ fn insert_overwrite(
));
}

// Note: We skip primary key verification for insert_overwrite because
// the internal staging table is intentionally created without constraints
// to allow loading data with potential duplicates.
if !should_apply_indexes {
// compare indexes and primary keys
let primary_keys_match = new_table
.verify_primary_keys_match(last_table, &tx)
.map_err(to_retriable_data_write_error)?;
// Only verify indexes match, skip primary key verification
let indexes_match = new_table
.verify_indexes_match(last_table, &tx)
.map_err(to_retriable_data_write_error)?;

if !primary_keys_match {
return Err(DataFusionError::Execution(
"Primary keys do not match between the new table and the existing table.\nEnsure primary key configuration is the same as the existing table, or manually migrate the table."
.to_string(),
));
}

if !indexes_match {
return Err(DataFusionError::Execution(
"Indexes do not match between the new table and the existing table.\nEnsure index configuration is the same as the existing table, or manually migrate the table.".to_string(),
Expand Down Expand Up @@ -792,7 +789,10 @@ mod test {
use super::*;
use crate::{
duckdb::creator::tests::{get_basic_table_definition, get_mem_duckdb, init_tracing},
util::{column_reference::ColumnReference, indexes::IndexType},
util::{
column_reference::ColumnReference, constraints::tests::get_pk_constraints,
indexes::IndexType,
},
};

#[tokio::test]
Expand Down Expand Up @@ -1390,4 +1390,118 @@ mod test {

tx.rollback().expect("to rollback");
}

#[tokio::test]
async fn test_overwrite_skips_pk_constraint_validation_with_duplicate_pks() {
// Test scenario: Overwrite a table whose definition has a PRIMARY KEY constraint,
// using incoming data that contains duplicate primary-key values.
// Expected behavior: Overwrite succeeds because constraint validation is skipped
// for Overwrite (and the staging table is created without constraints).
let _guard = init_tracing(None);
let pool = get_mem_duckdb();

let schema = Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int64, false),
arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false),
]));
let table_definition = Arc::new(
TableDefinition::new(RelationName::new("test_table"), Arc::clone(&schema))
.with_constraints(get_pk_constraints(&["id"], Arc::clone(&schema))),
);

let duckdb_sink = DuckDBDataSink::new(
Arc::clone(&pool),
Arc::clone(&table_definition),
InsertOp::Overwrite,
None,
Arc::clone(&schema),
);
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);

// Duplicate primary keys in incoming batch.
let batches = vec![RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int64Array::from(vec![Some(1), Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("a"), Some("b"), Some("c")])),
],
)
.expect("should create a record batch")];

let stream = Box::pin(
MemoryStream::try_new(batches, Arc::clone(&schema), None).expect("to get stream"),
);

data_sink
.write_all(stream, &Arc::new(TaskContext::default()))
.await
.expect("overwrite should succeed despite duplicate primary keys");

let mut conn = pool.connect_sync().expect("to connect");
let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn");
let tx = duckdb.conn.transaction().expect("to begin transaction");

let view_rows = tx
.query_row(
&format!(
"SELECT COUNT(1) FROM {view_name}",
view_name = table_definition.name()
),
[],
|row| row.get::<_, i64>(0),
)
.expect("to get count");
assert_eq!(view_rows, 3);

tx.rollback().expect("to rollback");
}

#[tokio::test]
async fn test_append_still_enforces_pk_constraint_validation() {
// Test scenario: Append mode should still enforce constraint validation.
// Expected behavior: Appending a batch with duplicate primary keys fails
// (regression guard ensuring the Overwrite-only skip didn't regress Append).
let _guard = init_tracing(None);
let pool = get_mem_duckdb();

let schema = Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int64, false),
arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false),
]));
let table_definition = Arc::new(
TableDefinition::new(RelationName::new("test_table"), Arc::clone(&schema))
.with_constraints(get_pk_constraints(&["id"], Arc::clone(&schema))),
);

let duckdb_sink = DuckDBDataSink::new(
Arc::clone(&pool),
Arc::clone(&table_definition),
InsertOp::Append,
None,
Arc::clone(&schema),
);
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);

let batches = vec![RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(Int64Array::from(vec![Some(1), Some(1)])),
Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
],
)
.expect("should create a record batch")];

let stream = Box::pin(
MemoryStream::try_new(batches, Arc::clone(&schema), None).expect("to get stream"),
);

let result = data_sink
.write_all(stream, &Arc::new(TaskContext::default()))
.await;

assert!(
result.is_err(),
"Append with duplicate primary keys should fail constraint validation"
);
}
}
22 changes: 19 additions & 3 deletions core/src/sql/arrow_sql_gen/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,33 +1040,45 @@ impl<'a> InsertBuilder<'a> {
///
/// Returns an error if any `RecordBatch` fails to convert into a valid postgres insert statement.
pub fn build_postgres(self, on_conflict: Option<OnConflict>) -> Result<String> {
self.build(PostgresQueryBuilder, on_conflict)
self.build(PostgresQueryBuilder, on_conflict, false)
}

///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid sqlite insert statement.
pub fn build_sqlite(self, on_conflict: Option<OnConflict>) -> Result<String> {
self.build(SqliteQueryBuilder, on_conflict)
self.build(SqliteQueryBuilder, on_conflict, false)
}

///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid sqlite REPLACE statement.
/// Uses SQLite's REPLACE INTO syntax which deletes existing rows with conflicting keys.
pub fn build_sqlite_replace(self) -> Result<String> {
self.build(SqliteQueryBuilder, None, true)
}

///
/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid `MySQL` insert statement.
pub fn build_mysql(self, on_conflict: Option<OnConflict>) -> Result<String> {
self.build(MysqlQueryBuilder, on_conflict)
self.build(MysqlQueryBuilder, on_conflict, false)
}

/// # Errors
///
/// Returns an error if any `RecordBatch` fails to convert into a valid insert statement. Upon
/// error, no further `RecordBatch` is processed.
///
/// If `replace` is true, uses REPLACE INTO syntax (SQLite/MySQL) instead of INSERT INTO.
pub fn build<T: GenericBuilder + 'static>(
&self,
query_builder: T,
on_conflict: Option<OnConflict>,
replace: bool,
) -> Result<String> {
let columns: Vec<Alias> = (self.record_batches[0])
.schema()
Expand All @@ -1080,6 +1092,10 @@ impl<'a> InsertBuilder<'a> {
.columns(columns)
.to_owned();

if replace {
insert_stmt.replace();
}

for record_batch in self.record_batches {
self.construct_insert_stmt(&mut insert_stmt, record_batch, &query_builder)?;
}
Expand Down
45 changes: 37 additions & 8 deletions core/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datafusion::{
common::Constraints,
datasource::TableProvider,
error::{DataFusionError, Result as DataFusionResult},
logical_expr::CreateExternalTable,
logical_expr::{dml::InsertOp, CreateExternalTable},
sql::TableReference,
};
use futures::TryStreamExt;
Expand Down Expand Up @@ -740,16 +740,32 @@ impl Sqlite {
transaction: &Transaction<'_>,
batch: RecordBatch,
on_conflict: Option<&OnConflict>,
insert_op: InsertOp,
) -> rusqlite::Result<()> {
let batches = vec![batch];
let insert_table_builder = InsertBuilder::new(&self.table, &batches);

let sea_query_on_conflict =
on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));

let sql = insert_table_builder
.build_sqlite(sea_query_on_conflict)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?;
// Validate supported insert operations
let sql = match insert_op {
InsertOp::Overwrite => {
// Use REPLACE INTO for overwrite mode
insert_table_builder
.build_sqlite_replace()
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?
}
InsertOp::Append => {
let sea_query_on_conflict =
on_conflict.map(|oc| oc.build_sea_query_on_conflict(&self.schema));
insert_table_builder
.build_sqlite(sea_query_on_conflict)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(e.into()))?
}
_ => {
return Err(rusqlite::Error::ToSqlConversionFailure(
format!("Unsupported insert operation: {insert_op:?}").into(),
));
}
};

transaction.execute(&sql, [])?;

Expand Down Expand Up @@ -786,6 +802,7 @@ impl Sqlite {
transaction: &Transaction<'_>,
batch: RecordBatch,
on_conflict: Option<&OnConflict>,
insert_op: InsertOp,
) -> rusqlite::Result<()> {
use arrow::array::*;
use arrow::datatypes::DataType;
Expand All @@ -794,6 +811,17 @@ impl Sqlite {
return Ok(());
}

// Validate supported insert operations
let insert_keyword = match insert_op {
InsertOp::Overwrite => "REPLACE INTO",
InsertOp::Append => "INSERT INTO",
_ => {
return Err(rusqlite::Error::ToSqlConversionFailure(
format!("Unsupported insert operation: {insert_op:?}").into(),
));
}
};

// Build the prepared statement SQL
let schema = batch.schema();
let column_names: Vec<String> = schema
Expand All @@ -808,7 +836,8 @@ impl Sqlite {
.collect();

let mut sql = format!(
"INSERT INTO {} ({}) VALUES ({})",
"{} {} ({}) VALUES ({})",
insert_keyword,
self.table.to_quoted_string(),
column_names.join(", "),
placeholders.join(", ")
Expand Down
Loading
Loading