Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object_store = { version = "0.12.1" }
parquet = { version = "56.0.0" }

# datafusion
datafusion = "50.0.0"
datafusion = { version = "50.0.0" , features = ["parquet_encryption", "default"]}
datafusion-ffi = "50.0.0"
datafusion-proto = "50.0.0"

Expand All @@ -69,7 +69,7 @@ percent-encoding-rfc3986 = { version = "0.1.3" }
uuid = { version = "1" }

# runtime / async
async-trait = { version = "0.1" }
async-trait = { version = "0.1.89" }
futures = { version = "0.3" }
tokio = { version = "1" }
num_cpus = { version = "1" }
Expand Down
5 changes: 5 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ datatest-stable = "0.2"
deltalake-test = { path = "../test" }
dotenvy = "0"
fs_extra = "1.2.0"
parquet-key-management = { version = "0.4.0", features = ["_test_utils", "datafusion"] }
pretty_assertions = "1.2.1"
pretty_env_logger = "0.5.0"
rstest = { version = "0.26.1" }
Expand Down Expand Up @@ -143,6 +144,10 @@ required-features = ["datafusion"]
name = "command_vacuum"
required-features = ["datafusion"]

[[test]]
name = "commands_with_encryption"
required-features = ["datafusion"]

[[test]]
name = "commit_info_format"
required-features = ["datafusion"]
Expand Down
16 changes: 14 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub mod planner;
mod schema_adapter;
mod table_provider;

use crate::table::file_format_options::{to_table_parquet_options_from_ffo, FileFormatRef};
pub use cdf::scan::DeltaCdfTableProvider;
pub(crate) use table_provider::DeltaScanBuilder;
pub use table_provider::{DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider};
Expand Down Expand Up @@ -940,6 +941,7 @@ pub(crate) async fn find_files_scan(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
state: &SessionState,
file_format_options: Option<&FileFormatRef>,
expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
Expand All @@ -965,10 +967,13 @@ pub(crate) async fn find_files_scan(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let table_parquet_options = to_table_parquet_options_from_ffo(file_format_options);

let scan = DeltaScanBuilder::new(snapshot, log_store, state)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
.with_parquet_options(table_parquet_options)
.build()
.await?;
let scan = Arc::new(scan);
Expand Down Expand Up @@ -1055,6 +1060,7 @@ pub async fn find_files(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
state: &SessionState,
file_format_options: Option<&FileFormatRef>,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
let current_metadata = snapshot.metadata();
Expand All @@ -1078,8 +1084,14 @@ pub async fn find_files(
partition_scan: true,
})
} else {
let candidates =
find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
let candidates = find_files_scan(
snapshot,
log_store,
state,
file_format_options,
predicate.to_owned(),
)
.await?;

Ok(FindFiles {
candidates,
Expand Down
64 changes: 56 additions & 8 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::kernel::{Action, Add, Remove};
use crate::operations::write::writer::{DeltaWriter, WriterConfig};
use crate::operations::write::WriterStatsConfig;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::file_format_options::{to_table_parquet_options_from_ffo, FileFormatRef};
use crate::{ensure_table_uri, DeltaTable};
use crate::{logstore::LogStoreRef, table::state::DeltaTableState, DeltaResult, DeltaTableError};
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
Expand Down Expand Up @@ -107,7 +108,7 @@ impl DeltaDataSink {
log_store: LogStoreRef,
snapshot: DeltaTableState,
save_mode: SaveMode,
session_state: Arc<SessionState>,
_session_state: Arc<SessionState>,
) -> datafusion::common::Result<Self> {
let schema = snapshot
.arrow_schema()
Expand Down Expand Up @@ -405,6 +406,7 @@ pub(crate) struct DeltaScanBuilder<'a> {
limit: Option<usize>,
files: Option<&'a [Add]>,
config: Option<DeltaScanConfig>,
parquet_options: Option<TableParquetOptions>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we remove this here. We can move all this logic into:

DeltaScanConfigBuilder.build(), there we can introspect the TableLoadConfig and set the parquetOptions on the DeltaScanConfig

}

impl<'a> DeltaScanBuilder<'a> {
Expand All @@ -422,9 +424,15 @@ impl<'a> DeltaScanBuilder<'a> {
limit: None,
files: None,
config: None,
parquet_options: None,
}
}

/// Override the [`TableParquetOptions`] used when reading the table's data files.
///
/// When `None` (the default), the scan falls back to the Parquet options from the
/// session's table options. Passing `Some(..)` allows callers to inject per-table
/// settings — e.g. Parquet modular-encryption options — that differ from the session.
pub fn with_parquet_options(mut self, parquet_options: Option<TableParquetOptions>) -> Self {
self.parquet_options = parquet_options;
self
}

pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
self.filter = filter;
self
Expand Down Expand Up @@ -657,13 +665,30 @@ impl<'a> DeltaScanBuilder<'a> {

let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

let parquet_options = TableParquetOptions {
global: self.session.config().options().execution.parquet.clone(),
..Default::default()
};
let parquet_options = self
.parquet_options
.unwrap_or_else(|| self.session.table_options().parquet.clone());

// We have to set the encryption factory on the ParquetSource based on the Parquet options,
// as this is usually handled by the ParquetFormat type in DataFusion,
// which is not used in delta-rs.
let encryption_factory = parquet_options
.crypto
.factory_id
.as_ref()
.map(|factory_id| {
self.session
.runtime_env()
.parquet_encryption_factory(factory_id)
})
.transpose()?;

let mut file_source = ParquetSource::new(parquet_options);

if let Some(encryption_factory) = encryption_factory {
file_source = file_source.with_encryption_factory(encryption_factory);
}

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
// filter
Expand Down Expand Up @@ -743,9 +768,17 @@ impl TableProvider for DeltaTable {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
register_store(self.log_store(), session.runtime_env().clone());
if let Some(format_options) = &self.file_format_options {
format_options.update_session(session)?;
}
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
.with_parquet_options(
crate::table::file_format_options::to_table_parquet_options_from_ffo(
self.file_format_options.as_ref(),
),
)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -776,6 +809,7 @@ pub struct DeltaTableProvider {
config: DeltaScanConfig,
schema: Arc<Schema>,
files: Option<Vec<Add>>,
file_format_options: Option<FileFormatRef>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, not needed anymore, since we can pass it through the DeltaScanConfig

}

impl DeltaTableProvider {
Expand All @@ -791,9 +825,15 @@ impl DeltaTableProvider {
log_store,
config,
files: None,
file_format_options: None,
})
}

/// Set the file-format options (e.g. Parquet encryption settings) applied when this
/// provider builds a scan.
///
/// When set, `scan()` uses these options to update the session and to derive the
/// `TableParquetOptions` passed to the underlying `DeltaScanBuilder`; `None` keeps
/// the session defaults.
pub fn with_file_format_options(mut self, file_format_options: Option<FileFormatRef>) -> Self {
self.file_format_options = file_format_options;
self
}

/// Define which files to consider while building a scan, for advanced usecases
pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
self.files = Some(files);
Expand Down Expand Up @@ -831,9 +871,17 @@ impl TableProvider for DeltaTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
register_store(self.log_store.clone(), session.runtime_env().clone());
if let Some(format_options) = &self.file_format_options {
format_options.update_session(session)?;
}

let filter_expr = conjunction(filters.iter().cloned());

let table_parquet_options =
to_table_parquet_options_from_ffo(self.file_format_options.as_ref());

let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_parquet_options(table_parquet_options)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -875,9 +923,9 @@ impl TableProvider for DeltaTableProvider {
InsertOp::Append => SaveMode::Append,
InsertOp::Overwrite => SaveMode::Overwrite,
InsertOp::Replace => {
return Err(DataFusionError::Plan(format!(
"Replace operation is not supported for DeltaTableProvider"
)))
return Err(DataFusionError::Plan(
"Replace operation is not supported for DeltaTableProvider".to_string(),
))
}
};

Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Add a new column to a table

use std::sync::Arc;

use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use itertools::Itertools;
use std::sync::Arc;

use super::{CustomExecuteHandler, Operation};
use crate::kernel::schema::merge_delta_struct;
Expand Down Expand Up @@ -130,6 +129,7 @@ impl std::future::IntoFuture for AddColumnBuilder {
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
None,
))
})
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/add_feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not change the function signature here and in the other builders

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you be more specific about what you are looking for? Checking the code base, I see 25 calls to this constructor, and in 10/25 of the cases, I need to pass file_format_options to maintain the needed settings. I guess I could eliminate the 15 cases where I pass None by splitting this into two named constructors...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to use DeltaTableConfig for this:

/// Configuration options for delta table
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
#[serde(rename_all = "camelCase")]
pub struct DeltaTableConfig {
/// Indicates whether DeltaTable should track files.
/// This defaults to `true`
///
/// Some append-only applications might have no need of tracking any files.
/// Hence, DeltaTable will be loaded with significant memory reduction.
pub require_files: bool,
/// Controls how many files to buffer from the commit log when updating the table.
/// This defaults to 4 * number of cpus
///
/// Setting a value greater than 1 results in concurrent calls to the storage api.
/// This can decrease latency if there are many files in the log since the
/// last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should
/// also be considered for optimal performance.
pub log_buffer_size: usize,
/// Control the number of records to read / process from the commit / checkpoint files
/// when processing record batches.
pub log_batch_size: usize,
#[serde(skip_serializing, skip_deserializing)]
#[delta(skip)]
/// When a runtime handler is provided, all IO tasks are spawn in that handle
pub io_runtime: Option<IORuntime>,
}

Copy link
Contributor Author

@corwinjoy corwinjoy Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I think you are suggesting that I add file_format_options to the config member of DeltaTable.
In fact, that was the approach I tried initially but I had to back that out and add this as a direct member for the following reasons:

  1. The file format options don't really seem to fit properly into DeltaTableConfig since these options seem to be more about managing the logfile.
  2. We need to preserve the formatting options when going from DeltaTable to DeltaOps and back. Right now, the config gets lost when new_with_state is called. See below where the config gets reset to default:
    pub(crate) fn new_with_state(log_store: LogStoreRef, state: DeltaTableState) -> Self {

    We need to preserve these settings throughout any chained operations. This means we still need an extra parameter in new_with_state to preserve any existing config. Also, I felt it was cleaner to create a new entry and directly set and pass it. Possibly we could move this into DeltaTableConfig, but I think this may be more of a hindrance than a help.

@adamreeve It was a couple of months ago that we moved this, do you remember anything else from our discussion? See commit below:
666f0ba

Copy link
Collaborator

@ion-elgreco ion-elgreco Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Correct, but they are related to loading the table, which is where the file format options (encryption) should belong as well
  2. DeltaTableConfig tags along with the snapshot:
    /// Get the table config which is loaded with of the snapshot
    pub fn load_config(&self) -> &DeltaTableConfig {
    self.snapshot.load_config()
    }

so that shouldn't be an issue to allow it to stay there when you do new_with_state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I think it makes sense to move this here; I will investigate this week. The main issue is whether I have to support serialization out of the gate to make this work, and at what points the config gets reconstructed from serialized properties. The reason is that serialization of the FileFormatOptions will be a bit tricky because:

  1. I'm not sure if TableOptions fully supports serialization.
  2. For direct encryption and decryption properties, we will need to modify the serialization to make sure that passwords don't get serialized.

))
})
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
None,
))
})
}
Expand Down
20 changes: 16 additions & 4 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! Command for creating a new delta table
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::log::*;
use uuid::Uuid;

Expand All @@ -21,6 +20,7 @@ use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
use crate::table::config::TableProperty;
use crate::table::file_format_options::FileFormatRef;
use crate::{DeltaTable, DeltaTableBuilder};

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -61,6 +61,7 @@ pub struct CreateBuilder {
storage_options: Option<HashMap<String, String>>,
actions: Vec<Action>,
log_store: Option<LogStoreRef>,
file_format_options: Option<FileFormatRef>,
configuration: HashMap<String, Option<String>>,
/// Additional information to add to the commit
commit_properties: CommitProperties,
Expand Down Expand Up @@ -98,6 +99,7 @@ impl CreateBuilder {
storage_options: None,
actions: Default::default(),
log_store: None,
file_format_options: None,
configuration: Default::default(),
commit_properties: CommitProperties::default(),
raise_if_key_not_exists: true,
Expand Down Expand Up @@ -238,6 +240,12 @@ impl CreateBuilder {
self
}

/// Set the file-format options (e.g. Parquet encryption) for the underlying table files.
///
/// The options are carried into the created [`DeltaTable`] so that subsequent
/// chained operations keep using them.
pub fn with_file_format_options(mut self, file_format_options: FileFormatRef) -> Self {
self.file_format_options = Some(file_format_options);
self
}

/// Set a custom execute handler, for pre and post execution
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
self.custom_execute_handler = Some(handler);
Expand All @@ -262,7 +270,11 @@ impl CreateBuilder {
let (storage_url, table) = if let Some(log_store) = self.log_store {
(
ensure_table_uri(log_store.root_uri())?.as_str().to_string(),
DeltaTable::new(log_store, Default::default()),
DeltaTable::new(
log_store,
Default::default(),
self.file_format_options.clone(),
),
)
} else {
let storage_url =
Expand Down
Loading
Loading