Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use datafusion::prelude::SessionConfig;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::expressions::Scalar;
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
Expand Down Expand Up @@ -213,6 +214,8 @@ pub struct OptimizeBuilder<'a> {
max_spill_size: usize,
/// Optimize type
optimize_type: OptimizeType,
/// Optional [SessionConfig] for users that want more control over the Datafusion execution
session_config: Option<SessionConfig>,
min_commit_interval: Option<Duration>,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
Expand Down Expand Up @@ -241,6 +244,7 @@ impl<'a> OptimizeBuilder<'a> {
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
session_config: None,
custom_execute_handler: None,
}
}
Expand Down Expand Up @@ -304,6 +308,12 @@ impl<'a> OptimizeBuilder<'a> {
self.custom_execute_handler = Some(handler);
self
}

/// Add a custom [SessionConfig] to the optimize plan execution
pub fn with_session_config(mut self, config: SessionConfig) -> Self {
self.session_config = Some(config);
self
}
}

impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
Expand Down Expand Up @@ -334,6 +344,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.filters,
this.target_size.to_owned(),
writer_properties,
this.session_config,
)
.await?;

Expand Down Expand Up @@ -428,6 +439,7 @@ enum OptimizeOperations {
ZOrder(
Vec<String>,
HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
Option<SessionConfig>,
),
// TODO: Sort
}
Expand Down Expand Up @@ -651,14 +663,14 @@ impl MergePlan {
util::flatten_join_error(rewrite_result)
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins) => {
OptimizeOperations::ZOrder(zorder_columns, bins, session_config) => {
debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
log_store.object_store(Some(operation_id)),
max_spill_size,
session_config,
)?);
let task_parameters = self.task_parameters.clone();

Expand Down Expand Up @@ -796,6 +808,7 @@ pub async fn create_merge_plan(
filters: &[PartitionFilter],
target_size: Option<u64>,
writer_properties: WriterProperties,
session_config: Option<SessionConfig>,
) -> Result<MergePlan, DeltaTableError> {
let target_size =
target_size.unwrap_or_else(|| snapshot.table_config().target_file_size().get());
Expand All @@ -812,6 +825,7 @@ pub async fn create_merge_plan(
snapshot,
partitions_keys,
filters,
session_config,
)
.await?
}
Expand Down Expand Up @@ -972,6 +986,7 @@ async fn build_zorder_plan(
snapshot: &DeltaTableState,
partition_keys: &[String],
filters: &[PartitionFilter],
session_config: Option<SessionConfig>,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
if zorder_columns.is_empty() {
return Err(DeltaTableError::Generic(
Expand Down Expand Up @@ -1028,7 +1043,7 @@ async fn build_zorder_plan(
debug!("partition_files inside the zorder plan: {partition_files:?}");
}

let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files);
let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, session_config);
Ok((operation, metrics))
}

Expand Down Expand Up @@ -1095,6 +1110,7 @@ pub(super) mod zorder {
columns: Vec<String>,
object_store: ObjectStoreRef,
max_spill_size: usize,
session_config: Option<SessionConfig>,
) -> Result<Self, DataFusionError> {
let columns = columns.into();

Expand All @@ -1104,7 +1120,8 @@ pub(super) mod zorder {
.build_arc()?;
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime);
let ctx =
SessionContext::new_with_config_rt(session_config.unwrap_or_default(), runtime);
ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
Ok(Self { columns, ctx })
}
Expand Down
3 changes: 3 additions & 0 deletions crates/core/tests/command_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
None,
)
.await?;

Expand Down Expand Up @@ -363,6 +364,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
&filter,
None,
WriterProperties::builder().build(),
None,
)
.await?;

Expand Down Expand Up @@ -425,6 +427,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
&[],
None,
WriterProperties::builder().build(),
None,
)
.await?;

Expand Down
Loading