Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 82 additions & 44 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use serde_json::Value;
use tokio::runtime::Handle;
use tracing::log::*;
use uuid::Uuid;

Expand Down Expand Up @@ -66,6 +67,8 @@ pub struct CreateBuilder {
commit_properties: CommitProperties,
raise_if_key_not_exists: bool,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
/// Optional tokio handle to execute the operation on
handle: Option<Handle>,
}

impl super::Operation<()> for CreateBuilder {
Expand Down Expand Up @@ -102,6 +105,7 @@ impl CreateBuilder {
commit_properties: CommitProperties::default(),
raise_if_key_not_exists: true,
custom_execute_handler: None,
handle: None,
}
}

Expand Down Expand Up @@ -244,6 +248,12 @@ impl CreateBuilder {
self
}

/// Set a custom tokio handle to execute the operation on
pub fn with_handle(mut self, handle: Handle) -> Self {
self.handle = Some(handle);
self
}

/// Consume self into uninitialized table with corresponding create actions and operation meta
pub(crate) async fn into_table_and_actions(
mut self,
Expand Down Expand Up @@ -347,54 +357,65 @@ impl std::future::IntoFuture for CreateBuilder {

/// Builds the boxed future that performs the create operation.
///
/// If a tokio handle was configured via `with_handle`, the work is spawned onto that
/// handle and the join result is flattened with `flatten_join_error`; otherwise the
/// work is awaited in place.
fn into_future(self) -> Self::IntoFuture {
let this = self;
Box::pin(async move {
let handler = this.custom_execute_handler.clone();
let mode = &this.mode;
let (mut table, mut actions, operation, operation_id) =
this.clone().into_table_and_actions().await?;

let table_state = if table.log_store.is_delta_table_location().await? {
match mode {
SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),
SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
SaveMode::Ignore => {
table.load().await?;
return Ok(table);
}
SaveMode::Overwrite => {
table.load().await?;
let remove_actions = table
.snapshot()?
.log_data()
.iter()
.map(|p| p.remove_action(true).into());
actions.extend(remove_actions);
Some(table.snapshot()?)
Box::pin(async {
// Copy the optional handle out before `this` is moved into the inner future.
let handle = this.handle.clone();
// The whole operation is built as one future so it can either be spawned or awaited.
let fut = async move {
let handler = this.custom_execute_handler.clone();
let mode = &this.mode;
let (mut table, mut actions, operation, operation_id) =
this.clone().into_table_and_actions().await?;

// A location that already holds a table is handled according to the save mode.
let table_state = if table.log_store.is_delta_table_location().await? {
match mode {
SaveMode::ErrorIfExists => {
return Err(CreateError::TableAlreadyExists.into())
}
SaveMode::Append => return Err(CreateError::AppendNotAllowed.into()),
SaveMode::Ignore => {
// Ignore: load the existing table and return it without committing.
table.load().await?;
return Ok(table);
}
SaveMode::Overwrite => {
// Overwrite: add a remove action for every entry in the current
// snapshot's log data, then commit against that snapshot.
table.load().await?;
let remove_actions = table
.snapshot()?
.log_data()
.iter()
.map(|p| p.remove_action(true).into());
actions.extend(remove_actions);
Some(table.snapshot()?)
}
}
} else {
None
};

let version = CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(handler.clone())
.build(
table_state.map(|f| f as &dyn TableReference),
table.log_store.clone(),
operation,
)
.await?
.version();
// Reload the table at the version that was just committed.
table.load_version(version).await?;

if let Some(handler) = handler {
handler
.post_execute(&table.log_store(), operation_id)
.await?;
}
} else {
None
Ok(table)
};

let version = CommitBuilder::from(this.commit_properties.clone())
.with_actions(actions)
.with_operation_id(operation_id)
.with_post_commit_hook_handler(handler.clone())
.build(
table_state.map(|f| f as &dyn TableReference),
table.log_store.clone(),
operation,
)
.await?
.version();
table.load_version(version).await?;

if let Some(handler) = handler {
handler
.post_execute(&table.log_store(), operation_id)
.await?;
// Spawn onto the custom handle when one was provided; otherwise run the future inline.
if let Some(handle) = handle {
let task_handle = handle.spawn(fut);
crate::operations::util::flatten_join_error(task_handle).await
} else {
fut.await
}
Ok(table)
})
}
}
Expand Down Expand Up @@ -614,6 +635,23 @@ mod tests {
assert_eq!(state.log_data().num_files(), 0);
}

/// Creating a table succeeds when the operation is routed through a handle
/// borrowed from a separately constructed `CpuRuntime`.
#[tokio::test]
async fn test_create_table_with_custom_handle() {
    let expected_schema = get_delta_schema();
    let runtime = crate::operations::util::CpuRuntime::try_new().unwrap();

    let builder = DeltaOps::new_in_memory()
        .create()
        .with_columns(expected_schema.fields().cloned())
        .with_save_mode(SaveMode::Ignore)
        .with_handle(runtime.handle().clone());
    let table = builder.await.unwrap();

    assert_eq!(table.version(), Some(0));
    assert_eq!(table.snapshot().unwrap().schema(), &expected_schema);
}

#[tokio::test]
async fn test_create_table_metadata_raise_if_key_not_exists() {
let schema = get_delta_schema();
Expand Down
107 changes: 107 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,113 @@ pub fn get_num_idx_cols_and_stats_columns(
)
}

pub(super) mod util {
use super::*;
use futures::Future;
use tokio::{runtime::Handle, sync::Notify, task::JoinError};

pub async fn flatten_join_error<T, E>(
future: impl Future<Output = Result<Result<T, E>, JoinError>>,
) -> Result<T, DeltaTableError>
where
E: Into<DeltaTableError>,
{
match future.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(error)) => Err(error.into()),
Err(error) => Err(DeltaTableError::GenericError {
source: Box::new(error),
}),
}
}

/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
///
/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
/// `Runtime` correctly is somewhat tricky. This structure manages the creation
/// and shutdown of a separate thread.
///
/// # Notes
/// On drop, the thread will wait for all remaining tasks to complete.
///
/// Depending on your application, more sophisticated shutdown logic may be
/// required, such as ensuring that no new tasks are added to the runtime.
///
/// # Credits
/// This code is derived from code originally written for [InfluxDB 3.0]
///
/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
/// Vendored from [datafusion-examples](https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/thread_pools.rs)
pub struct CpuRuntime {
/// Handle is the tokio structure for interacting with a Runtime.
handle: Handle,
/// Signal to start shutting down
notify_shutdown: Arc<Notify>,
/// When thread is active, is Some
thread_join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for CpuRuntime {
fn drop(&mut self) {
// Notify the thread to shutdown.
self.notify_shutdown.notify_one();
// In a production system you also need to ensure your code stops adding
// new tasks to the underlying runtime after this point to allow the
// thread to complete its work and exit cleanly.
if let Some(thread_join_handle) = self.thread_join_handle.take() {
// If the thread is still running, we wait for it to finish
print!("Shutting down CPU runtime thread...");
if let Err(e) = thread_join_handle.join() {
eprintln!("Error joining CPU runtime thread: {e:?}",);
} else {
println!("CPU runtime thread shutdown successfully.");
}
}
}
}

impl CpuRuntime {
/// Create a new Tokio Runtime for CPU bound tasks
pub fn try_new() -> DeltaResult<Self> {
let cpu_runtime = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()?;
let handle = cpu_runtime.handle().clone();
let notify_shutdown = Arc::new(Notify::new());
let notify_shutdown_captured = Arc::clone(&notify_shutdown);

// The cpu_runtime runs and is dropped on a separate thread
let thread_join_handle = std::thread::spawn(move || {
cpu_runtime.block_on(async move {
notify_shutdown_captured.notified().await;
});
// Note: cpu_runtime is dropped here, which will wait for all tasks
// to complete
});

Ok(Self {
handle,
notify_shutdown,
thread_join_handle: Some(thread_join_handle),
})
}

/// Return a handle suitable for spawning CPU bound tasks
///
/// # Notes
///
/// If a task spawned on this handle attempts to do IO, it will error with a
/// message such as:
///
/// ```text
///A Tokio 1.x context was found, but IO is disabled.
/// ```
pub fn handle(&self) -> &Handle {
&self.handle
}
}
}

/// Get the target_file_size from the table configuration in the state.
/// If table_config does not exist (this can only occur in the first write action), it takes
/// the configuration that was passed to the writerBuilder.
Expand Down
25 changes: 2 additions & 23 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,7 +670,7 @@ impl MergePlan {
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
crate::operations::util::flatten_join_error(rewrite_result)
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins, state) => {
Expand Down Expand Up @@ -727,7 +727,7 @@ impl MergePlan {
log_store.object_store(Some(operation_id)),
batch_stream,
));
util::flatten_join_error(rewrite_result)
crate::operations::util::flatten_join_error(rewrite_result)
})
.boxed()
}
Expand Down Expand Up @@ -1070,27 +1070,6 @@ async fn build_zorder_plan(
Ok((operation, metrics))
}

pub(super) mod util {
    use super::*;
    use futures::Future;
    use tokio::task::JoinError;

    /// Resolves a joined task into a flat `Result`.
    ///
    /// A join failure is wrapped in `DeltaTableError::GenericError`; an error
    /// returned by the task itself is converted with `Into<DeltaTableError>`.
    pub async fn flatten_join_error<T, E>(
        future: impl Future<Output = Result<Result<T, E>, JoinError>>,
    ) -> Result<T, DeltaTableError>
    where
        E: Into<DeltaTableError>,
    {
        let joined = future.await;
        // Outer layer: did the task itself complete?
        let task_result = joined.map_err(|join_error| DeltaTableError::GenericError {
            source: Box::new(join_error),
        })?;
        // Inner layer: the task's own success or failure.
        task_result.map_err(Into::into)
    }
}

/// Z-order utilities
pub(super) mod zorder {
use super::*;
Expand Down
Loading