Skip to content

Commit 96c2408

Browse files
Add support for blind appends
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 1cdec0d commit 96c2408

File tree

4 files changed

+31
-2
lines changed

4 files changed

+31
-2
lines changed

crates/core/src/kernel/models/actions.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,9 @@ pub struct CommitInfo {
970970
#[serde(skip_serializing_if = "Option::is_none")]
971971
pub isolation_level: Option<IsolationLevel>,
972972

973-
/// TODO
973+
/// A flag indicating if the commit is a blind append.
974+
/// A blind append is a write operation with mode Append that does not include any Remove actions.
975+
/// https://books.japila.pl/delta-lake-internals/OptimisticTransactionImpl/?h=blind+app#isBlindAppend
974976
#[serde(skip_serializing_if = "Option::is_none")]
975977
pub is_blind_append: Option<bool>,
976978

crates/core/src/kernel/transaction/conflict_checker.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,21 @@ impl<'a> ConflictChecker<'a> {
513513
fn check_for_deleted_files_against_current_txn_read_files(
514514
&self,
515515
) -> Result<(), CommitConflictError> {
516+
// For blind append writes, do not consider delete-read conflicts, since the
517+
// transaction did not logically read any prior files.
518+
let is_blind_append = self
519+
.txn_info
520+
.actions
521+
.iter()
522+
.find_map(|a| match a {
523+
Action::CommitInfo(ci) => ci.is_blind_append,
524+
_ => None,
525+
})
526+
.unwrap_or(false);
527+
if is_blind_append {
528+
return Ok(());
529+
}
530+
516531
// Fail if files have been deleted that the txn read.
517532
let read_file_path: HashSet<String> = self
518533
.txn_info

crates/core/src/kernel/transaction/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,8 @@ use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Trans
9696
use crate::logstore::ObjectStoreRef;
9797
use crate::logstore::{CommitOrBytes, LogStoreRef};
9898
use crate::operations::CustomExecuteHandler;
99-
use crate::protocol::DeltaOperation;
10099
use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
100+
use crate::protocol::{DeltaOperation, SaveMode};
101101
use crate::table::config::TablePropertiesExt as _;
102102
use crate::table::state::DeltaTableState;
103103
use crate::{crate_version, DeltaResult};
@@ -307,6 +307,15 @@ impl CommitData {
307307
) -> Self {
308308
if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
309309
let mut commit_info = operation.get_commit_info();
310+
// Determine if this commit is a blind append. A blind append is defined as a
311+
// WRITE operation with mode Append that does not include any Remove actions.
312+
let is_blind_append = matches!(
313+
&operation,
314+
DeltaOperation::Write { mode: SaveMode::Append, .. }
315+
) && !actions.iter().any(|a| matches!(a, Action::Remove(_)));
316+
if is_blind_append {
317+
commit_info.is_blind_append = Some(true);
318+
}
310319
commit_info.timestamp = Some(Utc::now().timestamp_millis());
311320
app_metadata.insert(
312321
"clientVersion".to_string(),

crates/core/tests/commit_info_format.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,8 @@ async fn test_commit_info_engine_info() -> Result<(), Box<dyn Error>> {
3939
let engine_info = last_commit.engine_info.as_ref().unwrap();
4040
assert_eq!(engine_info, &format!("delta-rs:{}", crate_version()));
4141

42+
// verify blind append is flagged for append writes with no removes
43+
assert_eq!(last_commit.is_blind_append, Some(true));
44+
4245
Ok(())
4346
}

0 commit comments

Comments
 (0)