Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,9 @@ pub struct CommitInfo {
#[serde(skip_serializing_if = "Option::is_none")]
pub isolation_level: Option<IsolationLevel>,

/// TODO
/// A flag indicating if the commit is a blind append.
/// A blind append is a write operation with mode Append that does not include any Remove actions.
/// https://books.japila.pl/delta-lake-internals/OptimisticTransactionImpl/?h=blind+app#isBlindAppend
#[serde(skip_serializing_if = "Option::is_none")]
pub is_blind_append: Option<bool>,

Expand Down
15 changes: 15 additions & 0 deletions crates/core/src/kernel/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,21 @@ impl<'a> ConflictChecker<'a> {
fn check_for_deleted_files_against_current_txn_read_files(
&self,
) -> Result<(), CommitConflictError> {
// For blind append writes, do not consider delete-read conflicts, since the
// transaction did not logically read any prior files.
let is_blind_append = self
.txn_info
.actions
.iter()
.find_map(|a| match a {
Action::CommitInfo(ci) => ci.is_blind_append,
_ => None,
})
.unwrap_or(false);
if is_blind_append {
return Ok(());
}

// Fail if files have been deleted that the txn read.
let read_file_path: HashSet<String> = self
.txn_info
Expand Down
11 changes: 10 additions & 1 deletion crates/core/src/kernel/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Trans
use crate::logstore::ObjectStoreRef;
use crate::logstore::{CommitOrBytes, LogStoreRef};
use crate::operations::CustomExecuteHandler;
use crate::protocol::DeltaOperation;
use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::config::TablePropertiesExt as _;
use crate::table::state::DeltaTableState;
use crate::{crate_version, DeltaResult};
Expand Down Expand Up @@ -307,6 +307,15 @@ impl CommitData {
) -> Self {
if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
let mut commit_info = operation.get_commit_info();
// Determine if this commit is a blind append. A blind append is defined as a
// WRITE operation with mode Append that does not include any Remove actions.
let is_blind_append = matches!(
&operation,
DeltaOperation::Write { mode: SaveMode::Append, .. }
) && !actions.iter().any(|a| matches!(a, Action::Remove(_)));
if is_blind_append {
commit_info.is_blind_append = Some(true);
}
commit_info.timestamp = Some(Utc::now().timestamp_millis());
app_metadata.insert(
"clientVersion".to_string(),
Expand Down
3 changes: 3 additions & 0 deletions crates/core/tests/commit_info_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,8 @@ async fn test_commit_info_engine_info() -> Result<(), Box<dyn Error>> {
let engine_info = last_commit.engine_info.as_ref().unwrap();
assert_eq!(engine_info, &format!("delta-rs:{}", crate_version()));

// verify blind append is flagged for append writes with no removes
assert_eq!(last_commit.is_blind_append, Some(true));

Ok(())
}
Loading