feat: add framework for File Format Options #3794
base: main
@@ -62,6 +62,7 @@ use crate::kernel::{Action, Add, Remove};
 use crate::operations::write::writer::{DeltaWriter, WriterConfig};
 use crate::operations::write::WriterStatsConfig;
 use crate::protocol::{DeltaOperation, SaveMode};
+use crate::table::file_format_options::{to_table_parquet_options_from_ffo, FileFormatRef};
 use crate::{ensure_table_uri, DeltaTable};
 use crate::{logstore::LogStoreRef, table::state::DeltaTableState, DeltaResult, DeltaTableError};
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;

@@ -107,7 +108,7 @@ impl DeltaDataSink {
         log_store: LogStoreRef,
         snapshot: DeltaTableState,
         save_mode: SaveMode,
-        session_state: Arc<SessionState>,
+        _session_state: Arc<SessionState>,
     ) -> datafusion::common::Result<Self> {
         let schema = snapshot
             .arrow_schema()

@@ -405,6 +406,7 @@ pub(crate) struct DeltaScanBuilder<'a> {
     limit: Option<usize>,
     files: Option<&'a [Add]>,
     config: Option<DeltaScanConfig>,
+    parquet_options: Option<TableParquetOptions>,
 }

 impl<'a> DeltaScanBuilder<'a> {

@@ -422,9 +424,15 @@ impl<'a> DeltaScanBuilder<'a> {
             limit: None,
             files: None,
             config: None,
+            parquet_options: None,
         }
     }

+    pub fn with_parquet_options(mut self, parquet_options: Option<TableParquetOptions>) -> Self {
+        self.parquet_options = parquet_options;
+        self
+    }
+
     pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
         self.filter = filter;
         self

@@ -657,13 +665,30 @@ impl<'a> DeltaScanBuilder<'a> {
         let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

-        let parquet_options = TableParquetOptions {
-            global: self.session.config().options().execution.parquet.clone(),
-            ..Default::default()
-        };
+        let parquet_options = self
+            .parquet_options
+            .unwrap_or_else(|| self.session.table_options().parquet.clone());
+
+        // We have to set the encryption factory on the ParquetSource based on the Parquet options,
+        // as this is usually handled by the ParquetFormat type in DataFusion,
+        // which is not used in delta-rs.
+        let encryption_factory = parquet_options
+            .crypto
+            .factory_id
+            .as_ref()
+            .map(|factory_id| {
+                self.session
+                    .runtime_env()
+                    .parquet_encryption_factory(factory_id)
+            })
+            .transpose()?;

         let mut file_source = ParquetSource::new(parquet_options);

+        if let Some(encryption_factory) = encryption_factory {
+            file_source = file_source.with_encryption_factory(encryption_factory);
+        }
+
         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
         // filter

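As a rough illustration of how the factory lookup above is driven: the caller-supplied TableParquetOptions only needs to carry the id of an encryption factory that is already registered on the session's RuntimeEnv. The helper below is illustrative, not part of this PR, and assumes the DataFusion version used here (where crypto.factory_id exists, as in the hunk above):

// Illustrative sketch: build TableParquetOptions that reference an encryption
// factory by id. DeltaScanBuilder resolves that id through
// runtime_env().parquet_encryption_factory(..) as shown in the hunk above.
use datafusion::config::TableParquetOptions;

fn parquet_options_with_encryption(factory_id: &str) -> TableParquetOptions {
    let mut opts = TableParquetOptions::default();
    // The factory with this id must already be registered on the RuntimeEnv;
    // how that registration happens is outside the scope of this diff.
    opts.crypto.factory_id = Some(factory_id.to_string());
    opts
}
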
@@ -743,9 +768,17 @@ impl TableProvider for DeltaTable {
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         register_store(self.log_store(), session.runtime_env().clone());
+        if let Some(format_options) = &self.file_format_options {
+            format_options.update_session(session)?;
+        }
         let filter_expr = conjunction(filters.iter().cloned());

         let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
+            .with_parquet_options(
+                crate::table::file_format_options::to_table_parquet_options_from_ffo(
+                    self.file_format_options.as_ref(),
+                ),
+            )
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)

@@ -776,6 +809,7 @@ pub struct DeltaTableProvider {
     config: DeltaScanConfig,
     schema: Arc<Schema>,
     files: Option<Vec<Add>>,
+    file_format_options: Option<FileFormatRef>,
 }

 impl DeltaTableProvider {

Review comment on the added file_format_options field: Same here, not needed anymore, since we can pass it through the DeltaScanConfig.

@@ -791,9 +825,15 @@ impl DeltaTableProvider {
             log_store,
             config,
             files: None,
+            file_format_options: None,
         })
     }

+    pub fn with_file_format_options(mut self, file_format_options: Option<FileFormatRef>) -> Self {
+        self.file_format_options = file_format_options;
+        self
+    }
+
     /// Define which files to consider while building a scan, for advanced usecases
     pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
         self.files = Some(files);

@@ -831,9 +871,17 @@ impl TableProvider for DeltaTableProvider {
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         register_store(self.log_store.clone(), session.runtime_env().clone());
+        if let Some(format_options) = &self.file_format_options {
+            format_options.update_session(session)?;
+        }
+
         let filter_expr = conjunction(filters.iter().cloned());

+        let table_parquet_options =
+            to_table_parquet_options_from_ffo(self.file_format_options.as_ref());
+
         let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
+            .with_parquet_options(table_parquet_options)
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)

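Taken together, a hedged usage sketch of the new builder hook (not code from this diff, written as if inside the deltalake-core crate using the same paths as the imports above; how the FileFormatRef itself is constructed is not shown here):

// Illustrative only: wiring file format options into a DeltaTableProvider via the
// with_file_format_options builder added in this PR.
use crate::delta_datafusion::{DeltaScanConfig, DeltaTableProvider};
use crate::logstore::LogStoreRef;
use crate::table::file_format_options::FileFormatRef;
use crate::table::state::DeltaTableState;
use crate::DeltaResult;

fn provider_with_format_options(
    snapshot: DeltaTableState,
    log_store: LogStoreRef,
    config: DeltaScanConfig,
    format_options: FileFormatRef,
) -> DeltaResult<DeltaTableProvider> {
    // scan() later calls update_session() and derives TableParquetOptions from these
    // options via to_table_parquet_options_from_ffo, as the hunk above shows.
    Ok(DeltaTableProvider::try_new(snapshot, log_store, config)?
        .with_file_format_options(Some(format_options)))
}
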
@@ -144,6 +144,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
         Ok(DeltaTable::new_with_state(
             this.log_store,
             commit.snapshot(),
+            None,

Review comment (quoting DeltaTableConfig):

/// Configuration options for delta table
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
    /// Indicates whether DeltaTable should track files.
    /// This defaults to `true`
    ///
    /// Some append-only applications might have no need of tracking any files.
    /// Hence, DeltaTable will be loaded with significant memory reduction.
    pub require_files: bool,
    /// Controls how many files to buffer from the commit log when updating the table.
    /// This defaults to 4 * number of cpus
    ///
    /// Setting a value greater than 1 results in concurrent calls to the storage api.
    /// This can decrease latency if there are many files in the log since the
    /// last checkpoint, but will also increase memory usage. Possible rate limits of the
    /// storage backend should also be considered for optimal performance.
    pub log_buffer_size: usize,
    /// Control the number of records to read / process from the commit / checkpoint files
    /// when processing record batches.
    pub log_batch_size: usize,
    #[serde(skip_serializing, skip_deserializing)]
    #[delta(skip)]
    /// When a runtime handler is provided, all IO tasks are spawn in that handle
    pub io_runtime: Option<IORuntime>,
}
Reply:
OK. I think you are suggesting that I add file_format_options to the config member of DeltaTable.
In fact, that was the approach I tried initially but I had to back that out and add this as a direct member for the following reasons:
- The file format options don't really seem to fit properly into DeltaTableConfig, since those options seem to be more about managing the logfile.
- We need to preserve the formatting options when going from DeltaTable to DeltaOps and back. Right now, the config gets lost when new_with_state is called. See below where the config gets reset to default:
delta-rs/crates/core/src/table/mod.rs
Line 128 in 18f949e
pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {
We need to preserve these settings throughout any chained operations. This means we still need an extra parameter in new_with_state to preserve any existing config. Also, I felt it was cleaner to create a new entry and directly set and pass it. Possibly we could move this into DeltaTableConfig, but I think this may be more of a hindrance than a help.
@adamreeve It was a couple of months ago that we moved this, do you remember anything else from our discussion? See commit below:
666f0ba
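For reference, the body behind the permalink above looks roughly like the following (a sketch only; the exact fields are assumed), which is why any file format options held by the previous DeltaTable are dropped:

// Sketch of current behaviour (not part of this PR): the config is rebuilt from
// Default, so settings carried on the previous DeltaTable do not survive.
pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {
    Self {
        state: Some(state),
        log_store,
        config: DeltaTableConfig::default(),
    }
}
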
Reply:
- Correct, but they are related to loading the table, which is where file formats (encryption) should belong as well.
- DeltaTableConfig tags along with the snapshot:
delta-rs/crates/core/src/table/state.rs
Lines 73 to 76 in a61ac16
/// Get the table config which is loaded with of the snapshot
pub fn load_config(&self) -> &DeltaTableConfig {
    self.snapshot.load_config()
}
so that shouldn't be an issue to allow it to stay there when you do new_with_state
Reply:
OK. I think it makes sense to move this here. I will investigate this week. The main issue is whether I have to support serialization out of the gate to make this work, and at what points the config gets reconstructed from serialized properties. The reason is that serialization of the FileFormatOptions will be a bit tricky because:
- I'm not sure if TableOptions fully supports serialization.
- For direct encryption and decryption properties, we will need to modify the serialization to make sure that passwords don't get serialized.
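On the password point, a minimal sketch of one way to keep secrets out of serde output, mirroring the skip attributes already used for io_runtime in the DeltaTableConfig quoted above (the struct and field names here are illustrative, not from this PR):

use serde::{Deserialize, Serialize};

// Illustrative only: sensitive encryption material is excluded from serialization
// and must be re-supplied after a config round-trip.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FileFormatOptionsConfig {
    /// Non-sensitive settings can round-trip through serde normally.
    pub factory_id: Option<String>,
    /// Secrets are never written out; deserialization leaves this as None.
    #[serde(skip_serializing, skip_deserializing)]
    pub encryption_key: Option<Vec<u8>>,
}
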
Reply:
I suggest we remove this here. We can move all this logic into DeltaScanConfigBuilder.build(); there we can introspect the TableLoadConfig and set the parquet options on the DeltaScanConfig.
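A hedged sketch of the shape this proposal could take — the parquet_options field on DeltaScanConfig is the reviewer's suggestion, not something this diff adds, and the name is illustrative:

// Illustrative shape of the proposal: carry the derived Parquet options on the
// scan config so DeltaScanBuilder can read them directly, instead of threading a
// separate FileFormatRef through DeltaTableProvider.
use datafusion::config::TableParquetOptions;

#[derive(Debug, Clone, Default)]
pub struct DeltaScanConfig {
    // ...existing fields elided...
    /// Parquet options derived in DeltaScanConfigBuilder::build() from the table's
    /// file format options (if any), e.g. via to_table_parquet_options_from_ffo.
    pub parquet_options: Option<TableParquetOptions>,
}

With something like this in place, DeltaTableProvider::scan would no longer need its own file_format_options field or the explicit with_parquet_options call, which matches the earlier inline comment on the added field.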