From 50c38219252f88a3f0f444ad0b6b9c8f95257062 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Thu, 20 Mar 2025 09:38:12 +0900 Subject: [PATCH 1/2] Add an appender to the DuckDbAppendManager to live for the duration of the transaction. --- src/duckdb.rs | 15 ++-- src/duckdb/write.rs | 199 +++++++++++++++++++++++++++++++++++--------- 2 files changed, 167 insertions(+), 47 deletions(-) diff --git a/src/duckdb.rs b/src/duckdb.rs index 3f83353..f13bee0 100644 --- a/src/duckdb.rs +++ b/src/duckdb.rs @@ -32,7 +32,7 @@ use datafusion::{ logical_expr::CreateExternalTable, sql::TableReference, }; -use duckdb::{AccessMode, DuckdbConnectionManager, Transaction}; +use duckdb::{AccessMode, Appender, DuckdbConnectionManager, Transaction}; use itertools::Itertools; use snafu::prelude::*; use std::collections::HashSet; @@ -102,6 +102,9 @@ pub enum Error { #[snafu(display("Unable to begin duckdb transaction: {source}"))] UnableToBeginTransaction { source: duckdb::Error }, + #[snafu(display("Unable to flush appender: {source}"))] + UnableToFlushAppender { source: duckdb::Error }, + #[snafu(display("Unable to rollback transaction: {source}"))] UnableToRollbackTransaction { source: duckdb::Error }, @@ -515,21 +518,15 @@ impl DuckDB { fn insert_batch_no_constraints( &self, - transaction: &Transaction<'_>, + appender: &mut Appender<'_>, batch: &RecordBatch, ) -> Result<()> { - let mut appender = transaction - .appender(&self.table_name) - .context(UnableToGetAppenderToDuckDBTableSnafu)?; - for batch in Self::split_batch(batch) { appender .append_record_batch(batch.clone()) .context(UnableToInsertToDuckDBTableSnafu)?; } - appender.flush().context(UnableToInsertToDuckDBTableSnafu)?; - Ok(()) } @@ -791,7 +788,7 @@ pub(crate) mod tests { let ctx = SessionContext::new(); let cmd = CreateExternalTable { schema: Arc::new(schema.to_dfschema().expect("to df schema")), - name: table_name.into(), + name: table_name, location: "".to_string(), file_type: "".to_string(), table_partition_cols: vec![], diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs index 891518b..8b5ceb2 100644 --- a/src/duckdb/write.rs +++ b/src/duckdb/write.rs @@ -23,7 +23,7 @@ use datafusion::{ DisplayAs, DisplayFormatType, ExecutionPlan, }, }; -use duckdb::{Error as DuckDBError, Transaction}; +use duckdb::{Appender, Error as DuckDBError, Transaction}; use futures::StreamExt; use snafu::prelude::*; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -31,35 +31,45 @@ use tokio::task::JoinHandle; use super::to_datafusion_error; -/// A transaction manager that ensures only a single transaction is active at a time +/// An append manager that ensures only a single appender and transaction is active at a time /// for a given DuckDB connection -pub struct DuckDbTransactionManager { +pub struct DuckDbAppendManager { /// The DuckDB connection conn: DuckDbConnection, /// The currently active transaction, if any transaction: Option>, + /// The currently active appender, if any + appender: Option>, } -impl<'a> DuckDbTransactionManager { +impl DuckDbAppendManager { /// Create a new connection manager with the given connection pub fn new(conn: DuckDbConnection) -> Self { Self { conn, transaction: None, + appender: None, } } /// Begin a new transaction if one doesn't already exist - pub fn begin(&mut self) -> Result<(), DuckDBError> { + pub fn begin(&mut self, table_name: &str) -> Result<(), DuckDBError> { if self.transaction.is_none() { let tx = self.conn.conn.transaction()?; // SAFETY: The transaction is tied to the lifetime of the connection - because Rust 
// doesn't support self-referential structs, we need to transmute the transaction // to a static lifetime. We never give out a reference to the transaction with the static lifetime, // so it's safe to transmute. - self.transaction = Some(unsafe { + let static_tx = unsafe { std::mem::transmute::, duckdb::Transaction<'static>>(tx) + }; + // SAFETY: The appender is tied to the lifetime of the transaction. + self.appender = Some(unsafe { + std::mem::transmute::, duckdb::Appender<'static>>( + static_tx.appender(table_name)?, + ) }); + self.transaction = Some(static_tx); } Ok(()) } @@ -73,9 +83,27 @@ impl<'a> DuckDbTransactionManager { } /// Execute a database operation with the current transaction - pub fn tx(&'a self) -> Option<&'a Transaction<'a>> { + pub fn tx(&self) -> Option<&Transaction<'_>> { self.transaction.as_ref() } + + pub fn appender(&self) -> Option<&Appender<'_>> { + self.appender.as_ref() + } + + #[allow(clippy::needless_lifetimes)] + pub fn appender_mut<'a>(&'a mut self) -> Option<&'a mut Appender<'a>> { + self.appender.as_mut().map(|appender| unsafe { + std::mem::transmute::<&mut duckdb::Appender<'_>, &mut duckdb::Appender<'a>>(appender) + }) + } + + pub fn appender_flush(&mut self) -> Result<(), DuckDBError> { + if let Some(appender) = self.appender_mut() { + appender.flush()?; + } + Ok(()) + } } #[derive(Debug, Clone)] @@ -193,18 +221,20 @@ impl DataSink for DuckDBDataSink { .connect_sync_direct() .map_err(to_retriable_data_write_error)?; - let mut tx_manager = DuckDbTransactionManager::new(db_conn); + let mut append_manager = DuckDbAppendManager::new(db_conn); - tx_manager - .begin() + append_manager + .begin(duckdb.table_name()) .context(super::UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; - let (num_rows, mut tx_manager) = match **duckdb.constraints() { - [] => try_write_all_no_constraints(duckdb, tx_manager, batch_rx, overwrite)?, + let (num_rows, mut append_manager) = match **duckdb.constraints() { + [] => { + try_write_all_no_constraints(duckdb, append_manager, batch_rx, overwrite)? 
+ } _ => try_write_all_with_constraints( duckdb, - tx_manager, + append_manager, batch_rx, overwrite, on_conflict, @@ -215,11 +245,16 @@ impl DataSink for DuckDBDataSink { .try_recv() .map_err(to_retriable_data_write_error)?; - tx_manager + append_manager + .appender_flush() + .context(super::UnableToFlushAppenderSnafu) + .map_err(to_retriable_data_write_error)?; + + append_manager .commit() .context(super::UnableToCommitTransactionSnafu) .map_err(to_retriable_data_write_error)?; - drop(tx_manager); + drop(append_manager); Ok(num_rows) }); @@ -304,11 +339,67 @@ impl DisplayAs for DuckDBDataSink { /// See: fn try_write_all_with_constraints( duckdb: Arc, - mut tx_manager: DuckDbTransactionManager, + mut append_manager: DuckDbAppendManager, + mut data_batches: Receiver, + overwrite: InsertOp, + on_conflict: Option, +) -> datafusion::common::Result<(u64, DuckDbAppendManager)> { + // We want to clone the current table into our insert table + let Some(ref orig_table_creator) = duckdb.table_creator else { + return Err(DataFusionError::Execution( + "Expected table with constraints to have a table creator".to_string(), + )); + }; + + let mut num_rows = 0; + + let mut insert_table = orig_table_creator + .create_empty_clone(append_manager.tx().unwrap()) + .map_err(to_datafusion_error)?; + + let Some(insert_table_creator) = insert_table.table_creator.take() else { + unreachable!() + }; + + while let Some(batch) = data_batches.blocking_recv() { + num_rows += u64::try_from(batch.num_rows()).map_err(|e| { + DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}")) + })?; + + tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows()); + insert_table + .insert_batch_no_constraints(append_manager.appender_mut().unwrap(), &batch) + .map_err(to_datafusion_error)?; + } + + if matches!(overwrite, InsertOp::Overwrite) { + insert_table_creator + .replace_table(append_manager.tx().unwrap(), orig_table_creator) + .map_err(to_datafusion_error)?; + } else { + insert_table + .insert_table_into(append_manager.tx().unwrap(), &duckdb, on_conflict.as_ref()) + .map_err(to_datafusion_error)?; + insert_table_creator + .delete_table(append_manager.tx().unwrap()) + .map_err(to_datafusion_error)?; + } + + Ok((num_rows, append_manager)) +} + +/// If there are constraints on the `DuckDB` table, we need to create an empty copy of the target table, write to that table copy and then depending on +/// if the mode is overwrite or not, insert into the target table or drop the target table and rename the current table. 
+/// +/// See: +#[allow(dead_code)] +fn try_write_all_with_constraints_phased_commit( + duckdb: Arc, + mut append_manager: DuckDbAppendManager, mut data_batches: Receiver, overwrite: InsertOp, on_conflict: Option, -) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> { +) -> datafusion::common::Result<(u64, DuckDbAppendManager)> { // We want to clone the current table into our insert table let Some(ref orig_table_creator) = duckdb.table_creator else { return Err(DataFusionError::Execution( @@ -319,7 +410,7 @@ fn try_write_all_with_constraints( let mut num_rows = 0; let mut insert_table = orig_table_creator - .create_empty_clone(tx_manager.tx().unwrap()) + .create_empty_clone(append_manager.tx().unwrap()) .map_err(to_datafusion_error)?; let Some(insert_table_creator) = insert_table.table_creator.take() else { @@ -341,14 +432,14 @@ fn try_write_all_with_constraints( tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",); // Commit the current transaction - tx_manager + append_manager .commit() .context(super::UnableToCommitTransactionSnafu) .map_err(to_datafusion_error)?; // Create a new transaction - tx_manager - .begin() + append_manager + .begin(duckdb.table_name()) .context(super::UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; @@ -357,34 +448,66 @@ fn try_write_all_with_constraints( tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows()); insert_table - .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch) + .insert_batch_no_constraints(append_manager.appender_mut().unwrap(), &batch) .map_err(to_datafusion_error)?; } if matches!(overwrite, InsertOp::Overwrite) { insert_table_creator - .replace_table(tx_manager.tx().unwrap(), orig_table_creator) + .replace_table(append_manager.tx().unwrap(), orig_table_creator) .map_err(to_datafusion_error)?; } else { insert_table - .insert_table_into(tx_manager.tx().unwrap(), &duckdb, on_conflict.as_ref()) + .insert_table_into(append_manager.tx().unwrap(), &duckdb, on_conflict.as_ref()) .map_err(to_datafusion_error)?; insert_table_creator - .delete_table(tx_manager.tx().unwrap()) + .delete_table(append_manager.tx().unwrap()) + .map_err(to_datafusion_error)?; + } + + Ok((num_rows, append_manager)) +} + +/// If there are no constraints on the `DuckDB` table, we can do a simple single transaction write. +fn try_write_all_no_constraints( + duckdb: Arc, + mut append_manager: DuckDbAppendManager, + mut data_batches: Receiver, + overwrite: InsertOp, +) -> datafusion::common::Result<(u64, DuckDbAppendManager)> { + let mut num_rows = 0; + + if matches!(overwrite, InsertOp::Overwrite) { + tracing::debug!("Deleting all data from table."); + duckdb + .delete_all_table_data(append_manager.tx().unwrap()) .map_err(to_datafusion_error)?; } - Ok((num_rows, tx_manager)) + while let Some(batch) = data_batches.blocking_recv() { + num_rows += u64::try_from(batch.num_rows()).map_err(|e| { + DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}")) + })?; + + tracing::debug!("Inserting {} rows into table.", batch.num_rows()); + + duckdb + .insert_batch_no_constraints(append_manager.appender_mut().unwrap(), &batch) + .map_err(to_datafusion_error)?; + } + + Ok((num_rows, append_manager)) } /// Even if there are no constraints on the `DuckDB` table, we use the temp table approach /// to be consistent with the constrained case and to help with performance. 
-fn try_write_all_no_constraints( +#[allow(dead_code)] +fn try_write_all_no_constraints_phased_commit( duckdb: Arc, - mut tx_manager: DuckDbTransactionManager, + mut append_manager: DuckDbAppendManager, mut data_batches: Receiver, overwrite: InsertOp, -) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> { +) -> datafusion::common::Result<(u64, DuckDbAppendManager)> { // We want to clone the current table into our insert table let Some(ref orig_table_creator) = duckdb.table_creator else { return Err(DataFusionError::Execution( @@ -395,7 +518,7 @@ fn try_write_all_no_constraints( let mut num_rows = 0; let mut insert_table = orig_table_creator - .create_empty_clone(tx_manager.tx().unwrap()) + .create_empty_clone(append_manager.tx().unwrap()) .map_err(to_datafusion_error)?; let Some(insert_table_creator) = insert_table.table_creator.take() else { @@ -417,14 +540,14 @@ fn try_write_all_no_constraints( tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",); // Commit the current transaction - tx_manager + append_manager .commit() .context(super::UnableToCommitTransactionSnafu) .map_err(to_datafusion_error)?; // Create a new transaction - tx_manager - .begin() + append_manager + .begin(duckdb.table_name()) .context(super::UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; @@ -433,22 +556,22 @@ fn try_write_all_no_constraints( tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows()); insert_table - .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch) + .insert_batch_no_constraints(append_manager.appender_mut().unwrap(), &batch) .map_err(to_datafusion_error)?; } if matches!(overwrite, InsertOp::Overwrite) { insert_table_creator - .replace_table(tx_manager.tx().unwrap(), orig_table_creator) + .replace_table(append_manager.tx().unwrap(), orig_table_creator) .map_err(to_datafusion_error)?; } else { insert_table - .insert_table_into(tx_manager.tx().unwrap(), &duckdb, None) + .insert_table_into(append_manager.tx().unwrap(), &duckdb, None) .map_err(to_datafusion_error)?; insert_table_creator - .delete_table(tx_manager.tx().unwrap()) + .delete_table(append_manager.tx().unwrap()) .map_err(to_datafusion_error)?; } - Ok((num_rows, tx_manager)) + Ok((num_rows, append_manager)) } From c1b519d4635c4c4245218d5dae1dad15059a577c Mon Sep 17 00:00:00 2001 From: peasee <98815791+peasee@users.noreply.github.com> Date: Thu, 20 Mar 2025 14:14:38 +0900 Subject: [PATCH 2/2] wip: Flush before table alterations --- src/duckdb/write.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs index 8b5ceb2..da40d0d 100644 --- a/src/duckdb/write.rs +++ b/src/duckdb/write.rs @@ -245,10 +245,11 @@ impl DataSink for DuckDBDataSink { .try_recv() .map_err(to_retriable_data_write_error)?; - append_manager - .appender_flush() - .context(super::UnableToFlushAppenderSnafu) - .map_err(to_retriable_data_write_error)?; + // flush in the respective write functions + // append_manager + // .appender_flush() + // .context(super::UnableToFlushAppenderSnafu) + // .map_err(to_retriable_data_write_error)?; append_manager .commit() @@ -372,6 +373,8 @@ fn try_write_all_with_constraints( .map_err(to_datafusion_error)?; } + append_manager.appender_flush().unwrap(); + if matches!(overwrite, InsertOp::Overwrite) { insert_table_creator .replace_table(append_manager.tx().unwrap(), orig_table_creator) @@ -496,6 +499,8 @@ fn try_write_all_no_constraints( .map_err(to_datafusion_error)?; } + 
append_manager.appender_flush().unwrap(); + Ok((num_rows, append_manager)) }
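
For context, the append path this series manages follows a fixed lifecycle: begin a transaction, open an appender on the target table, stream record batches into it, flush the appender, and only then perform any table alterations or the commit (the appender buffers rows, so work that runs before the flush cannot see them, which is what the second commit's "flush before table alterations" change addresses). The standalone sketch below walks that lifecycle against the duckdb crate directly; it is not part of this patch, the `items` table and the row values are invented for illustration, and it assumes a crate version in which `Appender::flush` returns a `Result`, as it does in the code touched here.

use duckdb::{params, Connection, Result};

fn main() -> Result<()> {
    // An in-memory database stands in for the pooled connection that
    // DuckDbAppendManager owns in the patch.
    let mut conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE items (id INTEGER, name VARCHAR);")?;

    // Begin the transaction the appender will live inside.
    let tx = conn.transaction()?;
    {
        // The appender borrows the transaction, which is why the manager in
        // this patch has to erase lifetimes to hold both in one struct.
        let mut appender = tx.appender("items")?;
        appender.append_row(params![1, "one"])?;
        appender.append_row(params![2, "two"])?;
        // Flush buffered rows before any DDL or the commit, mirroring the
        // flush calls added at the end of the write functions above.
        appender.flush()?;
    } // appender dropped here, so the transaction is free to commit
    tx.commit()?;
    Ok(())
}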