Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 44 additions & 34 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef as ArrowSchemaRef;
use datafusion::prelude::SessionConfig;
use datafusion::execution::context::SessionState;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::expressions::Scalar;
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
Expand Down Expand Up @@ -214,8 +217,8 @@ pub struct OptimizeBuilder<'a> {
max_spill_size: usize,
/// Optimize type
optimize_type: OptimizeType,
/// Optional [SessionConfig] for users that want more control over the Datafusion execution
session_config: Option<SessionConfig>,
/// Datafusion session state relevant for executing the input plan
state: Option<SessionState>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't looked at all the APIs, but would we get away with tracking something like Arc<dyn Session> here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice — it looks like Datafusion prefers passing this instead. https://docs.rs/datafusion-session/50.1.0/datafusion_session/session/trait.Session.html#migration-from-sessionstate. Let me give it a shot

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roeap I will resolve this in a follow-up PR, since I'm unifying all the interfaces anyways

min_commit_interval: Option<Duration>,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
Expand Down Expand Up @@ -244,7 +247,7 @@ impl<'a> OptimizeBuilder<'a> {
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
session_config: None,
state: None,
custom_execute_handler: None,
}
}
Expand Down Expand Up @@ -292,6 +295,10 @@ impl<'a> OptimizeBuilder<'a> {
}

/// Max spill size
#[deprecated(
since = "0.29.0",
note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
)]
pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
self.max_spill_size = max_spill_size;
self
Expand All @@ -309,9 +316,9 @@ impl<'a> OptimizeBuilder<'a> {
self
}

/// Add a custom [SessionConfig] to the optimize plan execution
pub fn with_session_config(mut self, config: SessionConfig) -> Self {
self.session_config = Some(config);
/// A session state accompanying a given input plan, containing e.g. registered object stores
pub fn with_input_session_state(mut self, state: SessionState) -> Self {
self.state = Some(state);
self
}
Comment on lines +320 to 324
Copy link
Contributor Author

@abhi-airspace-intelligence abhi-airspace-intelligence Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the name of this function is inconsistent across the various builder APIs, happy to open a follow-up PR to rename everything to either with_input_session_state or with_session_state and deprecate the other.

}
Expand Down Expand Up @@ -344,7 +351,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.filters,
this.target_size.to_owned(),
writer_properties,
this.session_config,
this.state,
)
.await?;

Expand Down Expand Up @@ -439,7 +446,7 @@ enum OptimizeOperations {
ZOrder(
Vec<String>,
HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
Option<SessionConfig>,
Box<Option<SessionState>>,
),
// TODO: Sort
}
Expand Down Expand Up @@ -622,6 +629,7 @@ impl MergePlan {
handle: Option<&Arc<dyn CustomExecuteHandler>>,
) -> Result<Metrics, DeltaTableError> {
let operations = std::mem::take(&mut self.operations);
let object_store = log_store.object_store(Some(operation_id));

let stream = match operations {
OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
Expand All @@ -636,7 +644,7 @@ impl MergePlan {
for file in files.iter() {
debug!(" file {}", file.path);
}
let object_store_ref = log_store.object_store(Some(operation_id));
let object_store_ref = object_store.clone();
let batch_stream = futures::stream::iter(files.clone())
.then(move |file| {
let object_store_ref = object_store_ref.clone();
Expand All @@ -657,20 +665,32 @@ impl MergePlan {
self.task_parameters.clone(),
partition,
files,
log_store.object_store(Some(operation_id)).clone(),
object_store.clone(),
futures::future::ready(Ok(batch_stream)),
));
util::flatten_join_error(rewrite_result)
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins, session_config) => {
OptimizeOperations::ZOrder(zorder_columns, bins, state) => {
debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

let state = if let Some(state) = *state {
state
} else {
let memory_pool = FairSpillPool::new(max_spill_size);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(memory_pool))
.build_arc()?;
SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(runtime)
.build()
};

let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
log_store.object_store(Some(operation_id)),
max_spill_size,
session_config,
state,
object_store,
)?);
let task_parameters = self.task_parameters.clone();

Expand Down Expand Up @@ -808,7 +828,7 @@ pub async fn create_merge_plan(
filters: &[PartitionFilter],
target_size: Option<u64>,
writer_properties: WriterProperties,
session_config: Option<SessionConfig>,
state: Option<SessionState>,
) -> Result<MergePlan, DeltaTableError> {
let target_size =
target_size.unwrap_or_else(|| snapshot.table_config().target_file_size().get());
Expand All @@ -825,7 +845,7 @@ pub async fn create_merge_plan(
snapshot,
partitions_keys,
filters,
session_config,
state,
)
.await?
}
Expand Down Expand Up @@ -986,7 +1006,7 @@ async fn build_zorder_plan(
snapshot: &DeltaTableState,
partition_keys: &[String],
filters: &[PartitionFilter],
session_config: Option<SessionConfig>,
state: Option<SessionState>,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
if zorder_columns.is_empty() {
return Err(DeltaTableError::Generic(
Expand Down Expand Up @@ -1043,7 +1063,7 @@ async fn build_zorder_plan(
debug!("partition_files inside the zorder plan: {partition_files:?}");
}

let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, session_config);
let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, Box::new(state));
Ok((operation, metrics))
}

Expand Down Expand Up @@ -1090,10 +1110,7 @@ pub(super) mod zorder {
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
};
use ::datafusion::{
execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder},
prelude::{SessionConfig, SessionContext},
};
use ::datafusion::prelude::SessionContext;
use arrow_schema::DataType;
use itertools::Itertools;
use std::any::Any;
Expand All @@ -1108,21 +1125,14 @@ pub(super) mod zorder {
impl ZOrderExecContext {
pub fn new(
columns: Vec<String>,
object_store: ObjectStoreRef,
max_spill_size: usize,
session_config: Option<SessionConfig>,
session_state: SessionState,
object_store_ref: ObjectStoreRef,
) -> Result<Self, DataFusionError> {
let columns = columns.into();

let memory_pool = FairSpillPool::new(max_spill_size);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(memory_pool))
.build_arc()?;
runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);

let ctx =
SessionContext::new_with_config_rt(session_config.unwrap_or_default(), runtime);
let ctx = SessionContext::new_with_state(session_state);
ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
ctx.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store_ref);
Ok(Self { columns, ctx })
}
}
Expand Down
Loading