From 838d189aae5bcbb81cfca300af8e2f401b33dc2b Mon Sep 17 00:00:00 2001
From: Manish Sogiyawar
Date: Fri, 10 Oct 2025 10:06:34 +0100
Subject: [PATCH] fix: Protect recently written uncommitted files in vacuum Full mode

This change prevents race conditions where a concurrent writer's
uncommitted files could be deleted before the transaction is committed.

Now files that are not referenced in the log but are younger than the
retention period will be protected from deletion during vacuum
operations in Full mode.

Added test to verify the behavior of protecting recent uncommitted files.

Signed-off-by: Manish Sogiyawar
---
 crates/core/src/operations/vacuum.rs | 124 +++++++++++++++++++++++++++
 1 file changed, 124 insertions(+)

diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs
index c977824a1a..4920edf7f8 100644
--- a/crates/core/src/operations/vacuum.rs
+++ b/crates/core/src/operations/vacuum.rs
@@ -316,6 +316,19 @@ impl VacuumBuilder {
             // If the file is not an expired tombstone and we have gotten to here with a
             // VacuumMode::Full then it should be added to the deletion plan
             if !expired_tombstones.contains(obj_meta.location.as_ref()) {
+                // For files without tombstones (uncommitted or orphaned files),
+                // check their physical age to protect recently written files from deletion.
+                // This prevents race conditions where a concurrent writer's uncommitted files
+                // could be deleted before the transaction is committed.
+                // A negative age (object timestamp ahead of our clock) also counts as recent.
+                let file_age_millis = now_millis - obj_meta.last_modified.timestamp_millis();
+                if file_age_millis < retention_period.num_milliseconds() {
+                    debug!(
+                        "The file {:?} is not in the log but too recent , protecting from vacuum",
+                        &obj_meta.location,
+                    );
+                    continue;
+                }
                 if self.mode == VacuumMode::Lite {
                     debug!("The file {:?} was not referenced in a log file, but VacuumMode::Lite means it will not be vacuumed", &obj_meta.location);
                     continue;
@@ -798,4 +811,115 @@ mod tests {
         assert_eq!(result.files_deleted, empty);
         Ok(())
     }
+
+    /// Mock clock for testing time-dependent vacuum behavior
+    #[derive(Debug, Clone)]
+    struct MockClock {
+        timestamp_millis: i64,
+    }
+
+    impl MockClock {
+        fn new(timestamp_millis: i64) -> Self {
+            Self { timestamp_millis }
+        }
+    }
+
+    impl Clock for MockClock {
+        fn current_timestamp_millis(&self) -> i64 {
+            self.timestamp_millis
+        }
+    }
+
+    /// Test that recently written uncommitted files are protected from deletion in Full mode,
+    /// and that the protection ends once a file is older than the retention period.
+    /// This tests the fix for the race condition where concurrent writer's files could be deleted
+    #[tokio::test]
+    async fn test_vacuum_full_protects_recent_uncommitted_files() -> DeltaResult<()> {
+        use chrono::Utc;
+        use object_store::GetResultPayload;
+
+        let store = InMemory::new();
+        let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap();
+        let mut stream = source.list(None);
+
+        while let Some(Ok(entity)) = stream.next().await {
+            let mut contents = vec![];
+            match source.get(&entity.location).await.unwrap().payload {
+                GetResultPayload::File(mut fd, _path) => {
+                    fd.read_to_end(&mut contents).unwrap();
+                }
+                _ => panic!("We should only be dealing in files!"),
+            }
+            let content = bytes::Bytes::from(contents);
+            store
+                .put(&entity.location, PutPayload::from_bytes(content))
+                .await
+                .unwrap();
+        }
+
+        // Add a "recently written" orphaned file that simulates an uncommitted file
+        let recent_file_path = object_store::path::Path::from("uncommitted-recent.parquet");
+        store
+            .put(
+                &recent_file_path,
+                PutPayload::from_bytes(bytes::Bytes::from("test data")),
+            )
+            .await
+            .unwrap();
+        // The store stamps the object with the wall clock, so the mock clocks below are
+        // anchored to the wall clock too; an arbitrary epoch-based time would make every
+        // file look like it was written in the future and the test would pass vacuously.
+        let written_at_millis = Utc::now().timestamp_millis();
+
+        let mut table = crate::DeltaTableBuilder::from_valid_uri("memory:///")
+            .unwrap()
+            .with_storage_backend(Arc::new(store), url::Url::parse("memory:///").unwrap())
+            .build()
+            .unwrap();
+        table.load().await.unwrap();
+
+        // One day after the write, with 7-day retention in Full mode:
+        // the file is younger than the retention period and must NOT be deleted
+        let one_day_later = written_at_millis + Duration::days(1).num_milliseconds();
+        let mock_clock = Arc::new(MockClock::new(one_day_later));
+        let (_table, result) = VacuumBuilder::new(
+            table.log_store(),
+            table.snapshot().unwrap().snapshot.clone(),
+        )
+        .with_retention_period(Duration::days(7))
+        .with_dry_run(true)
+        .with_mode(VacuumMode::Full)
+        .with_enforce_retention_duration(false)
+        .with_clock(mock_clock)
+        .await
+        .unwrap();
+        assert!(
+            !result.files_deleted.contains(&recent_file_path.to_string()),
+            "Recent uncommitted file should be protected from deletion, but found in deletion list: {:?}",
+            result.files_deleted
+        );
+
+        // Eight days after the write the same orphan has outlived the retention period,
+        // so Full mode must plan its deletion
+        let eight_days_later = written_at_millis + Duration::days(8).num_milliseconds();
+        let mock_clock = Arc::new(MockClock::new(eight_days_later));
+        let (_table, result) = VacuumBuilder::new(
+            table.log_store(),
+            table.snapshot().unwrap().snapshot.clone(),
+        )
+        .with_retention_period(Duration::days(7))
+        .with_dry_run(true)
+        .with_mode(VacuumMode::Full)
+        .with_enforce_retention_duration(false)
+        .with_clock(mock_clock)
+        .await
+        .unwrap();
+        assert!(
+            result.files_deleted.contains(&recent_file_path.to_string()),
+            "Orphaned file older than the retention period should be vacuumed in Full mode, but deletion list was: {:?}",
+            result.files_deleted
+        );
+
+        Ok(())
+    }
 }