feat: allow passing a SessionState into an OptimizeBuilder (#3802)
```diff
@@ -27,7 +27,10 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef as ArrowSchemaRef;
-use datafusion::prelude::SessionConfig;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::memory_pool::FairSpillPool;
+use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::SessionStateBuilder;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::expressions::Scalar;
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;
```
```diff
@@ -215,8 +218,8 @@ pub struct OptimizeBuilder<'a> {
     max_spill_size: usize,
     /// Optimize type
     optimize_type: OptimizeType,
-    /// Optional [SessionConfig] for users that want more control over the Datafusion execution
-    session_config: Option<SessionConfig>,
+    /// Datafusion session state relevant for executing the input plan
+    state: Option<SessionState>,
     min_commit_interval: Option<Duration>,
     custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
 }
```
```diff
@@ -245,7 +248,7 @@ impl<'a> OptimizeBuilder<'a> {
             max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
             optimize_type: OptimizeType::Compact,
             min_commit_interval: None,
-            session_config: None,
+            state: None,
             custom_execute_handler: None,
         }
     }
```
```diff
@@ -293,6 +296,10 @@ impl<'a> OptimizeBuilder<'a> {
     }

     /// Max spill size
+    #[deprecated(
+        since = "0.29.0",
+        note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
+    )]
     pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
         self.max_spill_size = max_spill_size;
         self
```
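As the deprecation note says, the spill budget now travels with the session state instead of being a builder knob. A minimal sketch of the replacement pattern, mirroring the fallback construction that appears later in this diff (the helper name `spill_limited_state` is ours, not part of the crate):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;

/// Build a `SessionState` whose execution memory is capped at
/// `max_spill_size` bytes by a `FairSpillPool`, which splits the
/// budget fairly among operators that can spill to disk.
fn spill_limited_state(max_spill_size: usize) -> Result<SessionState> {
    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(max_spill_size)))
        .build_arc()?;
    Ok(SessionStateBuilder::new()
        .with_default_features()
        .with_runtime_env(runtime)
        .build())
}
```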
```diff
@@ -310,9 +317,9 @@ impl<'a> OptimizeBuilder<'a> {
         self
     }

-    /// Add a custom [SessionConfig] to the optimize plan execution
-    pub fn with_session_config(mut self, config: SessionConfig) -> Self {
-        self.session_config = Some(config);
+    /// A session state accompanying a given input plan, containing e.g. registered object stores
+    pub fn with_input_session_state(mut self, state: SessionState) -> Self {
+        self.state = Some(state);
         self
     }
```

> **Comment on lines +320 to 324:** Note that the name of this function is inconsistent across the various builder APIs; happy to open a follow-up PR to rename everything to either […]
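A caller-side sketch of the new entry point, assuming the usual `DeltaOps` route into the builder, the `spill_limited_state` helper above, and the crate's usual error conversions; the URI and column name are placeholders:

```rust
use deltalake::operations::optimize::OptimizeType;
use deltalake::{DeltaOps, DeltaTableError};

async fn zorder_with_state(uri: &str) -> Result<(), DeltaTableError> {
    // 20 GB mirrors the builder's previous built-in default.
    let state = spill_limited_state(20 * 1024 * 1024 * 1024)?;
    let (_table, metrics) = DeltaOps::try_from_uri(uri)
        .await?
        .optimize()
        .with_type(OptimizeType::ZOrder(vec!["timestamp".to_string()]))
        .with_input_session_state(state)
        .await?;
    println!("z-order added {} files", metrics.num_files_added);
    Ok(())
}
```

Whatever else the caller has registered on the state (object stores, config options) rides along with it, which is why the doc comment calls out registered object stores in particular.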
```diff
@@ -345,7 +352,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                 this.filters,
                 this.target_size.to_owned(),
                 writer_properties,
-                this.session_config,
+                this.state,
             )
             .await?;
```
```diff
@@ -441,7 +448,7 @@ enum OptimizeOperations {
     ZOrder(
         Vec<String>,
         HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
-        Option<SessionConfig>,
+        Box<Option<SessionState>>,
     ),
     // TODO: Sort
 }
```
```diff
@@ -624,6 +631,7 @@ impl MergePlan {
         handle: Option<&Arc<dyn CustomExecuteHandler>>,
     ) -> Result<Metrics, DeltaTableError> {
         let operations = std::mem::take(&mut self.operations);
+        let object_store = log_store.object_store(Some(operation_id));

         let stream = match operations {
             OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
```
```diff
@@ -638,7 +646,7 @@ impl MergePlan {
                     for file in files.iter() {
                         debug!(" file {}", file.path);
                     }
-                    let object_store_ref = log_store.object_store(Some(operation_id));
+                    let object_store_ref = object_store.clone();
                     let batch_stream = futures::stream::iter(files.clone())
                         .then(move |file| {
                             let object_store_ref = object_store_ref.clone();
```
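A small cleanup rides along in this and the neighboring hunks: the object store is resolved from the log store once, up front, and each per-bin closure clones that one handle instead of calling `log_store.object_store(Some(operation_id))` again.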
```diff
@@ -659,20 +667,32 @@ impl MergePlan {
                         self.task_parameters.clone(),
                         partition,
                         files,
-                        log_store.object_store(Some(operation_id)).clone(),
+                        object_store.clone(),
                         futures::future::ready(Ok(batch_stream)),
                     ));
                     util::flatten_join_error(rewrite_result)
                 })
                 .boxed(),
-            OptimizeOperations::ZOrder(zorder_columns, bins, session_config) => {
+            OptimizeOperations::ZOrder(zorder_columns, bins, state) => {
                 debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

+                let state = if let Some(state) = *state {
+                    state
+                } else {
+                    let memory_pool = FairSpillPool::new(max_spill_size);
+                    let runtime = RuntimeEnvBuilder::new()
+                        .with_memory_pool(Arc::new(memory_pool))
+                        .build_arc()?;
+                    SessionStateBuilder::new()
+                        .with_default_features()
+                        .with_runtime_env(runtime)
+                        .build()
+                };
+
                 let exec_context = Arc::new(zorder::ZOrderExecContext::new(
                     zorder_columns,
-                    log_store.object_store(Some(operation_id)),
-                    max_spill_size,
-                    session_config,
+                    state,
+                    object_store,
                 )?);
                 let task_parameters = self.task_parameters.clone();
```
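When no state is passed in, the Z-order branch rebuilds the old default inline: a `FairSpillPool` capped at `max_spill_size` inside a fresh `RuntimeEnv`, so existing callers keep the behavior they had before. The `Box` around `Option<SessionState>` is presumably there to keep this `OptimizeOperations` variant from dwarfing the others, since `SessionState` is a large struct.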
```diff
@@ -811,7 +831,7 @@ pub async fn create_merge_plan(
     filters: &[PartitionFilter],
     target_size: Option<u64>,
     writer_properties: WriterProperties,
-    session_config: Option<SessionConfig>,
+    state: Option<SessionState>,
 ) -> Result<MergePlan, DeltaTableError> {
     let target_size =
         target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size().get());
```
```diff
@@ -828,7 +848,7 @@ pub async fn create_merge_plan(
             snapshot,
             partitions_keys,
             filters,
-            session_config,
+            state,
         )
         .await?
     }
```
```diff
@@ -989,7 +1009,7 @@ async fn build_zorder_plan(
     snapshot: &EagerSnapshot,
     partition_keys: &[String],
     filters: &[PartitionFilter],
-    session_config: Option<SessionConfig>,
+    state: Option<SessionState>,
 ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
     if zorder_columns.is_empty() {
         return Err(DeltaTableError::Generic(
```
```diff
@@ -1046,7 +1066,7 @@ async fn build_zorder_plan(
         debug!("partition_files inside the zorder plan: {partition_files:?}");
     }

-    let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, session_config);
+    let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, Box::new(state));
     Ok((operation, metrics))
 }
```
```diff
@@ -1093,10 +1113,7 @@ pub(super) mod zorder {
         ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
         Volatility,
     };
-    use ::datafusion::{
-        execution::{memory_pool::FairSpillPool, runtime_env::RuntimeEnvBuilder},
-        prelude::{SessionConfig, SessionContext},
-    };
+    use ::datafusion::prelude::SessionContext;
     use arrow_schema::DataType;
     use itertools::Itertools;
     use std::any::Any;
```
```diff
@@ -1111,21 +1128,14 @@ pub(super) mod zorder {
     impl ZOrderExecContext {
         pub fn new(
             columns: Vec<String>,
-            object_store: ObjectStoreRef,
-            max_spill_size: usize,
-            session_config: Option<SessionConfig>,
+            session_state: SessionState,
+            object_store_ref: ObjectStoreRef,
         ) -> Result<Self, DataFusionError> {
             let columns = columns.into();

-            let memory_pool = FairSpillPool::new(max_spill_size);
-            let runtime = RuntimeEnvBuilder::new()
-                .with_memory_pool(Arc::new(memory_pool))
-                .build_arc()?;
-            runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store);
-
-            let ctx =
-                SessionContext::new_with_config_rt(session_config.unwrap_or_default(), runtime);
+            let ctx = SessionContext::new_with_state(session_state);
             ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
+            ctx.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store_ref);
             Ok(Self { columns, ctx })
         }
     }
```
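With the state now supplied by the caller, `ZOrderExecContext::new` no longer assembles its own runtime: it wraps the given `SessionState` in a `SessionContext`, registers the `ZOrderUDF`, and registers the Delta object store under the `delta-rs://` URL on the context itself rather than on the runtime environment.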
Review discussion:

> Haven't looked at all the APIs, but would we get away with tracking something like `Arc<dyn Session>` here?

> Nice, looks like that's preferred by Datafusion to pass this instead: https://docs.rs/datafusion-session/50.1.0/datafusion_session/session/trait.Session.html#migration-from-sessionstate. Let me give it a shot.

> @roeap I will resolve this in a follow-up PR, since I'm unifying all the interfaces anyways.
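For reference, a purely hypothetical sketch of the shape the reviewer is floating, assuming the `Session` trait as re-exported at `datafusion::catalog::Session` (the follow-up PR, not this one, will settle the real signature):

```rust
use std::sync::Arc;

use datafusion::catalog::Session;

// Hypothetical: holding a trait object instead of a concrete
// SessionState lets callers pass any Session implementation.
struct OptimizeBuilderSketch {
    state: Option<Arc<dyn Session>>,
}

impl OptimizeBuilderSketch {
    fn with_input_session(mut self, state: Arc<dyn Session>) -> Self {
        self.state = Some(state);
        self
    }
}
```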