Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object_store = { version = "0.12.1" }
parquet = { version = "56.0.0" }

# datafusion
datafusion = "50.0.0"
datafusion = { version = "50.0.0" , features = ["parquet_encryption", "default"]}
datafusion-ffi = "50.0.0"
datafusion-proto = "50.0.0"

Expand All @@ -70,7 +70,7 @@ tempfile = { version = "3" }
uuid = { version = "1" }

# runtime / async
async-trait = { version = "0.1" }
async-trait = { version = "0.1.89" }
futures = { version = "0.3" }
tokio = { version = "1" }

Expand Down
8 changes: 7 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
parquet = { workspace = true, features = ["async", "object_store"] }
parquet = { workspace = true, features = ["async", "object_store", "encryption"] }
object_store = { workspace = true }

# datafusion
Expand Down Expand Up @@ -94,6 +94,8 @@ datatest-stable = "0.3"
deltalake-test = { path = "../test" }
dotenvy = "0"
fs_extra = "1.2.0"
parquet-key-management = { version = "0.4.0", features = ["_test_utils", "datafusion"] }
paste = "1"
pretty_assertions = "1.2.1"
pretty_env_logger = "0.5.0"
rstest = { version = "0.26.1" }
Expand Down Expand Up @@ -145,6 +147,10 @@ required-features = ["datafusion"]
name = "command_vacuum"
required-features = ["datafusion"]

[[test]]
name = "commands_with_encryption"
required-features = ["datafusion"]

[[test]]
name = "commit_info_format"
required-features = ["datafusion"]
Expand Down
16 changes: 14 additions & 2 deletions crates/core/src/delta_datafusion/find_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::delta_datafusion::{
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, EagerSnapshot};
use crate::logstore::LogStoreRef;
use crate::table::file_format_options::{to_table_parquet_options_from_ffo, FileFormatRef};

#[derive(Debug, Hash, Eq, PartialEq)]
/// Representing the result of the [find_files] function.
Expand All @@ -45,6 +46,7 @@ pub(crate) async fn find_files(
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
session: &dyn Session,
file_format_options: Option<&FileFormatRef>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer needed, since you can call snapshot.load_config() to get the file_format_options.

predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
let current_metadata = snapshot.metadata();
Expand All @@ -71,8 +73,14 @@ pub(crate) async fn find_files(
Span::current().record("candidate_count", result.candidates.len());
Ok(result)
} else {
let candidates =
find_files_scan(snapshot, log_store, session, predicate.to_owned()).await?;
let candidates = find_files_scan(
snapshot,
log_store,
session,
file_format_options,
predicate.to_owned(),
)
.await?;

let result = FindFiles {
candidates,
Expand Down Expand Up @@ -221,6 +229,7 @@ async fn find_files_scan(
snapshot: &EagerSnapshot,
log_store: LogStoreRef,
session: &dyn Session,
file_format_options: Option<&FileFormatRef>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: it's already available in the snapshot, so we can defer this to the latest stage and grab these from the load_config there.

expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
Expand Down Expand Up @@ -250,10 +259,13 @@ async fn find_files_scan(
// Add path column
used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

let table_parquet_options = to_table_parquet_options_from_ffo(file_format_options);
Copy link
Collaborator

@ion-elgreco ion-elgreco Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer that we implement Into here to convert between these two structs.


let scan = DeltaScanBuilder::new(snapshot, log_store, session)
.with_filter(Some(expression.clone()))
.with_projection(Some(&used_columns))
.with_scan_config(scan_config)
.with_parquet_options(table_parquet_options)
.build()
.await?;
let scan = Arc::new(scan);
Expand Down
56 changes: 52 additions & 4 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::kernel::{Action, Add, EagerSnapshot, Remove};
use crate::operations::write::writer::{DeltaWriter, WriterConfig};
use crate::operations::write::WriterStatsConfig;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::file_format_options::{to_table_parquet_options_from_ffo, FileFormatRef};
use crate::{ensure_table_uri, DeltaTable};
use crate::{logstore::LogStoreRef, DeltaResult, DeltaTableError};

Expand Down Expand Up @@ -373,6 +374,7 @@ pub(crate) struct DeltaScanBuilder<'a> {
limit: Option<usize>,
files: Option<&'a [Add]>,
config: Option<DeltaScanConfig>,
parquet_options: Option<TableParquetOptions>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we remove this here. We can move all this logic into:

DeltaScanConfigBuilder.build(), where we can introspect the TableLoadConfig and set the parquetOptions on the DeltaScanConfig.

}

impl<'a> DeltaScanBuilder<'a> {
Expand All @@ -390,9 +392,15 @@ impl<'a> DeltaScanBuilder<'a> {
limit: None,
files: None,
config: None,
parquet_options: None,
}
}

/// Sets explicit Parquet table options for the scan being built.
///
/// `new` initializes this to `None`. When it is still `None` as the scan is built,
/// the Parquet options from the session's table options are used instead.
/// The chosen options are also inspected for `crypto.factory_id`; if one is present,
/// the matching encryption factory is looked up in the session's runtime environment
/// and attached to the Parquet source.
pub fn with_parquet_options(mut self, parquet_options: Option<TableParquetOptions>) -> Self {
self.parquet_options = parquet_options;
self
}

pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
self.filter = filter;
self
Expand Down Expand Up @@ -643,13 +651,30 @@ impl<'a> DeltaScanBuilder<'a> {

let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

let parquet_options = TableParquetOptions {
global: self.session.config().options().execution.parquet.clone(),
..Default::default()
};
let parquet_options = self
.parquet_options
.unwrap_or_else(|| self.session.table_options().parquet.clone());

// We have to set the encryption factory on the ParquetSource based on the Parquet options,
// as this is usually handled by the ParquetFormat type in DataFusion,
// which is not used in delta-rs.
let encryption_factory = parquet_options
.crypto
.factory_id
.as_ref()
.map(|factory_id| {
self.session
.runtime_env()
.parquet_encryption_factory(factory_id)
})
.transpose()?;

let mut file_source = ParquetSource::new(parquet_options);

if let Some(encryption_factory) = encryption_factory {
file_source = file_source.with_encryption_factory(encryption_factory);
}

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
// filter
Expand Down Expand Up @@ -730,9 +755,17 @@ impl TableProvider for DeltaTable {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
register_store(self.log_store(), session.runtime_env().as_ref());
if let Some(format_options) = &self.config.file_format_options {
format_options.update_session(session)?;
}
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), session)
.with_parquet_options(
crate::table::file_format_options::to_table_parquet_options_from_ffo(
self.config.file_format_options.as_ref(),
),
)
Comment on lines +758 to +768
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed since this will happen inside the DeltaScanBuilder with my suggestion above

.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down Expand Up @@ -763,6 +796,7 @@ pub struct DeltaTableProvider {
config: DeltaScanConfig,
schema: Arc<Schema>,
files: Option<Vec<Add>>,
file_format_options: Option<FileFormatRef>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, not needed anymore, since we can pass it through the DeltaScanConfig

}

impl DeltaTableProvider {
Expand All @@ -778,9 +812,15 @@ impl DeltaTableProvider {
log_store,
config,
files: None,
file_format_options: None,
})
}

/// Sets the file format options applied when this provider is scanned.
///
/// When set, `scan` calls `update_session` on the options before building the scan.
/// `scan` also converts the stored value to Parquet table options and hands them to
/// the scan builder. The provider starts with `None`.
pub fn with_file_format_options(mut self, file_format_options: Option<FileFormatRef>) -> Self {
self.file_format_options = file_format_options;
self
}

/// Define which files to consider while building a scan, for advanced usecases
pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
self.files = Some(files);
Expand Down Expand Up @@ -818,9 +858,17 @@ impl TableProvider for DeltaTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
register_store(self.log_store.clone(), session.runtime_env().as_ref());
if let Some(format_options) = &self.file_format_options {
format_options.update_session(session)?;
}

let filter_expr = conjunction(filters.iter().cloned());

let table_parquet_options =
to_table_parquet_options_from_ffo(self.file_format_options.as_ref());

let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
.with_parquet_options(table_parquet_options)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Add a new column to a table

use std::sync::Arc;

use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use itertools::Itertools;
use std::sync::Arc;

use super::{CustomExecuteHandler, Operation};
use crate::kernel::schema::merge_delta_struct;
Expand Down
18 changes: 13 additions & 5 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! Command for creating a new delta table
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::log::*;
use uuid::Uuid;

Expand All @@ -21,7 +20,7 @@ use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
use crate::table::config::TableProperty;
use crate::{DeltaTable, DeltaTableBuilder};
use crate::{DeltaTable, DeltaTableBuilder, DeltaTableConfig};

#[derive(thiserror::Error, Debug)]
enum CreateError {
Expand Down Expand Up @@ -61,6 +60,7 @@ pub struct CreateBuilder {
storage_options: Option<HashMap<String, String>>,
actions: Vec<Action>,
log_store: Option<LogStoreRef>,
table_config: DeltaTableConfig,
configuration: HashMap<String, Option<String>>,
/// Additional information to add to the commit
commit_properties: CommitProperties,
Expand Down Expand Up @@ -98,6 +98,7 @@ impl CreateBuilder {
storage_options: None,
actions: Default::default(),
log_store: None,
table_config: DeltaTableConfig::default(),
configuration: Default::default(),
commit_properties: CommitProperties::default(),
raise_if_key_not_exists: true,
Expand Down Expand Up @@ -238,6 +239,12 @@ impl CreateBuilder {
self
}

/// Set the [`DeltaTableConfig`] used to construct the table being created.
///
/// The config is passed to the `DeltaTable` whether it is built from a provided
/// log store or from a location. Defaults to `DeltaTableConfig::default()`.
pub fn with_table_config(mut self, table_config: DeltaTableConfig) -> Self {
self.table_config = table_config;
self
}

/// Set a custom execute handler, for pre and post execution
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
self.custom_execute_handler = Some(handler);
Expand All @@ -262,14 +269,15 @@ impl CreateBuilder {
let (storage_url, table) = if let Some(log_store) = self.log_store {
(
ensure_table_uri(log_store.root_uri())?.as_str().to_string(),
DeltaTable::new(log_store, Default::default()),
DeltaTable::new(log_store, self.table_config.clone()),
)
} else {
let storage_url =
ensure_table_uri(self.location.clone().ok_or(CreateError::MissingLocation)?)?;
(
storage_url.as_str().to_string(),
DeltaTableBuilder::from_uri(storage_url)?
.with_table_config(self.table_config.clone())
.with_storage_options(self.storage_options.clone().unwrap_or_default())
.build()?,
)
Expand Down
Loading
Loading