From 9affaa1b6ea87667a667523ddab13b93307b5acc Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 19 Mar 2025 18:02:30 +0900 Subject: [PATCH 1/3] Implement a phased commit for DuckDB --- src/duckdb.rs | 16 ++- src/duckdb/write.rs | 120 ++++++++++++++++++----- src/sql/db_connection_pool/duckdbpool.rs | 13 +-- 3 files changed, 113 insertions(+), 36 deletions(-) diff --git a/src/duckdb.rs b/src/duckdb.rs index 90a34d6b..3f833534 100644 --- a/src/duckdb.rs +++ b/src/duckdb.rs @@ -460,6 +460,12 @@ impl DuckDB { .context(DbConnectionSnafu) } + pub fn connect_sync_direct(self: Arc) -> Result { + Arc::clone(&self.pool) + .connect_sync_direct() + .context(DbConnectionSnafu) + } + pub fn duckdb_conn( db_connection: &mut Box< dyn DbConnection, DuckDBParameter>, @@ -616,12 +622,12 @@ impl DuckDB { ); } if !extra_in_actual.is_empty() { - tracing::warn!( - "Unexpected index(es) detected in table '{name}': {}.\n\ + tracing::warn!( + "Unexpected index(es) detected in table '{name}': {}.\n\ These indexes are not defined in the configuration.", - extra_in_actual.iter().join(", "), - name = self.table_name -); + extra_in_actual.iter().join(", "), + name = self.table_name + ); } Ok(missing_in_actual.is_empty() && extra_in_actual.is_empty()) diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs index 30d8c100..af9c7234 100644 --- a/src/duckdb/write.rs +++ b/src/duckdb/write.rs @@ -1,6 +1,7 @@ use std::{any::Any, fmt, sync::Arc}; use crate::duckdb::DuckDB; +use crate::sql::db_connection_pool::dbconnection::duckdbconn::DuckDbConnection; use crate::util::{ constraints, on_conflict::OnConflict, @@ -22,7 +23,7 @@ use datafusion::{ DisplayAs, DisplayFormatType, ExecutionPlan, }, }; -use duckdb::Transaction; +use duckdb::{Error as DuckDBError, Transaction}; use futures::StreamExt; use snafu::prelude::*; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -30,6 +31,53 @@ use tokio::task::JoinHandle; use super::to_datafusion_error; +/// A transaction manager that ensures only a single transaction is active at a time +/// for a given DuckDB connection +pub struct DuckDbTransactionManager { + /// The DuckDB connection + conn: DuckDbConnection, + /// The currently active transaction, if any + transaction: Option>, +} + +impl<'a> DuckDbTransactionManager { + /// Create a new connection manager with the given connection + pub fn new(conn: DuckDbConnection) -> Self { + Self { + conn, + transaction: None, + } + } + + /// Begin a new transaction if one doesn't already exist + pub fn begin(&mut self) -> Result<(), DuckDBError> { + if self.transaction.is_none() { + let tx = self.conn.conn.transaction()?; + // SAFETY: The transaction is tied to the lifetime of the connection - because Rust + // doesn't support self-referential structs, we need to transmute the transaction + // to a static lifetime. We never give out a reference to the transaction with the static lifetime, + // so it's safe to transmute. + self.transaction = Some(unsafe { + std::mem::transmute::, duckdb::Transaction<'static>>(tx) + }); + } + Ok(()) + } + + /// Commit the current transaction if one exists and return success/failure + pub fn commit(&mut self) -> Result<(), DuckDBError> { + if let Some(tx) = self.transaction.take() { + tx.commit()?; + } + Ok(()) + } + + /// Execute a database operation with the current transaction + pub fn tx(&'a self) -> Option<&'a Transaction<'a>> { + self.transaction.as_ref() + } +} + #[derive(Debug, Clone)] pub struct DuckDBTableWriter { pub read_provider: Arc, @@ -141,24 +189,22 @@ impl DataSink for DuckDBDataSink { let duckdb_write_handle: JoinHandle> = tokio::task::spawn_blocking(move || { - let mut db_conn = duckdb - .connect_sync() + let db_conn = Arc::clone(&duckdb) + .connect_sync_direct() .map_err(to_retriable_data_write_error)?; - let duckdb_conn = - DuckDB::duckdb_conn(&mut db_conn).map_err(to_retriable_data_write_error)?; + let mut tx_manager = DuckDbTransactionManager::new(db_conn); - let tx = duckdb_conn - .conn - .transaction() + tx_manager + .begin() .context(super::UnableToBeginTransactionSnafu) .map_err(to_datafusion_error)?; - let num_rows = match **duckdb.constraints() { - [] => try_write_all_no_constraints(duckdb, &tx, batch_rx, overwrite)?, + let (num_rows, mut tx_manager) = match **duckdb.constraints() { + [] => try_write_all_no_constraints(duckdb, tx_manager, batch_rx, overwrite)?, _ => try_write_all_with_constraints( duckdb, - &tx, + tx_manager, batch_rx, overwrite, on_conflict, @@ -169,9 +215,11 @@ impl DataSink for DuckDBDataSink { .try_recv() .map_err(to_retriable_data_write_error)?; - tx.commit() + tx_manager + .commit() .context(super::UnableToCommitTransactionSnafu) .map_err(to_retriable_data_write_error)?; + drop(tx_manager); Ok(num_rows) }); @@ -256,11 +304,11 @@ impl DisplayAs for DuckDBDataSink { /// See: fn try_write_all_with_constraints( duckdb: Arc, - tx: &Transaction<'_>, + mut tx_manager: DuckDbTransactionManager, mut data_batches: Receiver, overwrite: InsertOp, on_conflict: Option, -) -> datafusion::common::Result { +) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> { // We want to clone the current table into our insert table let Some(ref orig_table_creator) = duckdb.table_creator else { return Err(DataFusionError::Execution( @@ -271,53 +319,75 @@ fn try_write_all_with_constraints( let mut num_rows = 0; let mut insert_table = orig_table_creator - .create_empty_clone(tx) + .create_empty_clone(tx_manager.tx().unwrap()) .map_err(to_datafusion_error)?; let Some(insert_table_creator) = insert_table.table_creator.take() else { unreachable!() }; + // Auto-commit after processing this many rows + const MAX_ROWS_PER_COMMIT: usize = 10_000_000; + let mut rows_since_last_commit = 0; + while let Some(batch) = data_batches.blocking_recv() { num_rows += u64::try_from(batch.num_rows()).map_err(|e| { DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}")) })?; + rows_since_last_commit += batch.num_rows(); + + if rows_since_last_commit > MAX_ROWS_PER_COMMIT { + // Commit the current transaction + tx_manager + .commit() + .context(super::UnableToCommitTransactionSnafu) + .map_err(to_datafusion_error)?; + + // Create a new transaction + tx_manager + .begin() + .context(super::UnableToBeginTransactionSnafu) + .map_err(to_datafusion_error)?; + + rows_since_last_commit = 0; + } + tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows()); insert_table - .insert_batch_no_constraints(tx, &batch) + .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch) .map_err(to_datafusion_error)?; } if matches!(overwrite, InsertOp::Overwrite) { insert_table_creator - .replace_table(tx, orig_table_creator) + .replace_table(tx_manager.tx().unwrap(), orig_table_creator) .map_err(to_datafusion_error)?; } else { insert_table - .insert_table_into(tx, &duckdb, on_conflict.as_ref()) + .insert_table_into(tx_manager.tx().unwrap(), &duckdb, on_conflict.as_ref()) .map_err(to_datafusion_error)?; insert_table_creator - .delete_table(tx) + .delete_table(tx_manager.tx().unwrap()) .map_err(to_datafusion_error)?; } - Ok(num_rows) + Ok((num_rows, tx_manager)) } /// If there are no constraints on the `DuckDB` table, we can do a simple single transaction write. fn try_write_all_no_constraints( duckdb: Arc, - tx: &Transaction<'_>, + tx_manager: DuckDbTransactionManager, mut data_batches: Receiver, overwrite: InsertOp, -) -> datafusion::common::Result { +) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> { let mut num_rows = 0; if matches!(overwrite, InsertOp::Overwrite) { tracing::debug!("Deleting all data from table."); duckdb - .delete_all_table_data(tx) + .delete_all_table_data(tx_manager.tx().unwrap()) .map_err(to_datafusion_error)?; } @@ -329,9 +399,9 @@ fn try_write_all_no_constraints( tracing::debug!("Inserting {} rows into table.", batch.num_rows()); duckdb - .insert_batch_no_constraints(tx, &batch) + .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch) .map_err(to_datafusion_error)?; } - Ok(num_rows) + Ok((num_rows, tx_manager)) } diff --git a/src/sql/db_connection_pool/duckdbpool.rs b/src/sql/db_connection_pool/duckdbpool.rs index 24cce629..23b63401 100644 --- a/src/sql/db_connection_pool/duckdbpool.rs +++ b/src/sql/db_connection_pool/duckdbpool.rs @@ -54,7 +54,6 @@ impl std::fmt::Debug for DuckDbConnectionPool { } impl DuckDbConnectionPool { - /// Get the dataset path. Returns `:memory:` if the in memory database is used. pub fn db_path(&self) -> &str { self.path.as_ref() @@ -165,17 +164,19 @@ impl DuckDbConnectionPool { ) -> Result< Box, DuckDBParameter>>, > { + Ok(Box::new(self.connect_sync_direct()?)) + } + + pub fn connect_sync_direct(self: Arc) -> Result { let pool = Arc::clone(&self.pool); let conn: r2d2::PooledConnection = pool.get().context(ConnectionPoolSnafu)?; let attachments = self.get_attachments()?; - Ok(Box::new( - DuckDbConnection::new(conn) - .with_attachments(attachments) - .with_unsupported_type_action(self.unsupported_type_action), - )) + Ok(DuckDbConnection::new(conn) + .with_attachments(attachments) + .with_unsupported_type_action(self.unsupported_type_action)) } #[must_use] From 5b7647f5b8e57f5a60060198c3a3ccb891fa73fa Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 19 Mar 2025 18:05:42 +0900 Subject: [PATCH 2/3] Add tracing --- src/duckdb/write.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs index af9c7234..93ecd292 100644 --- a/src/duckdb/write.rs +++ b/src/duckdb/write.rs @@ -338,6 +338,8 @@ fn try_write_all_with_constraints( rows_since_last_commit += batch.num_rows(); if rows_since_last_commit > MAX_ROWS_PER_COMMIT { + tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",); + // Commit the current transaction tx_manager .commit() From 74ffba149ceb9a4c8a07de9d7b21fb09b62c6fb4 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 19 Mar 2025 18:52:41 +0900 Subject: [PATCH 3/3] Also handle for no constraints case --- src/duckdb/write.rs | 65 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/src/duckdb/write.rs b/src/duckdb/write.rs index 93ecd292..891518b9 100644 --- a/src/duckdb/write.rs +++ b/src/duckdb/write.rs @@ -377,33 +377,78 @@ fn try_write_all_with_constraints( Ok((num_rows, tx_manager)) } -/// If there are no constraints on the `DuckDB` table, we can do a simple single transaction write. +/// Even if there are no constraints on the `DuckDB` table, we use the temp table approach +/// to be consistent with the constrained case and to help with performance. fn try_write_all_no_constraints( duckdb: Arc, - tx_manager: DuckDbTransactionManager, + mut tx_manager: DuckDbTransactionManager, mut data_batches: Receiver, overwrite: InsertOp, ) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> { + // We want to clone the current table into our insert table + let Some(ref orig_table_creator) = duckdb.table_creator else { + return Err(DataFusionError::Execution( + "Expected table to have a table creator".to_string(), + )); + }; + let mut num_rows = 0; - if matches!(overwrite, InsertOp::Overwrite) { - tracing::debug!("Deleting all data from table."); - duckdb - .delete_all_table_data(tx_manager.tx().unwrap()) - .map_err(to_datafusion_error)?; - } + let mut insert_table = orig_table_creator + .create_empty_clone(tx_manager.tx().unwrap()) + .map_err(to_datafusion_error)?; + + let Some(insert_table_creator) = insert_table.table_creator.take() else { + unreachable!() + }; + + // Auto-commit after processing this many rows + const MAX_ROWS_PER_COMMIT: usize = 10_000_000; + let mut rows_since_last_commit = 0; while let Some(batch) = data_batches.blocking_recv() { num_rows += u64::try_from(batch.num_rows()).map_err(|e| { DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}")) })?; - tracing::debug!("Inserting {} rows into table.", batch.num_rows()); + rows_since_last_commit += batch.num_rows(); + + if rows_since_last_commit > MAX_ROWS_PER_COMMIT { + tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",); + + // Commit the current transaction + tx_manager + .commit() + .context(super::UnableToCommitTransactionSnafu) + .map_err(to_datafusion_error)?; + + // Create a new transaction + tx_manager + .begin() + .context(super::UnableToBeginTransactionSnafu) + .map_err(to_datafusion_error)?; + + rows_since_last_commit = 0; + } - duckdb + tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows()); + insert_table .insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch) .map_err(to_datafusion_error)?; } + if matches!(overwrite, InsertOp::Overwrite) { + insert_table_creator + .replace_table(tx_manager.tx().unwrap(), orig_table_creator) + .map_err(to_datafusion_error)?; + } else { + insert_table + .insert_table_into(tx_manager.tx().unwrap(), &duckdb, None) + .map_err(to_datafusion_error)?; + insert_table_creator + .delete_table(tx_manager.tx().unwrap()) + .map_err(to_datafusion_error)?; + } + Ok((num_rows, tx_manager)) }