diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs index c977824a1a..4920edf7f8 100644 --- a/crates/core/src/operations/vacuum.rs +++ b/crates/core/src/operations/vacuum.rs @@ -316,6 +316,18 @@ impl VacuumBuilder { // If the file is not an expired tombstone and we have gotten to here with a // VacuumMode::Full then it should be added to the deletion plan if !expired_tombstones.contains(obj_meta.location.as_ref()) { + // For files without tombstones (uncommitted or orphaned files), + // check their physical age to protect recently written files from deletion. + // This prevents race conditions where a concurrent writer's uncommitted files + // could be deleted before the transaction is committed. + let file_age_millis = now_millis - obj_meta.last_modified.timestamp_millis(); + if file_age_millis < retention_period.num_milliseconds() { + debug!( + "The file {:?} is not in the log but too recent , protecting from vacuum", + &obj_meta.location, + ); + continue; + } if self.mode == VacuumMode::Lite { debug!("The file {:?} was not referenced in a log file, but VacuumMode::Lite means it will not be vacuumed", &obj_meta.location); continue; @@ -798,4 +810,95 @@ mod tests { assert_eq!(result.files_deleted, empty); Ok(()) } + + /// Mock clock for testing time-dependent vacuum behavior + #[derive(Debug, Clone)] + struct MockClock { + timestamp_millis: i64, + } + + impl MockClock { + fn new(timestamp_millis: i64) -> Self { + Self { timestamp_millis } + } + } + + impl Clock for MockClock { + fn current_timestamp_millis(&self) -> i64 { + self.timestamp_millis + } + } + + /// Test that recently written uncommitted files are protected from deletion in Full mode + /// This tests the fix for the race condition where concurrent writer's files could be deleted + #[tokio::test] + async fn test_vacuum_full_protects_recent_uncommitted_files() -> DeltaResult<()> { + use chrono::{DateTime, Utc}; + use object_store::GetResultPayload; + + let store = InMemory::new(); + let source = LocalFileSystem::new_with_prefix("../test/tests/data/simple_table").unwrap(); + let mut stream = source.list(None); + + while let Some(Ok(entity)) = stream.next().await { + let mut contents = vec![]; + match source.get(&entity.location).await.unwrap().payload { + GetResultPayload::File(mut fd, _path) => { + fd.read_to_end(&mut contents).unwrap(); + } + _ => panic!("We should only be dealing in files!"), + } + let content = bytes::Bytes::from(contents); + store + .put(&entity.location, PutPayload::from_bytes(content)) + .await + .unwrap(); + } + + // Add a "recently written" orphaned file that simulates an uncommitted file + let recent_file_path = object_store::path::Path::from("uncommitted-recent.parquet"); + store + .put( + &recent_file_path, + PutPayload::from_bytes(bytes::Bytes::from("test data")), + ) + .await + .unwrap(); + + let mut table = crate::DeltaTableBuilder::from_valid_uri("memory:///") + .unwrap() + .with_storage_backend(Arc::new(store), url::Url::parse("memory:///").unwrap()) + .build() + .unwrap(); + table.load().await.unwrap(); + + // Set current time to 10 days after epoch + let current_time = DateTime::from_timestamp(10 * 24 * 3600, 0) + .unwrap() + .timestamp_millis(); + let mock_clock = Arc::new(MockClock::new(current_time)); + + // Run vacuum with 7-day retention in Full mode + // The recent file should NOT be deleted because it's too new + let (_table, result) = VacuumBuilder::new( + table.log_store(), + table.snapshot().unwrap().snapshot.clone(), + ) + .with_retention_period(Duration::days(7)) + .with_dry_run(true) + .with_mode(VacuumMode::Full) + .with_enforce_retention_duration(false) + .with_clock(mock_clock) + .await + .unwrap(); + + // The recent uncommitted file should NOT be in the deletion list + assert!( + !result.files_deleted.contains(&recent_file_path.to_string()), + "Recent uncommitted file should be protected from deletion, but found in deletion list: {:?}", + result.files_deleted + ); + + Ok(()) + } }