Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Phillip/250319 duckdb phased commit #274

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,12 @@ impl DuckDB {
.context(DbConnectionSnafu)
}

/// Synchronously obtain a concrete [`DuckDbConnection`] from the underlying
/// pool, bypassing the boxed `dyn DbConnection` wrapper that `connect_sync`
/// returns, so callers can use DuckDB-specific APIs directly.
///
/// # Errors
/// Returns a `DbConnection` error when the pool cannot hand out a connection.
pub fn connect_sync_direct(self: Arc<Self>) -> Result<DuckDbConnection> {
    let pool = Arc::clone(&self.pool);
    pool.connect_sync_direct().context(DbConnectionSnafu)
}

pub fn duckdb_conn(
db_connection: &mut Box<
dyn DbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter>,
Expand Down Expand Up @@ -616,12 +622,12 @@ impl DuckDB {
);
}
if !extra_in_actual.is_empty() {
tracing::warn!(
"Unexpected index(es) detected in table '{name}': {}.\n\
tracing::warn!(
"Unexpected index(es) detected in table '{name}': {}.\n\
These indexes are not defined in the configuration.",
extra_in_actual.iter().join(", "),
name = self.table_name
);
extra_in_actual.iter().join(", "),
name = self.table_name
);
}

Ok(missing_in_actual.is_empty() && extra_in_actual.is_empty())
Expand Down
183 changes: 150 additions & 33 deletions src/duckdb/write.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{any::Any, fmt, sync::Arc};

use crate::duckdb::DuckDB;
use crate::sql::db_connection_pool::dbconnection::duckdbconn::DuckDbConnection;
use crate::util::{
constraints,
on_conflict::OnConflict,
Expand All @@ -22,14 +23,61 @@ use datafusion::{
DisplayAs, DisplayFormatType, ExecutionPlan,
},
};
use duckdb::Transaction;
use duckdb::{Error as DuckDBError, Transaction};
use futures::StreamExt;
use snafu::prelude::*;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinHandle;

use super::to_datafusion_error;

/// A transaction manager that ensures only a single transaction is active at a
/// time for a given DuckDB connection.
///
/// This is a self-referential pair: `transaction` borrows from `conn`. Two
/// invariants keep that sound even though the manager is passed and returned
/// by value:
///  1. `conn` is heap-allocated (`Box`), so moving the manager never moves the
///     underlying connection the transaction points into.
///  2. `transaction` is declared before `conn`: struct fields drop in
///     declaration order, so the transaction (whose `Drop` rolls back through
///     the connection) runs while the connection is still alive.
pub struct DuckDbTransactionManager {
    /// The currently active transaction, if any. Must be declared before
    /// `conn` so it is dropped first.
    transaction: Option<Transaction<'static>>,
    /// The DuckDB connection, boxed so its address is stable across moves of
    /// `Self`.
    conn: Box<DuckDbConnection>,
}

impl DuckDbTransactionManager {
    /// Create a new connection manager with the given connection. No
    /// transaction is started yet; call [`Self::begin`] first.
    pub fn new(conn: DuckDbConnection) -> Self {
        Self {
            transaction: None,
            conn: Box::new(conn),
        }
    }

    /// Begin a new transaction if one doesn't already exist.
    ///
    /// # Errors
    /// Propagates any `DuckDBError` raised while opening the transaction.
    pub fn begin(&mut self) -> Result<(), DuckDBError> {
        if self.transaction.is_none() {
            let tx = self.conn.conn.transaction()?;
            // SAFETY: Rust has no self-referential structs, so the
            // transaction's borrow of the connection is transmuted to
            // `'static`. This is sound because: the connection lives behind a
            // `Box`, so its heap address is stable even when `Self` moves; we
            // never drop or replace `conn` while a transaction is live (field
            // declaration order guarantees the transaction drops first); and
            // `tx()` only hands out references bounded by `&self` — the forged
            // `'static` lifetime is never exposed to callers.
            self.transaction = Some(unsafe {
                std::mem::transmute::<duckdb::Transaction<'_>, duckdb::Transaction<'static>>(tx)
            });
        }
        Ok(())
    }

    /// Commit the current transaction if one exists; a no-op otherwise.
    ///
    /// # Errors
    /// Propagates any `DuckDBError` raised by the commit.
    pub fn commit(&mut self) -> Result<(), DuckDBError> {
        if let Some(tx) = self.transaction.take() {
            tx.commit()?;
        }
        Ok(())
    }

    /// Borrow the active transaction, if any, for executing statements. The
    /// returned lifetime is tied to `&self`, never `'static`.
    pub fn tx(&self) -> Option<&Transaction<'_>> {
        self.transaction.as_ref()
    }
}

#[derive(Debug, Clone)]
pub struct DuckDBTableWriter {
pub read_provider: Arc<dyn TableProvider>,
Expand Down Expand Up @@ -141,24 +189,22 @@ impl DataSink for DuckDBDataSink {

let duckdb_write_handle: JoinHandle<datafusion::common::Result<u64>> =
tokio::task::spawn_blocking(move || {
let mut db_conn = duckdb
.connect_sync()
let db_conn = Arc::clone(&duckdb)
.connect_sync_direct()
.map_err(to_retriable_data_write_error)?;

let duckdb_conn =
DuckDB::duckdb_conn(&mut db_conn).map_err(to_retriable_data_write_error)?;
let mut tx_manager = DuckDbTransactionManager::new(db_conn);

let tx = duckdb_conn
.conn
.transaction()
tx_manager
.begin()
.context(super::UnableToBeginTransactionSnafu)
.map_err(to_datafusion_error)?;

let num_rows = match **duckdb.constraints() {
[] => try_write_all_no_constraints(duckdb, &tx, batch_rx, overwrite)?,
let (num_rows, mut tx_manager) = match **duckdb.constraints() {
[] => try_write_all_no_constraints(duckdb, tx_manager, batch_rx, overwrite)?,
_ => try_write_all_with_constraints(
duckdb,
&tx,
tx_manager,
batch_rx,
overwrite,
on_conflict,
Expand All @@ -169,9 +215,11 @@ impl DataSink for DuckDBDataSink {
.try_recv()
.map_err(to_retriable_data_write_error)?;

tx.commit()
tx_manager
.commit()
.context(super::UnableToCommitTransactionSnafu)
.map_err(to_retriable_data_write_error)?;
drop(tx_manager);

Ok(num_rows)
});
Expand Down Expand Up @@ -256,11 +304,11 @@ impl DisplayAs for DuckDBDataSink {
/// See: <https://duckdb.org/docs/sql/indexes#over-eager-unique-constraint-checking>
fn try_write_all_with_constraints(
duckdb: Arc<DuckDB>,
tx: &Transaction<'_>,
mut tx_manager: DuckDbTransactionManager,
mut data_batches: Receiver<RecordBatch>,
overwrite: InsertOp,
on_conflict: Option<OnConflict>,
) -> datafusion::common::Result<u64> {
) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> {
// We want to clone the current table into our insert table
let Some(ref orig_table_creator) = duckdb.table_creator else {
return Err(DataFusionError::Execution(
Expand All @@ -271,67 +319,136 @@ fn try_write_all_with_constraints(
let mut num_rows = 0;

let mut insert_table = orig_table_creator
.create_empty_clone(tx)
.create_empty_clone(tx_manager.tx().unwrap())
.map_err(to_datafusion_error)?;

let Some(insert_table_creator) = insert_table.table_creator.take() else {
unreachable!()
};

// Auto-commit after processing this many rows
const MAX_ROWS_PER_COMMIT: usize = 10_000_000;
let mut rows_since_last_commit = 0;

while let Some(batch) = data_batches.blocking_recv() {
num_rows += u64::try_from(batch.num_rows()).map_err(|e| {
DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}"))
})?;

rows_since_last_commit += batch.num_rows();

if rows_since_last_commit > MAX_ROWS_PER_COMMIT {
tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",);

// Commit the current transaction
tx_manager
.commit()
.context(super::UnableToCommitTransactionSnafu)
.map_err(to_datafusion_error)?;

// Create a new transaction
tx_manager
.begin()
.context(super::UnableToBeginTransactionSnafu)
.map_err(to_datafusion_error)?;

rows_since_last_commit = 0;
}

tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows());
insert_table
.insert_batch_no_constraints(tx, &batch)
.insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch)
.map_err(to_datafusion_error)?;
}

if matches!(overwrite, InsertOp::Overwrite) {
insert_table_creator
.replace_table(tx, orig_table_creator)
.replace_table(tx_manager.tx().unwrap(), orig_table_creator)
.map_err(to_datafusion_error)?;
} else {
insert_table
.insert_table_into(tx, &duckdb, on_conflict.as_ref())
.insert_table_into(tx_manager.tx().unwrap(), &duckdb, on_conflict.as_ref())
.map_err(to_datafusion_error)?;
insert_table_creator
.delete_table(tx)
.delete_table(tx_manager.tx().unwrap())
.map_err(to_datafusion_error)?;
}

Ok(num_rows)
Ok((num_rows, tx_manager))
}

/// If there are no constraints on the `DuckDB` table, we can do a simple single transaction write.
/// Even if there are no constraints on the `DuckDB` table, we use the temp table approach
/// to be consistent with the constrained case and to help with performance.
fn try_write_all_no_constraints(
duckdb: Arc<DuckDB>,
tx: &Transaction<'_>,
mut tx_manager: DuckDbTransactionManager,
mut data_batches: Receiver<RecordBatch>,
overwrite: InsertOp,
) -> datafusion::common::Result<u64> {
) -> datafusion::common::Result<(u64, DuckDbTransactionManager)> {
// We want to clone the current table into our insert table
let Some(ref orig_table_creator) = duckdb.table_creator else {
return Err(DataFusionError::Execution(
"Expected table to have a table creator".to_string(),
));
};

let mut num_rows = 0;

if matches!(overwrite, InsertOp::Overwrite) {
tracing::debug!("Deleting all data from table.");
duckdb
.delete_all_table_data(tx)
.map_err(to_datafusion_error)?;
}
let mut insert_table = orig_table_creator
.create_empty_clone(tx_manager.tx().unwrap())
.map_err(to_datafusion_error)?;

let Some(insert_table_creator) = insert_table.table_creator.take() else {
unreachable!()
};

// Auto-commit after processing this many rows
const MAX_ROWS_PER_COMMIT: usize = 10_000_000;
let mut rows_since_last_commit = 0;

while let Some(batch) = data_batches.blocking_recv() {
num_rows += u64::try_from(batch.num_rows()).map_err(|e| {
DataFusionError::Execution(format!("Unable to convert num_rows() to u64: {e}"))
})?;

tracing::debug!("Inserting {} rows into table.", batch.num_rows());
rows_since_last_commit += batch.num_rows();

if rows_since_last_commit > MAX_ROWS_PER_COMMIT {
tracing::info!("Committing DuckDB transaction after {rows_since_last_commit} rows.",);

// Commit the current transaction
tx_manager
.commit()
.context(super::UnableToCommitTransactionSnafu)
.map_err(to_datafusion_error)?;

// Create a new transaction
tx_manager
.begin()
.context(super::UnableToBeginTransactionSnafu)
.map_err(to_datafusion_error)?;

duckdb
.insert_batch_no_constraints(tx, &batch)
rows_since_last_commit = 0;
}

tracing::debug!("Inserting {} rows into cloned table.", batch.num_rows());
insert_table
.insert_batch_no_constraints(tx_manager.tx().unwrap(), &batch)
.map_err(to_datafusion_error)?;
}

if matches!(overwrite, InsertOp::Overwrite) {
insert_table_creator
.replace_table(tx_manager.tx().unwrap(), orig_table_creator)
.map_err(to_datafusion_error)?;
} else {
insert_table
.insert_table_into(tx_manager.tx().unwrap(), &duckdb, None)
.map_err(to_datafusion_error)?;
insert_table_creator
.delete_table(tx_manager.tx().unwrap())
.map_err(to_datafusion_error)?;
}

Ok(num_rows)
Ok((num_rows, tx_manager))
}
13 changes: 7 additions & 6 deletions src/sql/db_connection_pool/duckdbpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl std::fmt::Debug for DuckDbConnectionPool {
}

impl DuckDbConnectionPool {

/// Get the dataset path. Returns `:memory:` if the in memory database is used.
pub fn db_path(&self) -> &str {
self.path.as_ref()
Expand Down Expand Up @@ -165,17 +164,19 @@ impl DuckDbConnectionPool {
) -> Result<
Box<dyn DbConnection<r2d2::PooledConnection<DuckdbConnectionManager>, DuckDBParameter>>,
> {
Ok(Box::new(self.connect_sync_direct()?))
}

/// Synchronously check out a pooled connection and wrap it as a concrete
/// [`DuckDbConnection`], with this pool's attachments and unsupported-type
/// action applied.
///
/// # Errors
/// Fails if the r2d2 pool cannot provide a connection or the attachments
/// cannot be resolved.
pub fn connect_sync_direct(self: Arc<Self>) -> Result<DuckDbConnection> {
    // `get()` only needs a shared borrow of the pool; cloning the Arc first
    // was a redundant refcount bump (clippy: redundant_clone).
    let conn: r2d2::PooledConnection<DuckdbConnectionManager> =
        self.pool.get().context(ConnectionPoolSnafu)?;

    let attachments = self.get_attachments()?;

    Ok(DuckDbConnection::new(conn)
        .with_attachments(attachments)
        .with_unsupported_type_action(self.unsupported_type_action))
}

#[must_use]
Expand Down
Loading