@@ -1812,13 +1812,7 @@ impl CayenneTableProvider {
18121812 // fails with NotFound. Sleeping `OLD_SNAPSHOT_CLEANUP_GRACE` before
18131813 // deleting lets every plan that began under the old listing table
18141814 // finish opening its files.
1815- const OLD_SNAPSHOT_CLEANUP_GRACE : std:: time:: Duration = std:: time:: Duration :: from_secs ( 30 ) ;
1816-
1817- // Collect protected snapshot IDs to preserve during cleanup
1818- let protected_snapshot_ids: HashSet < String > = {
1819- let guard = self . protected_snapshots . read ( ) ;
1820- guard. keys ( ) . cloned ( ) . collect ( )
1821- } ;
1815+ const OLD_SNAPSHOT_CLEANUP_GRACE : std:: time:: Duration = std:: time:: Duration :: from_secs ( 120 ) ;
18221816
18231817 if self . table_metadata . path . starts_with ( "s3://" ) {
18241818 // S3 cleanup uses `self.cleanup_old_snapshots_s3` which holds
@@ -1827,6 +1821,13 @@ impl CayenneTableProvider {
18271821 // `OLD_SNAPSHOT_CLEANUP_GRACE` only delays the next compaction
18281822 // cycle, not user writes or scans.
18291823 tokio:: time:: sleep ( OLD_SNAPSHOT_CLEANUP_GRACE ) . await ;
1824+ // Read the LIVE protected set after the grace period. During the
1825+ // sleep, CDC writers may have created new protected snapshots that
1826+ // must not be deleted.
1827+ let protected_snapshot_ids: HashSet < String > = {
1828+ let guard = self . protected_snapshots . read ( ) ;
1829+ guard. keys ( ) . cloned ( ) . collect ( )
1830+ } ;
18301831 if let Err ( err) = self
18311832 . cleanup_old_snapshots_s3 ( current_snapshot, & protected_snapshot_ids)
18321833 . await
@@ -1840,8 +1841,19 @@ impl CayenneTableProvider {
18401841 let table_path = self . table_metadata . path . clone ( ) ;
18411842 let table_id = self . table_metadata . table_id . clone ( ) ;
18421843 let current_snapshot = current_snapshot. to_string ( ) ;
1844+ let protected_snapshots = Arc :: clone ( & self . protected_snapshots ) ;
18431845 tokio:: spawn ( async move {
18441846 tokio:: time:: sleep ( OLD_SNAPSHOT_CLEANUP_GRACE ) . await ;
1847+ // Read the LIVE protected set after the grace period. During the
1848+ // sleep, CDC writers may have created new protected snapshots
1849+ // that must not be deleted. Capturing the set before the sleep
1850+ // caused a race: compaction clears `protected_snapshots` at
1851+ // commit time, new CDC writes re-populate it, then the stale
1852+ // (empty) captured set causes cleanup to delete them.
1853+ let protected_snapshot_ids: HashSet < String > = {
1854+ let guard = protected_snapshots. read ( ) ;
1855+ guard. keys ( ) . cloned ( ) . collect ( )
1856+ } ;
18451857 let _ = tokio:: task:: spawn_blocking ( move || {
18461858 if let Err ( e) = Self :: cleanup_old_snapshots_blocking (
18471859 & table_path,
@@ -2592,6 +2604,21 @@ impl CayenneTableProvider {
25922604 protected_snapshot_ids. len( )
25932605 ) ;
25942606
2607+ // Parse the current snapshot UUID7 unix timestamp. Directories with a
2608+ // UUID7 timestamp >= this might be in-flight writes that started after compaction committed
2609+ // but haven't added themselves to `protected_snapshots` yet.
2610+ let current_snapshot_unix = uuid:: Uuid :: parse_str ( current_snapshot_id)
2611+ . ok ( )
2612+ . and_then ( |u| u. get_timestamp ( ) )
2613+ . map ( |ts| ts. to_unix ( ) ) ;
2614+
2615+ if current_snapshot_unix. is_none ( ) {
2616+ tracing:: warn!(
2617+ "Unable to extract UUID7 timestamp from current snapshot '{}'; in-flight write protection disabled" ,
2618+ current_snapshot_id
2619+ ) ;
2620+ }
2621+
25952622 // Read all entries in the table directory using blocking I/O
25962623 let entries =
25972624 std:: fs:: read_dir ( & table_dir) . map_err ( |source| CatalogError :: IoError { source } ) ?;
@@ -2623,6 +2650,28 @@ impl CayenneTableProvider {
26232650 continue ;
26242651 }
26252652
2653+ // Skip directories whose UUID7 timestamp is >= the current snapshot.
2654+ // These might be in-flight writes. Deleting them would cause the writer's final rename to fail with ENOENT.
2655+ if let Some ( current_unix) = current_snapshot_unix {
2656+ let dir_unix = uuid:: Uuid :: parse_str ( snapshot_id)
2657+ . ok ( )
2658+ . and_then ( |u| u. get_timestamp ( ) )
2659+ . map ( |ts| ts. to_unix ( ) ) ;
2660+
2661+ match dir_unix {
2662+ Some ( ts) if ts >= current_unix => {
2663+ tracing:: debug!( "Keeping snapshot: {snapshot_id} (newer than current)" ) ;
2664+ continue ;
2665+ }
2666+ None => {
2667+ tracing:: warn!(
2668+ "Unable to extract UUID7 timestamp from snapshot '{snapshot_id}'" ,
2669+ ) ;
2670+ }
2671+ _ => { }
2672+ }
2673+ }
2674+
26262675 // Delete the old snapshot directory using blocking I/O
26272676 tracing:: info!(
26282677 "Deleting old snapshot directory for table {}: {}" ,
@@ -8125,6 +8174,7 @@ mod tests {
81258174 use rstest:: rstest;
81268175 use std:: collections:: HashMap ;
81278176 use std:: sync:: Arc ;
8177+ use tempfile:: TempDir ;
81288178 use test_framework:: arrow_record_batch_gen:: * ;
81298179
81308180 #[ test]
@@ -9202,4 +9252,102 @@ mod tests {
92029252 Ok ( completed) => panic ! ( "read fence acquired despite held write fence: {completed:?}" ) ,
92039253 }
92049254 }
9255+
9256+ // =================================
9257+ // UUID7 snapshot timestamp parsing
9258+ // =================================
9259+
9260+ #[ test]
9261+ fn uuid7_snapshot_timestamp_is_extractable_and_ordered ( ) {
9262+ // Simulate two snapshot IDs created at different times via Uuid::now_v7().
9263+ let older = uuid:: Uuid :: now_v7 ( ) ;
9264+ // Advance the embedded timestamp by creating a second UUID slightly later.
9265+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) ;
9266+ let newer = uuid:: Uuid :: now_v7 ( ) ;
9267+
9268+ let ts_older = older
9269+ . get_timestamp ( )
9270+ . expect ( "UUID v7 should have an extractable timestamp" )
9271+ . to_unix ( ) ;
9272+ let ts_newer = newer
9273+ . get_timestamp ( )
9274+ . expect ( "UUID v7 should have an extractable timestamp" )
9275+ . to_unix ( ) ;
9276+
9277+ assert ! (
9278+ ts_older <= ts_newer,
9279+ "older UUID7 timestamp must be <= newer UUID7 timestamp"
9280+ ) ;
9281+
9282+ // Verify round-trip through string representation (as used by cleanup).
9283+ let older_str = older. to_string ( ) ;
9284+ let newer_str = newer. to_string ( ) ;
9285+
9286+ let parsed_older_ts = uuid:: Uuid :: parse_str ( & older_str)
9287+ . expect ( "valid UUID string" )
9288+ . get_timestamp ( )
9289+ . expect ( "parsed UUID v7 should yield a timestamp" )
9290+ . to_unix ( ) ;
9291+ let parsed_newer_ts = uuid:: Uuid :: parse_str ( & newer_str)
9292+ . expect ( "valid UUID string" )
9293+ . get_timestamp ( )
9294+ . expect ( "parsed UUID v7 should yield a timestamp" )
9295+ . to_unix ( ) ;
9296+
9297+ assert ! (
9298+ parsed_older_ts <= parsed_newer_ts,
9299+ "timestamp ordering must survive string round-trip"
9300+ ) ;
9301+ }
9302+
9303+ #[ test]
9304+ fn cleanup_skips_snapshots_newer_than_current ( ) {
9305+ let tmp = TempDir :: new ( ) . expect ( "create temp dir" ) ;
9306+ let table_path = tmp. path ( ) . to_str ( ) . expect ( "valid UTF-8 path" ) ;
9307+ let table_id = uuid:: Uuid :: now_v7 ( ) . to_string ( ) ;
9308+
9309+ // Create the table directory.
9310+ let table_dir = tmp. path ( ) . join ( & table_id) ;
9311+ std:: fs:: create_dir_all ( & table_dir) . expect ( "create table dir" ) ;
9312+
9313+ // Create 3 snapshot directories:
9314+ // - old_snapshot (older than current) → should be deleted
9315+ // - current_snapshot → should be kept
9316+ // - newer_snapshot (newer than current, simulating in-flight write) → should be kept
9317+ let old_snapshot = uuid:: Uuid :: now_v7 ( ) . to_string ( ) ;
9318+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 2 ) ) ;
9319+ let current_snapshot = uuid:: Uuid :: now_v7 ( ) . to_string ( ) ;
9320+ std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 2 ) ) ;
9321+ let newer_snapshot = uuid:: Uuid :: now_v7 ( ) . to_string ( ) ;
9322+
9323+ std:: fs:: create_dir ( table_dir. join ( & old_snapshot) ) . expect ( "create old snapshot dir" ) ;
9324+ std:: fs:: create_dir ( table_dir. join ( & current_snapshot) ) . expect ( "create current dir" ) ;
9325+ std:: fs:: create_dir ( table_dir. join ( & newer_snapshot) ) . expect ( "create newer dir" ) ;
9326+
9327+ let protected: HashSet < String > = HashSet :: new ( ) ;
9328+
9329+ CayenneTableProvider :: cleanup_old_snapshots_blocking (
9330+ table_path,
9331+ & table_id,
9332+ & current_snapshot,
9333+ & protected,
9334+ )
9335+ . expect ( "cleanup should succeed" ) ;
9336+
9337+ // old_snapshot should be deleted
9338+ assert ! (
9339+ !table_dir. join( & old_snapshot) . exists( ) ,
9340+ "old snapshot should be deleted"
9341+ ) ;
9342+ // current_snapshot should be kept
9343+ assert ! (
9344+ table_dir. join( & current_snapshot) . exists( ) ,
9345+ "current snapshot must be preserved"
9346+ ) ;
9347+ // newer_snapshot should be kept (in-flight write protection)
9348+ assert ! (
9349+ table_dir. join( & newer_snapshot) . exists( ) ,
9350+ "snapshot newer than current must be preserved (in-flight write)"
9351+ ) ;
9352+ }
92059353}
0 commit comments