Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,18 @@ impl VacuumBuilder {
// If the file is not an expired tombstone and we have gotten to here with a
// VacuumMode::Full then it should be added to the deletion plan
if !expired_tombstones.contains(obj_meta.location.as_ref()) {
// For files without tombstones (uncommitted or orphaned files),
// check their physical age to protect recently written files from deletion.
// This prevents race conditions where a concurrent writer's uncommitted files
// could be deleted before the transaction is committed.
let file_age_millis = now_millis - obj_meta.last_modified.timestamp_millis();
if file_age_millis < retention_period.num_milliseconds() {
debug!(
"The file {:?} is not in the log but too recent , protecting from vacuum",
&obj_meta.location,
);
continue;
}
if self.mode == VacuumMode::Lite {
debug!("The file {:?} was not referenced in a log file, but VacuumMode::Lite means it will not be vacuumed", &obj_meta.location);
continue;
Expand Down Expand Up @@ -798,4 +810,95 @@ mod tests {
assert_eq!(result.files_deleted, empty);
Ok(())
}

/// Mock clock for testing time-dependent vacuum behavior
#[derive(Debug, Clone)]
struct MockClock {
timestamp_millis: i64,
}

impl MockClock {
fn new(timestamp_millis: i64) -> Self {
Self { timestamp_millis }
}
}

impl Clock for MockClock {
fn current_timestamp_millis(&self) -> i64 {
self.timestamp_millis
}
}

/// Test that recently written uncommitted files are protected from deletion in Full mode
/// This tests the fix for the race condition where concurrent writer's files could be deleted
#[tokio::test]
async fn test_vacuum_full_protects_recent_uncommitted_files() -> DeltaResult<()> {
use chrono::{DateTime, Utc};
use object_store::GetResultPayload;

let store = InMemory::new();
let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
let mut stream = source.list(None);

while let Some(Ok(entity)) = stream.next().await {
let mut contents = vec![];
match source.get(&entity.location).await.unwrap().payload {
GetResultPayload::File(mut fd, _path) => {
fd.read_to_end(&mut contents).unwrap();
}
_ => panic!("We should only be dealing in files!"),
}
let content = bytes::Bytes::from(contents);
store
.put(&entity.location, PutPayload::from_bytes(content))
.await
.unwrap();
}

// Add a "recently written" orphaned file that simulates an uncommitted file
let recent_file_path = object_store::path::Path::from("uncommitted-recent.parquet");
store
.put(
&recent_file_path,
PutPayload::from_bytes(bytes::Bytes::from("test data")),
)
.await
.unwrap();

let mut table = crate::DeltaTableBuilder::from_valid_uri("memory:///")
.unwrap()
.with_storage_backend(Arc::new(store), url::Url::parse("memory:///").unwrap())
.build()
.unwrap();
table.load().await.unwrap();

// Set current time to 10 days after epoch
let current_time = DateTime::from_timestamp(10 * 24 * 3600, 0)
.unwrap()
.timestamp_millis();
let mock_clock = Arc::new(MockClock::new(current_time));

// Run vacuum with 7-day retention in Full mode
// The recent file should NOT be deleted because it's too new
let (_table, result) = VacuumBuilder::new(
table.log_store(),
table.snapshot().unwrap().snapshot.clone(),
)
.with_retention_period(Duration::days(7))
.with_dry_run(true)
.with_mode(VacuumMode::Full)
.with_enforce_retention_duration(false)
.with_clock(mock_clock)
.await
.unwrap();

// The recent uncommitted file should NOT be in the deletion list
assert!(
!result.files_deleted.contains(&recent_file_path.to_string()),
"Recent uncommitted file should be protected from deletion, but found in deletion list: {:?}",
result.files_deleted
);

Ok(())
}
}
Loading