From 070249a2760f0aadbc2da71bfeb7da277ca17637 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy" <rtyler@brokenco.de>
Date: Thu, 16 Oct 2025 13:04:56 +0000
Subject: [PATCH] feat: allow RecordBatchWriter to pass through commit
 properties

I have seen some production use-cases which need to disable log cleanup
or other per-commit tweaks.

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
---
 crates/core/src/kernel/transaction/mod.rs |  9 ++++
 crates/core/src/writer/mod.rs             | 63 ++++++++++++++++++++---
 crates/core/src/writer/record_batch.rs    | 16 +++++-
 3 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/crates/core/src/kernel/transaction/mod.rs b/crates/core/src/kernel/transaction/mod.rs
index e2b963bbed..5b5654e74f 100644
--- a/crates/core/src/kernel/transaction/mod.rs
+++ b/crates/core/src/kernel/transaction/mod.rs
@@ -880,6 +880,15 @@ pub struct FinalizedCommit {
     pub metrics: Metrics,
 }
 
+impl std::fmt::Debug for FinalizedCommit {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FinalizedCommit")
+            .field("version", &self.version)
+            .field("metrics", &self.metrics)
+            .finish()
+    }
+}
+
 impl FinalizedCommit {
     /// The new table state after a commit
     pub fn snapshot(&self) -> DeltaTableState {
diff --git a/crates/core/src/writer/mod.rs b/crates/core/src/writer/mod.rs
index 156e43d4b6..5d82749aa1 100644
--- a/crates/core/src/writer/mod.rs
+++ b/crates/core/src/writer/mod.rs
@@ -5,6 +5,7 @@ use async_trait::async_trait;
 use object_store::Error as ObjectStoreError;
 use parquet::errors::ParquetError;
 use serde_json::Value;
+use tracing::log::*;
 
 use crate::errors::DeltaTableError;
 use crate::kernel::transaction::{CommitBuilder, CommitProperties};
@@ -152,7 +153,7 @@ pub trait DeltaWriter<T> {
     /// and commit the changes to the Delta log, creating a new table version.
     async fn flush_and_commit(&mut self, table: &mut DeltaTable) -> Result<i64, DeltaTableError> {
         let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect();
-        flush_and_commit(adds, table).await
+        flush_and_commit(adds, table, None).await
     }
 }
 
@@ -160,6 +161,7 @@ pub trait DeltaWriter<T> {
 pub(crate) async fn flush_and_commit(
     adds: Vec<Action>,
     table: &mut DeltaTable,
+    commit_properties: Option<CommitProperties>,
 ) -> Result<i64, DeltaTableError> {
     let snapshot = table.snapshot()?;
     let partition_cols = snapshot.metadata().partition_columns().clone();
@@ -174,11 +176,60 @@ pub(crate) async fn flush_and_commit(
         predicate: None,
     };
 
-    let version = CommitBuilder::from(CommitProperties::default())
+    let finalized = CommitBuilder::from(commit_properties.unwrap_or_default())
         .with_actions(adds)
         .build(Some(snapshot), table.log_store.clone(), operation)
-        .await?
-        .version();
-    table.update().await?;
-    Ok(version)
+        .await?;
+    table.state = Some(finalized.snapshot());
+    Ok(finalized.version())
+}
+
+#[cfg(test)]
+mod tests {
+    use delta_kernel::schema::DataType;
+
+    use super::*;
+    use crate::{DeltaOps, DeltaResult};
+    use pretty_assertions::assert_ne;
+
+    /// This test doesn't have a great way to _validate_ that logs are not cleaned up as part of
+    /// the second commit.
+    ///
+    /// Instead I just added some prints in the [PostCommit] logic to validate that the property
+    /// was getting pulled through correctly in the non-default case.
+    ///
+    /// The _ideal_ testing scenario would probably be to propagate metrics out of
+    /// [flush_and_commit] but that's an API change that isn't desirable at the moment
+    #[tokio::test]
+    async fn test_flush_and_commit() -> DeltaResult<()> {
+        let mut table = DeltaOps::new_in_memory()
+            .create()
+            .with_table_name("my_table")
+            .with_column(
+                "id",
+                DataType::Primitive(delta_kernel::schema::PrimitiveType::Long),
+                true,
+                None,
+            )
+            .with_configuration_property(
+                crate::TableProperty::LogRetentionDuration,
+                Some("interval 0 days"),
+            )
+            .await?;
+
+        let add = Add::default();
+        let actions = vec![Action::Add(add)];
+        let first_version = flush_and_commit(actions, &mut table, None).await?;
+
+        let add = Add::default();
+        let actions = vec![Action::Add(add)];
+
+        let properties = CommitProperties::default().with_cleanup_expired_logs(Some(false));
+        let second_version = flush_and_commit(actions, &mut table, Some(properties)).await?;
+        assert_ne!(
+            second_version, first_version,
+            "flush_and_commit did not create a version apparently?"
+        );
+        Ok(())
+    }
 }
diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs
index cb82778fc6..4e49dbd7dc 100644
--- a/crates/core/src/writer/record_batch.rs
+++ b/crates/core/src/writer/record_batch.rs
@@ -31,6 +31,7 @@ use super::utils::{
 use super::{DeltaWriter, DeltaWriterError, WriteMode};
 use crate::errors::DeltaTableError;
 use crate::kernel::schema::merge_arrow_schema;
+use crate::kernel::transaction::CommitProperties;
 use crate::kernel::MetadataExt as _;
 use crate::kernel::{scalars::ScalarExt, Action, Add, PartitionsExt};
 use crate::logstore::ObjectStoreRetryExt;
@@ -49,6 +50,7 @@ pub struct RecordBatchWriter {
     arrow_writers: HashMap<String, PartitionWriter>,
     num_indexed_cols: DataSkippingNumIndexedCols,
     stats_columns: Option<Vec<String>>,
+    commit_properties: Option<CommitProperties>,
 }
 
 impl std::fmt::Debug for RecordBatchWriter {
@@ -103,9 +105,20 @@ impl RecordBatchWriter {
             stats_columns: configuration
                 .get("delta.dataSkippingStatsColumns")
                 .map(|v| v.split(',').map(|s| s.to_string()).collect()),
+            commit_properties: None,
         })
     }
 
+    /// Add the [CommitProperties] to the [RecordBatchWriter] to be used when the writer flushes
+    /// the write into storage.
+    ///
+    /// This can be useful for situations where slight modifications to the commit behavior are
+    /// required.
+    pub fn with_commit_properties(mut self, properties: CommitProperties) -> Self {
+        self.commit_properties = Some(properties);
+        self
+    }
+
     /// Creates a [`RecordBatchWriter`] to write data to provided Delta Table
     pub fn for_table(table: &DeltaTable) -> Result<RecordBatchWriter, DeltaTableError> {
         // Initialize an arrow schema ref from the delta table schema
@@ -142,6 +155,7 @@ impl RecordBatchWriter {
             stats_columns: configuration
                 .get("delta.dataSkippingStatsColumns")
                 .map(|v| v.split(',').map(|s| s.to_string()).collect()),
+            commit_properties: None,
         })
     }
 
@@ -306,7 +320,7 @@ impl DeltaWriter<RecordBatch> for RecordBatchWriter {
             let metadata = current_meta.with_schema(&schema)?;
             adds.push(Action::Metadata(metadata));
         }
-        super::flush_and_commit(adds, table).await
+        super::flush_and_commit(adds, table, self.commit_properties.clone()).await
     }
 }
 