diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs
index ec9ffd9991..983e2e64ca 100644
--- a/crates/core/src/logstore/mod.rs
+++ b/crates/core/src/logstore/mod.rs
@@ -161,6 +161,9 @@ pub type LogStoreRef = Arc<dyn LogStore>;
 static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));
 
+pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
+    LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
+
 /// Return the [LogStoreRef] for the provided [Url] location
 ///
 /// This will use the built-in process global [crate::storage::ObjectStoreRegistry] by default
@@ -637,9 +640,6 @@ impl<'de> Deserialize<'de> for LogStoreConfig {
     }
 }
 
-static DELTA_LOG_REGEX: LazyLock<Regex> =
-    LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());
-
 /// Extract version from a file name in the delta log
 pub fn extract_version_from_filename(name: &str) -> Option<i64> {
     DELTA_LOG_REGEX
diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs
index 828e80617b..dda4124c1a 100644
--- a/crates/core/src/protocol/checkpoints.rs
+++ b/crates/core/src/protocol/checkpoints.rs
@@ -6,7 +6,7 @@ use url::Url;
 
 use arrow::compute::filter_record_batch;
 use arrow_array::{BooleanArray, RecordBatch};
-use chrono::Utc;
+use chrono::{TimeZone, Utc};
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine_data::FilteredEngineData;
 use delta_kernel::last_checkpoint_hint::LastCheckpointHint;
@@ -22,11 +22,14 @@ use tokio::task::spawn_blocking;
 use tracing::{debug, error, warn};
 use uuid::Uuid;
 
-use crate::logstore::{LogStore, LogStoreExt};
+use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
 use crate::table::config::TablePropertiesExt as _;
 use crate::{open_table_with_version, DeltaTable};
 use crate::{DeltaResult, DeltaTableError};
 
+static CHECKPOINT_REGEX: LazyLock<Regex> =
+    LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap());
+
 /// Creates checkpoint for a given table version, table state and object store
 pub(crate) async fn create_checkpoint_for(
     version: u64,
@@ -181,62 +184,119 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
     Ok(())
 }
 
-/// Deletes all delta log commits that are older than the cutoff time
-/// and less than the specified version.
+/// Delete expired Delta log files up to a safe checkpoint boundary.
+///
+/// This routine removes JSON commit files and checkpoint files under
+/// `_delta_log/` that are both:
+/// - older than the provided `cutoff_timestamp` (milliseconds since epoch), and
+/// - at a version strictly less than the provided `keep_version`.
+///
+/// Safety guarantee:
+/// To avoid deleting files that might still be required to reconstruct the
+/// table state at or before the requested cutoff, the function first lowers
+/// `keep_version` to the oldest version whose commit is still inside the
+/// retention window, then identifies the most recent checkpoint whose version
+/// is `<= keep_version`. Only files with a version strictly below that
+/// checkpoint and a modification time at or before `cutoff_timestamp` are
+/// considered for deletion. If no such checkpoint file exists under
+/// `_delta_log/`, the function performs no deletions and returns `Ok(0)`.
+///
+/// See also: https://github.com/delta-io/delta-rs/issues/3692 for background on
+/// why cleanup must align to an existing checkpoint.
 pub async fn cleanup_expired_logs_for(
-    until_version: i64,
+    mut keep_version: i64,
     log_store: &dyn LogStore,
     cutoff_timestamp: i64,
     operation_id: Option<Uuid>,
 ) -> DeltaResult<usize> {
-    static DELTA_LOG_REGEX: LazyLock<Regex> = LazyLock::new(|| {
-        Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap()
-    });
-
-    let object_store = log_store.object_store(None);
+    debug!("called cleanup_expired_logs_for");
+    let object_store = log_store.object_store(operation_id);
     let log_path = log_store.log_path();
-    let maybe_last_checkpoint = read_last_checkpoint(&object_store, log_path).await?;
-    let Some(last_checkpoint) = maybe_last_checkpoint else {
+    // List all log entries under _delta_log
+    let log_entries: Vec<Result<ObjectMeta, Error>> =
+        object_store.list(Some(log_path)).collect().await;
+
+    debug!("starting keep_version: {:?}", keep_version);
+    debug!(
+        "starting cutoff_timestamp: {:?}",
+        Utc.timestamp_millis_opt(cutoff_timestamp).unwrap()
+    );
+
+    // Step 1: Find min_retention_version among DELTA_LOG files with ts >= cutoff_timestamp
+    let min_retention_version = log_entries
+        .iter()
+        .filter_map(|m| m.as_ref().ok())
+        .filter_map(|m| {
+            let path = m.location.as_ref();
+            DELTA_LOG_REGEX
+                .captures(path)
+                .and_then(|caps| caps.get(1))
+                .and_then(|v| v.as_str().parse::<i64>().ok())
+                .map(|ver| (ver, m.last_modified.timestamp_millis()))
+        })
+        .filter(|(_, ts)| *ts >= cutoff_timestamp)
+        .map(|(ver, _)| ver)
+        .min();
+
+    let min_retention_version = min_retention_version.unwrap_or(keep_version);
+
+    // Step 2: Move keep_version down to the minimum version inside the retention period
+    // so that every version inside the retention period is kept.
+    keep_version = keep_version.min(min_retention_version);
+
+    // Step 3: Find the safe checkpoint with checkpoint_version <= keep_version (no ts restriction)
+    let safe_checkpoint_version_opt = log_entries
+        .iter()
+        .filter_map(|m| m.as_ref().ok())
+        .filter_map(|m| {
+            let path = m.location.as_ref();
+            CHECKPOINT_REGEX
+                .captures(path)
+                .and_then(|caps| caps.get(1))
+                .and_then(|v| v.as_str().parse::<i64>().ok())
+        })
+        .filter(|ver| *ver <= keep_version)
+        .max();
+
+    // Exit if no safe checkpoint file was found.
+    let Some(safe_checkpoint_version) = safe_checkpoint_version_opt else {
+        debug!(
+            "Not cleaning metadata files, could not find a checkpoint with version <= keep_version ({})",
+            keep_version
+        );
         return Ok(0);
     };
-    let until_version = i64::min(until_version, last_checkpoint.version as i64);
+    debug!("safe_checkpoint_version: {}", safe_checkpoint_version);
+
+    // Step 4: Delete DELTA_LOG files where log_ver < safe_checkpoint_version && ts <= cutoff_timestamp
+    let locations = futures::stream::iter(log_entries.into_iter())
+        .filter_map(|meta: Result<ObjectMeta, Error>| async move {
+            let meta = match meta {
+                Ok(m) => m,
+                Err(err) => {
+                    error!("Error received while cleaning up expired logs: {err:?}");
+                    return None;
+                }
+            };
+            let path_str = meta.location.as_ref();
+            let captures = DELTA_LOG_REGEX.captures(path_str)?;
+            let ts = meta.last_modified.timestamp_millis();
+            let log_ver_str = captures.get(1).unwrap().as_str();
+            let Ok(log_ver) = log_ver_str.parse::<i64>() else {
+                return None;
+            };
+            if log_ver < safe_checkpoint_version && ts <= cutoff_timestamp {
+                debug!("file to delete: {:?}", meta.location);
+                Some(Ok(meta.location))
+            } else {
+                None
+            }
+        })
+        .boxed();
 
-    // Feed a stream of candidate deletion files directly into the delete_stream
-    // function to try to improve the speed of cleanup and reduce the need for
-    // intermediate memory.
-    let object_store = log_store.object_store(operation_id);
     let deleted = object_store
-        .delete_stream(
-            object_store
-                .list(Some(log_store.log_path()))
-                // This predicate function will filter out any locations that don't
-                // match the given timestamp range
-                .filter_map(|meta: Result<ObjectMeta, Error>| async move {
-                    if meta.is_err() {
-                        error!("Error received while cleaning up expired logs: {meta:?}");
-                        return None;
-                    }
-                    let meta = meta.unwrap();
-                    let ts = meta.last_modified.timestamp_millis();
-
-                    match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
-                        Some(captures) => {
-                            let log_ver_str = captures.get(1).unwrap().as_str();
-                            let log_ver: i64 = log_ver_str.parse().unwrap();
-                            if log_ver < until_version && ts <= cutoff_timestamp {
-                                // This location is ready to be deleted
-                                Some(Ok(meta.location))
-                            } else {
-                                None
-                            }
-                        }
-                        None => None,
-                    }
-                })
-                .boxed(),
-        )
+        .delete_stream(locations)
         .try_collect::<Vec<_>>()
         .await?;
 
@@ -244,29 +304,6 @@ pub async fn cleanup_expired_logs_for(
     Ok(deleted.len())
 }
 
-/// Try reading the `_last_checkpoint` file.
-///
-/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
-/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
-/// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
-/// cause failure.
-async fn read_last_checkpoint(
-    storage: &dyn ObjectStore,
-    log_path: &Path,
-) -> DeltaResult<Option<LastCheckpointHint>> {
-    const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
-    let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
-    let maybe_data = storage.get(&file_path).await;
-    let data = match maybe_data {
-        Ok(data) => data.bytes().await?,
-        Err(Error::NotFound { .. }) => return Ok(None),
-        Err(err) => return Err(err.into()),
-    };
-    Ok(serde_json::from_slice(&data)
-        .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
-        .ok())
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -285,6 +322,29 @@ mod tests {
     use crate::writer::test_utils::get_delta_schema;
     use crate::DeltaResult;
 
+    /// Try reading the `_last_checkpoint` file.
+    ///
+    /// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
+    /// the read. Thus, the semantics of this function are to return `None` if the file is not found or
+    /// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
+    /// cause failure.
+    async fn read_last_checkpoint(
+        storage: &dyn ObjectStore,
+        log_path: &Path,
+    ) -> DeltaResult<Option<LastCheckpointHint>> {
+        const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
+        let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
+        let maybe_data = storage.get(&file_path).await;
+        let data = match maybe_data {
+            Ok(data) => data.bytes().await?,
+            Err(Error::NotFound { .. }) => return Ok(None),
+            Err(err) => return Err(err.into()),
+        };
+        Ok(serde_json::from_slice(&data)
+            .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
+            .ok())
+    }
+
     #[tokio::test]
     async fn test_create_checkpoint_for() {
         let table_schema = get_delta_schema();
diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs
index 2ccde70a78..06753aff8c 100644
--- a/crates/core/tests/checkpoint_writer.rs
+++ b/crates/core/tests/checkpoint_writer.rs
@@ -196,7 +196,7 @@ mod delete_expired_delta_log_in_checkpoint {
             Some(HashMap::from([
                 (
                     TableProperty::LogRetentionDuration.as_ref().into(),
-                    Some("interval 10 minute".to_string()),
+                    Some("interval 0 minute".to_string()),
                 ),
                 (
                     TableProperty::EnableExpiredLogCleanup.as_ref().into(),
@@ -225,8 +225,8 @@ mod delete_expired_delta_log_in_checkpoint {
 
         // set last_modified
        set_file_last_modified(0, 25 * 60 * 1000); // 25 mins ago, should be deleted
-        set_file_last_modified(1, 15 * 60 * 1000); // 25 mins ago, should be deleted
-        set_file_last_modified(2, 5 * 60 * 1000); // 25 mins ago, should be kept
+        set_file_last_modified(1, 15 * 60 * 1000); // 15 mins ago, should be deleted
+        set_file_last_modified(2, 5 * 60 * 1000); // 5 mins ago, should be kept as last safe checkpoint
 
         table.load_version(0).await.expect("Cannot load version 0");
         table.load_version(1).await.expect("Cannot load version 1");
@@ -267,6 +267,104 @@ mod delete_expired_delta_log_in_checkpoint {
         table.load_version(2).await.expect("Cannot load version 2");
     }
 
+    // Verify that intermediate versions can still be loaded after the checkpoint is created,
+    // exercising `cleanup_expired_logs_for` and its use of safe checkpoints.
+    #[tokio::test]
+    async fn test_delete_expired_logs_safe_checkpoint() {
+        // For additional tracing:
+        // let _ = pretty_env_logger::try_init();
+        let mut table = fs_common::create_table(
+            "../test/tests/data/checkpoints_with_expired_logs/expired_with_checkpoint",
+            Some(HashMap::from([
+                (
+                    TableProperty::LogRetentionDuration.as_ref().into(),
+                    Some("interval 10 minute".to_string()),
+                ),
+                (
+                    TableProperty::EnableExpiredLogCleanup.as_ref().into(),
+                    Some("true".to_string()),
+                ),
+            ])),
+        )
+        .await;
+
+        let table_path = table.table_uri();
+        let set_file_last_modified = |version: usize, last_modified_millis: u64, suffix: &str| {
+            let path = format!("{table_path}_delta_log/{version:020}.{suffix}");
+            let file = OpenOptions::new().write(true).open(path).unwrap();
+            let last_modified = SystemTime::now().sub(Duration::from_millis(last_modified_millis));
+            let times = FileTimes::new()
+                .set_modified(last_modified)
+                .set_accessed(last_modified);
+            file.set_times(times).unwrap();
+        };
+
+        // create 4 commits
+        let a1 = fs_common::add(0);
+        let a2 = fs_common::add(0);
+        let a3 = fs_common::add(0);
+        let a4 = fs_common::add(0);
+        assert_eq!(1, fs_common::commit_add(&mut table, &a1).await);
+        assert_eq!(2, fs_common::commit_add(&mut table, &a2).await);
+        assert_eq!(3, fs_common::commit_add(&mut table, &a3).await);
+        assert_eq!(4, fs_common::commit_add(&mut table, &a4).await);
+
+        // set last_modified
+        set_file_last_modified(0, 25 * 60 * 1000, "json"); // 25 mins ago, should be deleted
+        set_file_last_modified(1, 20 * 60 * 1000, "json"); // 20 mins ago, last safe checkpoint, should be kept
+        set_file_last_modified(2, 15 * 60 * 1000, "json"); // 15 mins ago, fails retention cutoff, but needed by v3
+        set_file_last_modified(3, 6 * 60 * 1000, "json"); // 6 mins ago, should be kept by retention cutoff
+        set_file_last_modified(4, 5 * 60 * 1000, "json"); // 5 mins ago, should be kept by retention cutoff
+
+        table.load_version(0).await.expect("Cannot load version 0");
+        table.load_version(1).await.expect("Cannot load version 1");
+        table.load_version(2).await.expect("Cannot load version 2");
+        table.load_version(3).await.expect("Cannot load version 3");
+        table.load_version(4).await.expect("Cannot load version 4");
+
+        // Create checkpoint for version 1
+        checkpoints::create_checkpoint_from_table_uri_and_cleanup(
+            deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(),
+            1,
+            Some(false),
+            None,
+        )
+        .await
+        .unwrap();
+
+        // Update the checkpoint time for version 1 to be just after the version 1 commit
+        set_file_last_modified(1, 20 * 60 * 1000 - 10, "checkpoint.parquet");
+
+        // Checkpoint the final version
+        checkpoints::create_checkpoint_from_table_uri_and_cleanup(
+            deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(),
+            table.version().unwrap(),
+            None,
+            None,
+        )
+        .await
+        .unwrap();
+
+        table.update().await.unwrap(); // make the table read the checkpoint
+        assert_eq!(
+            table
+                .snapshot()
+                .unwrap()
+                .file_paths_iter()
+                .collect::<Vec<_>>(),
+            vec![
+                ObjectStorePath::from(a4.path.as_ref()),
+                ObjectStorePath::from(a3.path.as_ref()),
+                ObjectStorePath::from(a2.path.as_ref()),
+                ObjectStorePath::from(a1.path.as_ref()),
+            ]
+        );
+
+        // Without going back to a safe checkpoint, loading version 3 would previously fail.
+        table.load_version(3).await.expect("Cannot load version 3");
+        table.load_version(4).await.expect("Cannot load version 4");
+    }
+
     #[tokio::test]
     async fn test_not_delete_expired_logs() {
         let mut table = fs_common::create_table(
diff --git a/crates/core/tests/integration_checkpoint.rs b/crates/core/tests/integration_checkpoint.rs
index a446adf72c..92c6b6e512 100644
--- a/crates/core/tests/integration_checkpoint.rs
+++ b/crates/core/tests/integration_checkpoint.rs
@@ -106,15 +106,15 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> {
     writer.write(vec![json!({"id": 1})]).await?;
     writer.flush_and_commit(&mut table).await?; // v1
 
-    let ts = Utc::now(); // use this ts for log retention expiry
-    sleep(Duration::from_secs(1)).await;
-
     writer.write(vec![json!({"id": 2})]).await?;
     writer.flush_and_commit(&mut table).await?; // v2
     assert_eq!(table.version(), Some(2));
 
     create_checkpoint(&table, None).await.unwrap(); // v2.checkpoint.parquet
 
+    sleep(Duration::from_secs(1)).await;
+    let ts = Utc::now(); // use this ts for log retention expiry
+
     // Should delete v1 but not v2 or v2.checkpoint.parquet
     cleanup_expired_logs_for(
         table.version().unwrap(),
diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py
index 6abfc0e8b2..26ab4897d5 100644
--- a/python/tests/test_checkpoint.py
+++ b/python/tests/test_checkpoint.py
@@ -95,8 +95,9 @@ def test_cleanup_metadata(tmp_path: pathlib.Path, sample_table: Table):
     )
     third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json"
 
-    assert not first_log_path.exists()
-    assert not first_failed_log_path.exists()
+    # These first two files are kept because there is no safe checkpoint to make them obsolete
+    assert first_log_path.exists()
+    assert first_failed_log_path.exists()
     assert second_log_path.exists()
     assert third_log_path.exists()
    assert second_failed_log_path.exists()
@@ -119,8 +120,9 @@ def test_cleanup_metadata_log_cleanup_hook(tmp_path: pathlib.Path, sample_table:
     )
     third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json"
 
-    assert not first_log_path.exists()
-    assert not first_failed_log_path.exists()
+    # These first two files are kept because there is no safe checkpoint to make them obsolete
+    assert first_log_path.exists()
+    assert first_failed_log_path.exists()
     assert second_log_path.exists()
     assert third_log_path.exists()
     assert second_failed_log_path.exists()
diff --git a/python/tests/test_cleanup.py b/python/tests/test_cleanup.py
new file mode 100644
index 0000000000..bdc98a31cf
--- /dev/null
+++ b/python/tests/test_cleanup.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+
+import os
+import pathlib
+import shutil
+
+import pytest
+from arro3.core import Array, DataType, Table
+from arro3.core import Field as ArrowField
+
+from deltalake import DeltaTable, write_deltalake
+
+
+def clean_data_dir(data_path):
+    if os.path.exists(data_path):
+        try:
+            shutil.rmtree(data_path)
+        except Exception as e:
+            print(f"Error deleting directory {data_path}: {e}")
+
+
+def print_log_dir(path):
+    # Show log contents
+    log_path = os.path.join(path, "_delta_log")
+    if os.path.exists(log_path):
+        print("Delta log contents:")
+        for file in sorted(os.listdir(log_path)):
+            print(f"  {file}")
+
+
+def valid_gc_data(version) -> Table:
+    id_col = ArrowField("id", DataType.int32(), nullable=True)
+    gc = ArrowField("gc", DataType.int32(), nullable=True).with_metadata(
+        {"delta.generationExpression": "10"}
+    )
+    data = Table.from_pydict(
+        {"id": Array([version, version], type=id_col), "gc": Array([10, 10], type=gc)},
+    )
+    return data
+
+
+@pytest.mark.pandas
+def test_failed_cleanup(tmp_path: pathlib.Path):
+    data_path = tmp_path
+    clean_data_dir(data_path)
+
+    # write 10 versions of the data
+    for i in range(10):
+        data = valid_gc_data(i)
+        write_deltalake(
+            data_path,
+            mode="overwrite",
+            data=data,
+            configuration={
+                "delta.minWriterVersion": "7",
+                "delta.logRetentionDuration": "interval 0 day",
+            },
+        )
+
+    # checkpoint the final version
+    table = DeltaTable(data_path)
+    table.create_checkpoint()
+
+    # show log contents
+    print("log contents before cleanup:")
+    print_log_dir(data_path)
+
+    # Call cleanup_metadata
+    table = DeltaTable(data_path, version=5)
+    # table.create_checkpoint()  # Workaround: manually create a checkpoint to load version >= 5
+    table.cleanup_metadata()
+
+    # show log contents
+    print("\n################################################")
+    print("log contents after cleanup:")
+    print_log_dir(data_path)
+
+    # Load an old version
+    table = DeltaTable(data_path, version=5)
+    df2 = table.to_pandas()
+    print("\n################################################")
+    print("Version 5 of the data:")
+    print(df2)
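
Editor's note, not part of the patch: the selection rule in `cleanup_expired_logs_for` (Steps 1 to 4 above) can be modeled in isolation. The sketch below is a hypothetical, self-contained rendering of that rule over plain (version, last_modified_ms) pairs; the names `deletable_versions`, `log_files`, and `checkpoint_versions` are illustrative only and do not exist in delta-rs.

// Illustrative sketch of the safe-checkpoint cleanup rule (standalone, std-only).

/// Return the log versions that would be deleted, or None if no safe checkpoint exists.
fn deletable_versions(
    log_files: &[(i64, i64)],    // (version, last_modified_ms) of *.json commit files
    checkpoint_versions: &[i64], // versions for which a checkpoint file exists
    mut keep_version: i64,
    cutoff_timestamp: i64,
) -> Option<Vec<i64>> {
    // Steps 1 and 2: never drop below the oldest version still inside the retention window.
    let min_retention_version = log_files
        .iter()
        .filter(|(_, ts)| *ts >= cutoff_timestamp)
        .map(|(ver, _)| *ver)
        .min()
        .unwrap_or(keep_version);
    keep_version = keep_version.min(min_retention_version);

    // Step 3: pick the newest checkpoint at or below keep_version; bail out if none exists.
    let safe_checkpoint = checkpoint_versions
        .iter()
        .copied()
        .filter(|ver| *ver <= keep_version)
        .max()?;

    // Step 4: only files strictly older than the safe checkpoint and past the cutoff go away.
    Some(
        log_files
            .iter()
            .filter(|(ver, ts)| *ver < safe_checkpoint && *ts <= cutoff_timestamp)
            .map(|(ver, _)| *ver)
            .collect(),
    )
}

fn main() {
    // Commits 0..=4; only versions 3 and 4 are newer than the cutoff.
    let logs = [(0, 100), (1, 200), (2, 300), (3, 900), (4, 950)];
    let checkpoints = [1, 4];
    // With keep_version = 4 and cutoff = 500: keep_version is pulled down to 3
    // (oldest commit inside retention), the safe checkpoint is 1, and only
    // versions < 1 with ts <= 500 qualify, i.e. just version 0.
    assert_eq!(deletable_versions(&logs, &checkpoints, 4, 500), Some(vec![0]));
    println!("deletable: {:?}", deletable_versions(&logs, &checkpoints, 4, 500));
}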