diff --git a/crates/core/src/kernel/models/actions.rs b/crates/core/src/kernel/models/actions.rs index 328c211b01..4363cd1d50 100644 --- a/crates/core/src/kernel/models/actions.rs +++ b/crates/core/src/kernel/models/actions.rs @@ -970,7 +970,9 @@ pub struct CommitInfo { #[serde(skip_serializing_if = "Option::is_none")] pub isolation_level: Option, - /// TODO + /// A flag indicating if the commit is a blind append. + /// A blind append is a write operation with mode Append that does not include any Remove actions. + /// https://books.japila.pl/delta-lake-internals/OptimisticTransactionImpl/?h=blind+app#isBlindAppend #[serde(skip_serializing_if = "Option::is_none")] pub is_blind_append: Option, diff --git a/crates/core/src/kernel/transaction/conflict_checker.rs b/crates/core/src/kernel/transaction/conflict_checker.rs index 4a4868e92e..3619ccc909 100644 --- a/crates/core/src/kernel/transaction/conflict_checker.rs +++ b/crates/core/src/kernel/transaction/conflict_checker.rs @@ -513,6 +513,21 @@ impl<'a> ConflictChecker<'a> { fn check_for_deleted_files_against_current_txn_read_files( &self, ) -> Result<(), CommitConflictError> { + // For blind append writes, do not consider delete-read conflicts, since the + // transaction did not logically read any prior files. + let is_blind_append = self + .txn_info + .actions + .iter() + .find_map(|a| match a { + Action::CommitInfo(ci) => ci.is_blind_append, + _ => None, + }) + .unwrap_or(false); + if is_blind_append { + return Ok(()); + } + // Fail if files have been deleted that the txn read. let read_file_path: HashSet = self .txn_info diff --git a/crates/core/src/kernel/transaction/mod.rs b/crates/core/src/kernel/transaction/mod.rs index ccd011a496..6cd63c1ead 100644 --- a/crates/core/src/kernel/transaction/mod.rs +++ b/crates/core/src/kernel/transaction/mod.rs @@ -96,8 +96,8 @@ use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Trans use crate::logstore::ObjectStoreRef; use crate::logstore::{CommitOrBytes, LogStoreRef}; use crate::operations::CustomExecuteHandler; -use crate::protocol::DeltaOperation; use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for}; +use crate::protocol::{DeltaOperation, SaveMode}; use crate::table::config::TablePropertiesExt as _; use crate::table::state::DeltaTableState; use crate::{crate_version, DeltaResult}; @@ -307,6 +307,15 @@ impl CommitData { ) -> Self { if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) { let mut commit_info = operation.get_commit_info(); + // Determine if this commit is a blind append. A blind append is defined as a + // WRITE operation with mode Append that does not include any Remove actions. + let is_blind_append = matches!( + &operation, + DeltaOperation::Write { mode: SaveMode::Append, .. } + ) && !actions.iter().any(|a| matches!(a, Action::Remove(_))); + if is_blind_append { + commit_info.is_blind_append = Some(true); + } commit_info.timestamp = Some(Utc::now().timestamp_millis()); app_metadata.insert( "clientVersion".to_string(), diff --git a/crates/core/tests/commit_info_format.rs b/crates/core/tests/commit_info_format.rs index 3c450ec8c1..a9bc37d881 100644 --- a/crates/core/tests/commit_info_format.rs +++ b/crates/core/tests/commit_info_format.rs @@ -39,5 +39,8 @@ async fn test_commit_info_engine_info() -> Result<(), Box> { let engine_info = last_commit.engine_info.as_ref().unwrap(); assert_eq!(engine_info, &format!("delta-rs:{}", crate_version())); + // verify blind append is flagged for append writes with no removes + assert_eq!(last_commit.is_blind_append, Some(true)); + Ok(()) }