diff --git a/src/duckdb.rs b/src/duckdb.rs
index 90a34d6..3f83353 100644
--- a/src/duckdb.rs
+++ b/src/duckdb.rs
@@ -460,6 +460,12 @@ impl DuckDB {
             .context(DbConnectionSnafu)
     }
 
+    pub fn connect_sync_direct(self: Arc<Self>) -> Result<DuckDbConnection> {
+        Arc::clone(&self.pool)
+            .connect_sync_direct()
+            .context(DbConnectionSnafu)
+    }
+
     pub fn duckdb_conn(
         db_connection: &mut Box<
             dyn DbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter>,
@@ -616,12 +622,12 @@ impl DuckDB {
             );
         }
         if !extra_in_actual.is_empty() {
-        tracing::warn!(
-            "Unexpected index(es) detected in table '{name}': {}.\n\
-            These indexes are not defined in the configuration.",
-            extra_in_actual.iter().join(", "),
-            name = self.table_name
-);
+            tracing::warn!(
+                "Unexpected index(es) detected in table '{name}': {}.\n\
+                These indexes are not defined in the configuration.",
+                extra_in_actual.iter().join(", "),
+                name = self.table_name
+            );
         }
 
         Ok(missing_in_actual.is_empty() && extra_in_actual.is_empty())
diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs
index 30d8c10..891518b 100644
--- a/src/duckdb/write.rs
+++ b/src/duckdb/write.rs
@@ -1,6 +1,7 @@
 use std::{any::Any, fmt, sync::Arc};
 
 use crate::duckdb::DuckDB;
+use crate::sql::db_connection_pool::dbconnection::duckdbconn::DuckDbConnection;
 use crate::util::{
     constraints,
     on_conflict::OnConflict,
@@ -22,7 +23,7 @@ use datafusion::{
         DisplayAs, DisplayFormatType, ExecutionPlan,
     },
 };
-use duckdb::Transaction;
+use duckdb::{Error as DuckDBError, Transaction};
 use futures::StreamExt;
 use snafu::prelude::*;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -30,6 +31,53 @@ use tokio::task::JoinHandle;
 
 use super::to_datafusion_error;
 
+/// A transaction manager that ensures only a single transaction is active at a time
+/// for a given DuckDB connection
+pub struct DuckDbTransactionManager {
+    /// The DuckDB connection
+    conn: DuckDbConnection,
+    /// The currently active transaction, if any
+    transaction: Option<Transaction<'static>>,
+}
+
+impl<'a> DuckDbTransactionManager {
+    /// Create a new transaction manager with the given connection
+    pub fn new(conn: DuckDbConnection) -> Self {
+        Self {
+            conn,
+            transaction: None,
+        }
+    }
+
+    /// Begin a new transaction if one doesn't already exist
+    pub fn begin(&mut self) -> Result<(), DuckDBError> {
+        if self.transaction.is_none() {
+            let tx = self.conn.conn.transaction()?;
+            // SAFETY: The transaction is tied to the lifetime of the connection - because Rust
+            // doesn't support self-referential structs, we need to transmute the transaction
+            // to a static lifetime. We never give out a reference to the transaction with the static lifetime,
+            // so it's safe to transmute.
+            self.transaction = Some(unsafe {
+                std::mem::transmute::<duckdb::Transaction<'_>, duckdb::Transaction<'static>>(tx)
+            });
+        }
+        Ok(())
+    }
+
+    /// Commit the current transaction if one exists and return success/failure
+    pub fn commit(&mut self) -> Result<(), DuckDBError> {
+        if let Some(tx) = self.transaction.take() {
+            tx.commit()?;
+        }
+        Ok(())
+    }
+
+    /// Get a reference to the current transaction, if one is active
+    pub fn tx(&'a self) -> Option<&'a Transaction<'a>> {
+        self.transaction.as_ref()
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct DuckDBTableWriter {
     pub read_provider: Arc<dyn TableProvider>,
@@ -141,24 +189,22 @@ impl DataSink for DuckDBDataSink {
 
         let duckdb_write_handle: JoinHandle<datafusion::common::Result<u64>> =
             tokio::task::spawn_blocking(move || {
-                let mut db_conn = duckdb
-                    .connect_sync()
+                let db_conn = Arc::clone(&duckdb)
+                    .connect_sync_direct()
                     .map_err(to_retriable_data_write_error)?;
 
-                let duckdb_conn =
-                    DuckDB::duckdb_conn(&mut db_conn).map_err(to_retriable_data_write_error)?;
+                let mut tx_manager = DuckDbTransactionManager::new(db_conn);
 
-                let tx = duckdb_conn
-                    .conn
-                    .transaction()
+                tx_manager
+                    .begin()
                     .context(super::UnableToBeginTransactionSnafu)
                     .map_err(to_datafusion_error)?;
 
-                let num_rows = match **duckdb.constraints() {
-                    [] => try_write_all_no_constraints(duckdb, &tx, batch_rx, overwrite)?,
+                let (num_rows, mut tx_manager) = match **duckdb.constraints() {
+                    [] => try_write_all_no_constraints(duckdb, tx_manager, batch_rx, overwrite)?,
                     _ => try_write_all_with_constraints(
                         duckdb,
-                        &tx,
+                        tx_manager,
                         batch_rx,
                         overwrite,
                         on_conflict,
@@ -169,9 +215,11 @@ impl DataSink for DuckDBDataSink {
                     .try_recv()
                     .map_err(to_retriable_data_write_error)?;
 
-                tx.commit()
+                tx_manager
+                    .commit()
                     .context(super::UnableToCommitTransactionSnafu)
                     .map_err(to_retriable_data_write_error)?;
+                drop(tx_manager);
 
                 Ok(num_rows)
             });
@@ -256,11 +304,11 @@ impl DisplayAs for DuckDBDataSink {
 /// See:
 fn try_write_all_with_constraints(
     duckdb: Arc<DuckDB>,
-    tx: &Transaction<'_>,
+    mut tx_manager: DuckDbTransactionManager,
     mut data_batches: Receiver<RecordBatch>,
     overwrite: InsertOp,
     on_conflict: Option<OnConflict>,
-) -> datafusion::common::Result<u64> {
+) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> {
     // We want to clone the current table into our insert table
     let Some(ref orig_table_creator) = duckdb.table_creator else {
         return Err(DataFusionError::Execution(
@@ -271,67 +319,136 @@ fn try_write_all_with_constraints(
     let mut num_rows = 0;
 
     let mut insert_table = orig_table_creator
-        .create_empty_clone(tx)
+        .create_empty_clone(tx_manager.tx().unwrap())
         .map_err(to_datafusion_error)?;
 
     let Some(insert_table_creator) = insert_table.table_creator.take() else {
         unreachable!()
     };
 
+    // Auto-commit after processing this many rows
+    const MAX_ROWS_PER_COMMIT: usize = 10_000_000;
+    let mut rows_since_last_commit = 0;
+
     while let Some(batch) = data_batches.blocking_recv() {
         num_rows += u64::try_from(batch.num_rows()).map_err(|e| {
             DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}"))
         })?;
 
+        rows_since_last_commit += batch.num_rows();
+
+        if rows_since_last_commit > MAX_ROWS_PER_COMMIT {
+            tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",);
+
+            // Commit the current transaction
+            tx_manager
+                .commit()
+                .context(super::UnableToCommitTransactionSnafu)
+                .map_err(to_datafusion_error)?;
+
+            // Create a new transaction
+            tx_manager
+                .begin()
+                .context(super::UnableToBeginTransactionSnafu)
+                .map_err(to_datafusion_error)?;
+
+            rows_since_last_commit = 0;
+        }
+
         tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows());
         insert_table
-            .insert_batch_no_constraints(tx, &batch)
+            .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch)
             .map_err(to_datafusion_error)?;
     }
 
     if matches!(overwrite, InsertOp::Overwrite) {
         insert_table_creator
-            .replace_table(tx, orig_table_creator)
+            .replace_table(tx_manager.tx().unwrap(), orig_table_creator)
             .map_err(to_datafusion_error)?;
     } else {
         insert_table
-            .insert_table_into(tx, &duckdb, on_conflict.as_ref())
+            .insert_table_into(tx_manager.tx().unwrap(), &duckdb, on_conflict.as_ref())
             .map_err(to_datafusion_error)?;
         insert_table_creator
-            .delete_table(tx)
+            .delete_table(tx_manager.tx().unwrap())
             .map_err(to_datafusion_error)?;
     }
 
-    Ok(num_rows)
+    Ok((num_rows, tx_manager))
 }
 
-/// If there are no constraints on the `DuckDB` table, we can do a simple single transaction write.
+/// Even if there are no constraints on the `DuckDB` table, we use the temp table approach
+/// to be consistent with the constrained case and to help with performance.
 fn try_write_all_no_constraints(
     duckdb: Arc<DuckDB>,
-    tx: &Transaction<'_>,
+    mut tx_manager: DuckDbTransactionManager,
     mut data_batches: Receiver<RecordBatch>,
     overwrite: InsertOp,
-) -> datafusion::common::Result<u64> {
+) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> {
+    // We want to clone the current table into our insert table
+    let Some(ref orig_table_creator) = duckdb.table_creator else {
+        return Err(DataFusionError::Execution(
+            "Expected table to have a table creator".to_string(),
+        ));
+    };
+
     let mut num_rows = 0;
 
-    if matches!(overwrite, InsertOp::Overwrite) {
-        tracing::debug!("Deleting all data from table.");
-        duckdb
-            .delete_all_table_data(tx)
-            .map_err(to_datafusion_error)?;
-    }
+    let mut insert_table = orig_table_creator
+        .create_empty_clone(tx_manager.tx().unwrap())
+        .map_err(to_datafusion_error)?;
+
+    let Some(insert_table_creator) = insert_table.table_creator.take() else {
+        unreachable!()
+    };
+
+    // Auto-commit after processing this many rows
+    const MAX_ROWS_PER_COMMIT: usize = 10_000_000;
+    let mut rows_since_last_commit = 0;
 
     while let Some(batch) = data_batches.blocking_recv() {
         num_rows += u64::try_from(batch.num_rows()).map_err(|e| {
             DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}"))
         })?;
 
-        tracing::debug!("Inserting {} rows into table.", batch.num_rows());
+        rows_since_last_commit += batch.num_rows();
+
+        if rows_since_last_commit > MAX_ROWS_PER_COMMIT {
+            tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",);
+
+            // Commit the current transaction
+            tx_manager
+                .commit()
+                .context(super::UnableToCommitTransactionSnafu)
+                .map_err(to_datafusion_error)?;
+
+            // Create a new transaction
+            tx_manager
+                .begin()
+                .context(super::UnableToBeginTransactionSnafu)
+                .map_err(to_datafusion_error)?;
 
-        duckdb
-            .insert_batch_no_constraints(tx, &batch)
+            rows_since_last_commit = 0;
+        }
+
+        tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows());
+        insert_table
+            .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch)
+            .map_err(to_datafusion_error)?;
+    }
+
+    if matches!(overwrite, InsertOp::Overwrite) {
+        insert_table_creator
+            .replace_table(tx_manager.tx().unwrap(), orig_table_creator)
+            .map_err(to_datafusion_error)?;
+    } else {
+        insert_table
+            .insert_table_into(tx_manager.tx().unwrap(), &duckdb, None)
+            .map_err(to_datafusion_error)?;
+        insert_table_creator
+            .delete_table(tx_manager.tx().unwrap())
             .map_err(to_datafusion_error)?;
     }
 
-    Ok(num_rows)
+    Ok((num_rows, tx_manager))
 }
diff --git a/src/sql/db_connection_pool/duckdbpool.rs b/src/sql/db_connection_pool/duckdbpool.rs
index 24cce62..23b6340 100644
--- a/src/sql/db_connection_pool/duckdbpool.rs
+++ b/src/sql/db_connection_pool/duckdbpool.rs
@@ -54,7 +54,6 @@ impl std::fmt::Debug for DuckDbConnectionPool {
 }
 
 impl DuckDbConnectionPool {
-
     /// Get the dataset path. Returns `:memory:` if the in memory database is used.
     pub fn db_path(&self) -> &str {
         self.path.as_ref()
@@ -165,17 +164,19 @@ impl DuckDbConnectionPool {
     ) -> Result<
         Box<dyn DbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter>>,
     > {
+        Ok(Box::new(self.connect_sync_direct()?))
+    }
+
+    pub fn connect_sync_direct(self: Arc<Self>) -> Result<DuckDbConnection> {
         let pool = Arc::clone(&self.pool);
         let conn: r2d2::PooledConnection<DuckdbConnectionManager> =
             pool.get().context(ConnectionPoolSnafu)?;
 
         let attachments = self.get_attachments()?;
 
-        Ok(Box::new(
-            DuckDbConnection::new(conn)
-                .with_attachments(attachments)
-                .with_unsupported_type_action(self.unsupported_type_action),
-        ))
+        Ok(DuckDbConnection::new(conn)
+            .with_attachments(attachments)
+            .with_unsupported_type_action(self.unsupported_type_action))
     }
 
     #[must_use]
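
For illustration (not part of the diff): the new write path commits and reopens its transaction once MAX_ROWS_PER_COMMIT rows have been processed, instead of keeping a single transaction open for the entire insert. A minimal sketch of that pattern against the plain `duckdb` crate, rather than this repo's `DuckDbTransactionManager` and `DuckDbConnection` wrappers, is below; the `write_in_chunks` helper, the `rows_per_commit` parameter, and the `demo` table are hypothetical names used only for the example.

use duckdb::{params, Connection, Result};

/// Insert `rows` into the hypothetical `demo` table, committing after every
/// `rows_per_commit` rows rather than inside one long-lived transaction.
fn write_in_chunks(conn: &mut Connection, rows: &[i64], rows_per_commit: usize) -> Result<()> {
    for chunk in rows.chunks(rows_per_commit) {
        // Roughly what DuckDbTransactionManager::begin() does in the write path.
        let tx = conn.transaction()?;
        for &value in chunk {
            tx.execute("INSERT INTO demo VALUES (?)", params![value])?;
        }
        // Roughly DuckDbTransactionManager::commit(); the next iteration starts a
        // fresh transaction, as the data sink now does after MAX_ROWS_PER_COMMIT rows.
        tx.commit()?;
    }
    Ok(())
}

fn main() -> Result<()> {
    let mut conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE demo (v BIGINT)")?;
    write_in_chunks(&mut conn, &[1, 2, 3, 4, 5], 2)
}

The sketch keeps the per-chunk transaction local to each loop iteration, which avoids the self-referential struct problem the diff works around with the transmute in DuckDbTransactionManager::begin(); the diff needs that workaround because the connection and its active transaction travel together across function boundaries.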