diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index ba07fa84d4..96025d1d60 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use delta_kernel::schema::MetadataValue; use futures::future::BoxFuture; use serde_json::Value; +use tokio::runtime::Handle; use tracing::log::*; use uuid::Uuid; @@ -66,6 +67,8 @@ pub struct CreateBuilder { commit_properties: CommitProperties, raise_if_key_not_exists: bool, custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>, + /// Optional tokio handle to execute the operation on + handle: Option<Handle>, } impl super::Operation<()> for CreateBuilder { @@ -102,6 +105,7 @@ impl CreateBuilder { commit_properties: CommitProperties::default(), raise_if_key_not_exists: true, custom_execute_handler: None, + handle: None, } } @@ -244,6 +248,12 @@ impl CreateBuilder { self } + /// Set a custom tokio handle to execute the operation on + pub fn with_handle(mut self, handle: Handle) -> Self { + self.handle = Some(handle); + self + } + /// Consume self into uninitialized table with corresponding create actions and operation meta pub(crate) async fn into_table_and_actions( mut self, @@ -347,54 +357,65 @@ impl std::future::IntoFuture for CreateBuilder { fn into_future(self) -> Self::IntoFuture { let this = self; - Box::pin(async move { - let handler = this.custom_execute_handler.clone(); - let mode = &this.mode; - let (mut table, mut actions, operation, operation_id) = - this.clone().into_table_and_actions().await?; - - let table_state = if table.log_store.is_delta_table_location().await? { - match mode { - SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()), - SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()), - SaveMode::Ignore => { - table.load().await?; - return Ok(table); - } - SaveMode::Overwrite => { - table.load().await?; - let remove_actions = table - .snapshot()? 
- .log_data() - .iter() - .map(|p| p.remove_action(true).into()); - actions.extend(remove_actions); - Some(table.snapshot()?) + Box::pin(async { + let handle = this.handle.clone(); + let fut = async move { + let handler = this.custom_execute_handler.clone(); + let mode = &this.mode; + let (mut table, mut actions, operation, operation_id) = + this.clone().into_table_and_actions().await?; + + let table_state = if table.log_store.is_delta_table_location().await? { + match mode { + SaveMode::ErrorIfExists => { + return Err(CreateError::TableAlreadyExists.into()) + } + SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()), + SaveMode::Ignore => { + table.load().await?; + return Ok(table); + } + SaveMode::Overwrite => { + table.load().await?; + let remove_actions = table + .snapshot()? + .log_data() + .iter() + .map(|p| p.remove_action(true).into()); + actions.extend(remove_actions); + Some(table.snapshot()?) + } } + } else { + None + }; + + let version = CommitBuilder::from(this.commit_properties.clone()) + .with_actions(actions) + .with_operation_id(operation_id) + .with_post_commit_hook_handler(handler.clone()) + .build( + table_state.map(|f| f as &dyn TableReference), + table.log_store.clone(), + operation, + ) + .await? + .version(); + table.load_version(version).await?; + + if let Some(handler) = handler { + handler + .post_execute(&table.log_store(), operation_id) + .await?; } - } else { - None + Ok(table) }; - - let version = CommitBuilder::from(this.commit_properties.clone()) - .with_actions(actions) - .with_operation_id(operation_id) - .with_post_commit_hook_handler(handler.clone()) - .build( - table_state.map(|f| f as &dyn TableReference), - table.log_store.clone(), - operation, - ) - .await? 
- .version(); - table.load_version(version).await?; - - if let Some(handler) = handler { - handler - .post_execute(&table.log_store(), operation_id) - .await?; + if let Some(handle) = handle { + let task_handle = handle.spawn(fut); + crate::operations::util::flatten_join_error(task_handle).await + } else { + fut.await } - Ok(table) }) } } @@ -614,6 +635,23 @@ mod tests { assert_eq!(state.log_data().num_files(), 0); } + #[tokio::test] + async fn test_create_table_with_custom_handle() { + let table_schema = get_delta_schema(); + + let cpu_runtime = crate::operations::util::CpuRuntime::try_new().unwrap(); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(table_schema.fields().cloned()) + .with_save_mode(SaveMode::Ignore) + .with_handle(cpu_runtime.handle().clone()) + .await + .unwrap(); + assert_eq!(table.version(), Some(0)); + assert_eq!(table.snapshot().unwrap().schema(), &table_schema) + } + #[tokio::test] async fn test_create_table_metadata_raise_if_key_not_exists() { let schema = get_delta_schema(); diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index c024bc5538..fbc696648e 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -404,6 +404,113 @@ pub fn get_num_idx_cols_and_stats_columns( ) } +pub(super) mod util { + use super::*; + use futures::Future; + use tokio::{runtime::Handle, sync::Notify, task::JoinError}; + + pub async fn flatten_join_error<T, E>( + future: impl Future<Output = Result<Result<T, E>, JoinError>>, + ) -> Result<T, DeltaTableError> + where + E: Into<DeltaTableError>, + { + match future.await { + Ok(Ok(result)) => Ok(result), + Ok(Err(error)) => Err(error.into()), + Err(error) => Err(DeltaTableError::GenericError { + source: Box::new(error), + }), + } + } + + /// Creates a Tokio [`Runtime`] for use with CPU bound tasks + /// + /// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate + /// `Runtime` correctly is somewhat tricky. 
This structure manages the creation + /// and shutdown of a separate thread. + /// + /// # Notes + /// On drop, the thread will wait for all remaining tasks to complete. + /// + /// Depending on your application, more sophisticated shutdown logic may be + /// required, such as ensuring that no new tasks are added to the runtime. + /// + /// # Credits + /// This code is derived from code originally written for [InfluxDB 3.0] + /// + /// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor + /// Vendored from [datafusion-examples](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/thread_pools.rs) + pub struct CpuRuntime { + /// Handle is the tokio structure for interacting with a Runtime. + handle: Handle, + /// Signal to start shutting down + notify_shutdown: Arc<Notify>, + /// When thread is active, is Some + thread_join_handle: Option<std::thread::JoinHandle<()>>, + } + + impl Drop for CpuRuntime { + fn drop(&mut self) { + // Notify the thread to shutdown. + self.notify_shutdown.notify_one(); + // In a production system you also need to ensure your code stops adding + // new tasks to the underlying runtime after this point to allow the + // thread to complete its work and exit cleanly. 
+ if let Some(thread_join_handle) = self.thread_join_handle.take() { + // If the thread is still running, we wait for it to finish + print!("Shutting down CPU runtime thread..."); + if let Err(e) = thread_join_handle.join() { + eprintln!("Error joining CPU runtime thread: {e:?}",); + } else { + println!("CPU runtime thread shutdown successfully."); + } + } + } + } + + impl CpuRuntime { + /// Create a new Tokio Runtime for CPU bound tasks + pub fn try_new() -> DeltaResult<Self> { + let cpu_runtime = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build()?; + let handle = cpu_runtime.handle().clone(); + let notify_shutdown = Arc::new(Notify::new()); + let notify_shutdown_captured = Arc::clone(&notify_shutdown); + + // The cpu_runtime runs and is dropped on a separate thread + let thread_join_handle = std::thread::spawn(move || { + cpu_runtime.block_on(async move { + notify_shutdown_captured.notified().await; + }); + // Note: cpu_runtime is dropped here, which will wait for all tasks + // to complete + }); + + Ok(Self { + handle, + notify_shutdown, + thread_join_handle: Some(thread_join_handle), + }) + } + + /// Return a handle suitable for spawning CPU bound tasks + /// + /// # Notes + /// + /// If a task spawned on this handle attempts to do IO, it will error with a + /// message such as: + /// + /// ```text + ///A Tokio 1.x context was found, but IO is disabled. + /// ``` + pub fn handle(&self) -> &Handle { + &self.handle + } + } +} + /// Get the target_file_size from the table configuration in the sates /// If table_config does not exist (only can occur in the first write action) it takes /// the configuration that was passed to the writerBuilder. 
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index b351988e1e..d981851241 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -670,7 +670,7 @@ impl MergePlan { object_store.clone(), futures::future::ready(Ok(batch_stream)), )); - util::flatten_join_error(rewrite_result) + crate::operations::util::flatten_join_error(rewrite_result) }) .boxed(), OptimizeOperations::ZOrder(zorder_columns, bins, state) => { @@ -727,7 +727,7 @@ impl MergePlan { log_store.object_store(Some(operation_id)), batch_stream, )); - util::flatten_join_error(rewrite_result) + crate::operations::util::flatten_join_error(rewrite_result) }) .boxed() } @@ -1070,27 +1070,6 @@ async fn build_zorder_plan( Ok((operation, metrics)) } -pub(super) mod util { - use super::*; - use futures::Future; - use tokio::task::JoinError; - - pub async fn flatten_join_error<T, E>( - future: impl Future<Output = Result<Result<T, E>, JoinError>>, - ) -> Result<T, DeltaTableError> - where - E: Into<DeltaTableError>, - { - match future.await { - Ok(Ok(result)) => Ok(result), - Ok(Err(error)) => Err(error.into()), - Err(error) => Err(DeltaTableError::GenericError { - source: Box::new(error), - }), - } - } -} - /// Z-order utilities pub(super) mod zorder { use super::*;