From 399f626e4d10ce1784d0f24b23de34baa7e88e3a Mon Sep 17 00:00:00 2001
From: Corwin Joy
Date: Mon, 8 Sep 2025 18:39:55 -0700
Subject: [PATCH 1/8] fix: use a safe checkpoint when cleaning up metadata

Inside the function table.cleanup_metadata, look for the last checkpoint
before the requested minimum cutoff time and version, and only delete files
from before that checkpoint. This fixes the bug tracked in
https://github.com/delta-io/delta-rs/issues/3692 and makes sure that
versions inside the retention period are still loadable.

Signed-off-by: Corwin Joy
---
 crates/core/src/protocol/checkpoints.rs     | 189 +++++++++++++-------
 crates/core/tests/checkpoint_writer.rs      | 101 ++++++++++-
 crates/core/tests/integration_checkpoint.rs |   6 +-
 python/tests/test_checkpoint.py             |  10 +-
 4 files changed, 232 insertions(+), 74 deletions(-)

diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs
index aaaa02e16a..df5ab004f1 100644
--- a/crates/core/src/protocol/checkpoints.rs
+++ b/crates/core/src/protocol/checkpoints.rs
@@ -6,7 +6,7 @@ use url::Url;
 
 use arrow::compute::filter_record_batch;
 use arrow_array::{BooleanArray, RecordBatch};
-use chrono::Utc;
+use chrono::{TimeZone, Utc};
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine_data::FilteredEngineData;
 use delta_kernel::last_checkpoint_hint::LastCheckpointHint;
@@ -179,62 +179,127 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
     Ok(())
 }
 
-/// Deletes all delta log commits that are older than the cutoff time
-/// and less than the specified version.
+/// Delete expired Delta log files up to a safe checkpoint boundary.
+///
+/// This routine removes JSON commit files, in-progress JSON temp files, and
+/// checkpoint files under `_delta_log/` that are both:
+/// - older than the provided `cutoff_timestamp` (milliseconds since epoch), and
+/// - strictly less than the provided `keep_version`.
+///
+/// Safety guarantee:
+/// To avoid deleting files that might still be required to reconstruct the
+/// table state at or before the requested cutoff, the function first identifies
+/// the most recent checkpoint whose version is `<= keep_version` and whose file
+/// modification time is `<= cutoff_timestamp`. Only files strictly older than
+/// this checkpoint (both by version and timestamp) are considered for deletion.
+/// If no such checkpoint exists (including when there is no `_last_checkpoint`),
+/// the function performs no deletions and returns `Ok(0)`.
+///
+/// See also: https://github.com/delta-io/delta-rs/issues/3692 for background on
+/// why cleanup must align to an existing checkpoint.
 pub async fn cleanup_expired_logs_for(
-    until_version: i64,
+    mut keep_version: i64,
     log_store: &dyn LogStore,
     cutoff_timestamp: i64,
     operation_id: Option<Uuid>,
 ) -> DeltaResult<usize> {
+    debug!("called cleanup_expired_logs_for");
     static DELTA_LOG_REGEX: LazyLock<Regex> = LazyLock::new(|| {
         Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap()
     });
+    static CHECKPOINT_REGEX: LazyLock<Regex> =
+        LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap());
 
     let object_store = log_store.object_store(None);
     let log_path = log_store.log_path();
-    let maybe_last_checkpoint = read_last_checkpoint(&object_store, log_path).await?;
 
-    let Some(last_checkpoint) = maybe_last_checkpoint else {
+    // List all log entries under _delta_log
+    let log_entries: Vec<Result<ObjectMeta>> =
+        object_store.list(Some(log_path)).collect().await;
+
+    debug!("starting keep_version: {:?}", keep_version);
+    debug!(
+        "starting cutoff_timestamp: {:?}",
+        Utc.timestamp_millis_opt(cutoff_timestamp).unwrap()
+    );
+
+    // Step 1: Find min_retention_version among DELTA_LOG files with ts >= cutoff_timestamp
+    let min_retention_version = log_entries
+        .iter()
+        .filter_map(|m| m.as_ref().ok())
+        .filter_map(|m| {
+            let path = m.location.as_ref();
+            DELTA_LOG_REGEX
+                .captures(path)
+                .and_then(|caps| caps.get(1))
+                .and_then(|v| v.as_str().parse::<i64>().ok())
+                .map(|ver| (ver, m.last_modified.timestamp_millis()))
+        })
+        .filter(|(_, ts)| *ts >= cutoff_timestamp)
+        .map(|(ver, _)| ver)
+        .min();
+
+    let min_retention_version = min_retention_version.unwrap_or(keep_version);
+
+    // Step 2: Move keep_version down to the minimum version inside the retention period to make sure
+    // every version inside the retention period is kept.
+    keep_version = keep_version.min(min_retention_version);
+
+    // Step 3: Find safe checkpoint with checkpoint_version <= keep_version (no ts restriction)
+    let safe_checkpoint_version_opt = log_entries
+        .iter()
+        .filter_map(|m| m.as_ref().ok())
+        .filter_map(|m| {
+            let path = m.location.as_ref();
+            CHECKPOINT_REGEX
+                .captures(path)
+                .and_then(|caps| caps.get(1))
+                .and_then(|v| v.as_str().parse::<i64>().ok())
+        })
+        .filter(|ver| *ver <= keep_version)
+        .max();
+
+    // Exit if no safe_checkpoint file was found.
+    let Some(safe_checkpoint_version) = safe_checkpoint_version_opt else {
+        debug!(
+            "Not cleaning metadata files, could not find a checkpoint with version <= keep_version ({})",
+            keep_version
+        );
         return Ok(0);
     };
 
-    let until_version = i64::min(until_version, last_checkpoint.version as i64);
+    debug!("safe_checkpoint_version: {}", safe_checkpoint_version);
 
-    // Feed a stream of candidate deletion files directly into the delete_stream
-    // function to try to improve the speed of cleanup and reduce the need for
-    // intermediate memory.
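For orientation, a minimal sketch (not part of the patch) of how a caller might drive the new signature, assuming `latest_version: i64` and `log_store: &dyn LogStore` are already in scope; the 30-minute retention value is made up for illustration and would normally come from the table's `delta.logRetentionDuration` setting.

    use chrono::Utc;

    let retention_ms: i64 = 30 * 60 * 1000; // illustrative retention window
    let cutoff_timestamp = Utc::now().timestamp_millis() - retention_ms;
    let deleted = cleanup_expired_logs_for(
        latest_version,   // keep this version and anything still inside retention
        log_store,
        cutoff_timestamp, // milliseconds since epoch
        None,             // operation_id
    )
    .await?;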
+    // Step 4: Delete DELTA_LOG files where log_ver < safe_checkpoint_version && ts <= cutoff_timestamp
     let object_store = log_store.object_store(operation_id);
+
+    let locations = futures::stream::iter(log_entries.into_iter())
+        .filter_map(|meta: Result<ObjectMeta>| async move {
+            let meta = match meta {
+                Ok(m) => m,
+                Err(err) => {
+                    error!("Error received while cleaning up expired logs: {err:?}");
+                    return None;
+                }
+            };
+            let path_str = meta.location.as_ref();
+            let captures = DELTA_LOG_REGEX.captures(path_str)?;
+            let ts = meta.last_modified.timestamp_millis();
+            let log_ver_str = captures.get(1).unwrap().as_str();
+            let Ok(log_ver) = log_ver_str.parse::<i64>() else {
+                return None;
+            };
+            if log_ver < safe_checkpoint_version && ts <= cutoff_timestamp {
+                debug!("file to delete: {:?}", meta.location);
+                Some(Ok(meta.location))
+            } else {
+                None
+            }
+        })
+        .boxed();
+
     let deleted = object_store
-        .delete_stream(
-            object_store
-                .list(Some(log_store.log_path()))
-                // This predicate function will filter out any locations that don't
-                // match the given timestamp range
-                .filter_map(|meta: Result<ObjectMeta>| async move {
-                    if meta.is_err() {
-                        error!("Error received while cleaning up expired logs: {meta:?}");
-                        return None;
-                    }
-                    let meta = meta.unwrap();
-                    let ts = meta.last_modified.timestamp_millis();
-
-                    match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
-                        Some(captures) => {
-                            let log_ver_str = captures.get(1).unwrap().as_str();
-                            let log_ver: i64 = log_ver_str.parse().unwrap();
-                            if log_ver < until_version && ts <= cutoff_timestamp {
-                                // This location is ready to be deleted
-                                Some(Ok(meta.location))
-                            } else {
-                                None
-                            }
-                        }
-                        None => None,
-                    }
-                })
-                .boxed(),
-        )
+        .delete_stream(locations)
         .try_collect::<Vec<_>>()
         .await?;
 
@@ -242,29 +307,6 @@ pub async fn cleanup_expired_logs_for(
     Ok(deleted.len())
 }
 
-/// Try reading the `_last_checkpoint` file.
-///
-/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
-/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
-/// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
-/// cause failure.
-async fn read_last_checkpoint(
-    storage: &dyn ObjectStore,
-    log_path: &Path,
-) -> DeltaResult<Option<LastCheckpointHint>> {
-    const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
-    let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
-    let maybe_data = storage.get(&file_path).await;
-    let data = match maybe_data {
-        Ok(data) => data.bytes().await?,
-        Err(Error::NotFound { .. }) => return Ok(None),
-        Err(err) => return Err(err.into()),
-    };
-    Ok(serde_json::from_slice(&data)
-        .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
-        .ok())
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -283,6 +325,29 @@ mod tests {
     use crate::writer::test_utils::get_delta_schema;
     use crate::DeltaResult;
 
+    /// Try reading the `_last_checkpoint` file.
+    ///
+    /// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
+    /// the read. Thus, the semantics of this function are to return `None` if the file is not found or
+    /// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
+    /// cause failure.
+ async fn read_last_checkpoint( + storage: &dyn ObjectStore, + log_path: &Path, + ) -> DeltaResult> { + const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; + let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME); + let maybe_data = storage.get(&file_path).await; + let data = match maybe_data { + Ok(data) => data.bytes().await?, + Err(Error::NotFound { .. }) => return Ok(None), + Err(err) => return Err(err.into()), + }; + Ok(serde_json::from_slice(&data) + .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}")) + .ok()) + } + #[tokio::test] async fn test_create_checkpoint_for() { let table_schema = get_delta_schema(); diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs index 74ba66115c..74a7664389 100644 --- a/crates/core/tests/checkpoint_writer.rs +++ b/crates/core/tests/checkpoint_writer.rs @@ -194,11 +194,10 @@ mod delete_expired_delta_log_in_checkpoint { let mut table = fs_common::create_table( "../test/tests/data/checkpoints_with_expired_logs/expired", Some(hashmap! { - TableProperty::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()), + TableProperty::LogRetentionDuration.as_ref().into() => Some("interval 0 minute".to_string()), TableProperty::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string()) }), - ) - .await; + ).await; let table_path = table.table_uri(); let set_file_last_modified = |version: usize, last_modified_millis: u64| { @@ -219,8 +218,8 @@ mod delete_expired_delta_log_in_checkpoint { // set last_modified set_file_last_modified(0, 25 * 60 * 1000); // 25 mins ago, should be deleted - set_file_last_modified(1, 15 * 60 * 1000); // 25 mins ago, should be deleted - set_file_last_modified(2, 5 * 60 * 1000); // 25 mins ago, should be kept + set_file_last_modified(1, 15 * 60 * 1000); // 15 mins ago, should be deleted + set_file_last_modified(2, 5 * 60 * 1000); // 5 mins ago, should be kept as last safe checkpoint table.load_version(0).await.expect("Cannot load version 0"); table.load_version(1).await.expect("Cannot load version 1"); @@ -261,6 +260,98 @@ mod delete_expired_delta_log_in_checkpoint { table.load_version(2).await.expect("Cannot load version 2"); } + // Test to verify that intermediate versions can still be loaded after the checkpoint is created. + // This is to verify the behavior of `cleanup_expired_logs_for` and its use of safe checkpoints. + #[tokio::test] + async fn test_delete_expired_logs_safe_checkpoint() { + // For additional tracing: + // let _ = pretty_env_logger::try_init(); + let mut table = fs_common::create_table( + "../test/tests/data/checkpoints_with_expired_logs/expired_with_checkpoint", + Some(hashmap! 
{ + TableProperty::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()), + TableProperty::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string()) + }), + ) + .await; + + let table_path = table.table_uri(); + let set_file_last_modified = |version: usize, last_modified_millis: u64, suffix: &str| { + let path = format!("{table_path}_delta_log/{version:020}.{suffix}"); + let file = OpenOptions::new().write(true).open(path).unwrap(); + let last_modified = SystemTime::now().sub(Duration::from_millis(last_modified_millis)); + let times = FileTimes::new() + .set_modified(last_modified) + .set_accessed(last_modified); + file.set_times(times).unwrap(); + }; + + // create 4 commits + let a1 = fs_common::add(0); + let a2 = fs_common::add(0); + let a3 = fs_common::add(0); + let a4 = fs_common::add(0); + assert_eq!(1, fs_common::commit_add(&mut table, &a1).await); + assert_eq!(2, fs_common::commit_add(&mut table, &a2).await); + assert_eq!(3, fs_common::commit_add(&mut table, &a3).await); + assert_eq!(4, fs_common::commit_add(&mut table, &a4).await); + + // set last_modified + set_file_last_modified(0, 25 * 60 * 1000, "json"); // 25 mins ago, should be deleted + set_file_last_modified(1, 20 * 60 * 1000, "json"); // 20 mins ago, last safe checkpoint, should be kept + set_file_last_modified(2, 15 * 60 * 1000, "json"); // 15 mins ago, fails retention cutoff, but needed by v3 + set_file_last_modified(3, 6 * 60 * 1000, "json"); // 6 mins ago, should be kept by retention cutoff + set_file_last_modified(4, 5 * 60 * 1000, "json"); // 5 mins ago, should be kept by retention cutoff + + table.load_version(0).await.expect("Cannot load version 0"); + table.load_version(1).await.expect("Cannot load version 1"); + table.load_version(2).await.expect("Cannot load version 2"); + table.load_version(3).await.expect("Cannot load version 3"); + table.load_version(4).await.expect("Cannot load version 4"); + + // Create checkpoint for version 1 + checkpoints::create_checkpoint_from_table_uri_and_cleanup( + deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(), + 1, + Some(false), + None, + ) + .await + .unwrap(); + + // Update checkpoint time for version 1 to be just after version 1 data + set_file_last_modified(1, 20 * 60 * 1000 - 10, "checkpoint.parquet"); + + // Checkpoint final version + checkpoints::create_checkpoint_from_table_uri_and_cleanup( + deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(), + table.version().unwrap(), + None, + None, + ) + .await + .unwrap(); + + table.update().await.unwrap(); // make table to read the checkpoint + assert_eq!( + table + .snapshot() + .unwrap() + .file_paths_iter() + .collect::>(), + vec![ + ObjectStorePath::from(a4.path.as_ref()), + ObjectStorePath::from(a3.path.as_ref()), + ObjectStorePath::from(a2.path.as_ref()), + ObjectStorePath::from(a1.path.as_ref()), + ] + ); + + // Without going back to a safe checkpoint, previously loading version 3 would fail. 
+ table.load_version(3).await.expect("Cannot load version 3"); + table.load_version(4).await.expect("Cannot load version 4"); + } + #[tokio::test] async fn test_not_delete_expired_logs() { let mut table = fs_common::create_table( diff --git a/crates/core/tests/integration_checkpoint.rs b/crates/core/tests/integration_checkpoint.rs index a446adf72c..92c6b6e512 100644 --- a/crates/core/tests/integration_checkpoint.rs +++ b/crates/core/tests/integration_checkpoint.rs @@ -106,15 +106,15 @@ async fn test_issue_1420_cleanup_expired_logs_for() -> DeltaResult<()> { writer.write(vec![json!({"id": 1})]).await?; writer.flush_and_commit(&mut table).await?; // v1 - let ts = Utc::now(); // use this ts for log retention expiry - sleep(Duration::from_secs(1)).await; - writer.write(vec![json!({"id": 2})]).await?; writer.flush_and_commit(&mut table).await?; // v2 assert_eq!(table.version(), Some(2)); create_checkpoint(&table, None).await.unwrap(); // v2.checkpoint.parquet + sleep(Duration::from_secs(1)).await; + let ts = Utc::now(); // use this ts for log retention expiry + // Should delete v1 but not v2 or v2.checkpoint.parquet cleanup_expired_logs_for( table.version().unwrap(), diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index 6abfc0e8b2..26ab4897d5 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -95,8 +95,9 @@ def test_cleanup_metadata(tmp_path: pathlib.Path, sample_table: Table): ) third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json" - assert not first_log_path.exists() - assert not first_failed_log_path.exists() + # These first two files are kept because there is no safe checkpoint to make them obsolete + assert first_log_path.exists() + assert first_failed_log_path.exists() assert second_log_path.exists() assert third_log_path.exists() assert second_failed_log_path.exists() @@ -119,8 +120,9 @@ def test_cleanup_metadata_log_cleanup_hook(tmp_path: pathlib.Path, sample_table: ) third_log_path = tmp_table_path / "_delta_log" / "00000000000000000002.json" - assert not first_log_path.exists() - assert not first_failed_log_path.exists() + # These first two files are kept because there is no safe checkpoint to make them obsolete + assert first_log_path.exists() + assert first_failed_log_path.exists() assert second_log_path.exists() assert third_log_path.exists() assert second_failed_log_path.exists() From 99d733d9bf37888a5b9427528c1720428874ea41 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 22 Sep 2025 13:44:31 +0000 Subject: [PATCH 2/8] chore: introduce a regression test and some minor cleanup Signed-off-by: R. 
Tyler Croy --- crates/core/src/logstore/mod.rs | 6 +-- crates/core/src/protocol/checkpoints.rs | 15 ++---- python/tests/test_cleanup.py | 63 +++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 13 deletions(-) create mode 100644 python/tests/test_cleanup.py diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index ec9ffd9991..983e2e64ca 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -161,6 +161,9 @@ pub type LogStoreRef = Arc; static DELTA_LOG_PATH: LazyLock = LazyLock::new(|| Path::from("_delta_log")); +pub(crate) static DELTA_LOG_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap()); + /// Return the [LogStoreRef] for the provided [Url] location /// /// This will use the built-in process global [crate::storage::ObjectStoreRegistry] by default @@ -637,9 +640,6 @@ impl<'de> Deserialize<'de> for LogStoreConfig { } } -static DELTA_LOG_REGEX: LazyLock = - LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap()); - /// Extract version from a file name in the delta log pub fn extract_version_from_filename(name: &str) -> Option { DELTA_LOG_REGEX diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index df5ab004f1..b1df260238 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -22,11 +22,14 @@ use tokio::task::spawn_blocking; use tracing::{debug, error, warn}; use uuid::Uuid; -use crate::logstore::{LogStore, LogStoreExt}; +use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX}; use crate::table::config::TablePropertiesExt as _; use crate::{open_table_with_version, DeltaTable}; use crate::{DeltaResult, DeltaTableError}; +static CHECKPOINT_REGEX: LazyLock = + LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap()); + /// Creates checkpoint for a given table version, table state and object store pub(crate) async fn create_checkpoint_for( version: u64, @@ -204,13 +207,7 @@ pub async fn cleanup_expired_logs_for( operation_id: Option, ) -> DeltaResult { debug!("called cleanup_expired_logs_for"); - static DELTA_LOG_REGEX: LazyLock = LazyLock::new(|| { - Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap() - }); - static CHECKPOINT_REGEX: LazyLock = - LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap()); - - let object_store = log_store.object_store(None); + let object_store = log_store.object_store(operation_id); let log_path = log_store.log_path(); // List all log entries under _delta_log @@ -271,8 +268,6 @@ pub async fn cleanup_expired_logs_for( debug!("safe_checkpoint_version: {}", safe_checkpoint_version); // Step 4: Delete DELTA_LOG files where log_ver < safe_checkpoint_version && ts <= cutoff_timestamp - let object_store = log_store.object_store(operation_id); - let locations = futures::stream::iter(log_entries.into_iter()) .filter_map(|meta: Result| async move { let meta = match meta { diff --git a/python/tests/test_cleanup.py b/python/tests/test_cleanup.py new file mode 100644 index 0000000000..f2502f2a6f --- /dev/null +++ b/python/tests/test_cleanup.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 + +import pathlib +import pytest +import sys +from deltalake import DeltaTable, write_deltalake +import pandas as pd +import shutil +import os +import traceback + +def clean_data_dir(data_path): + if os.path.exists(data_path): + try: + shutil.rmtree(data_path) + except Exception as 
e: + print(f"Error deleting directory {data_path}: {e}") + + +def print_log_dir(path): + # Show log contents + log_path = os.path.join(path, "_delta_log") + if os.path.exists(log_path): + print("Delta log contents:") + for file in sorted(os.listdir(log_path)): + print(f" {file}") + +@pytest.mark.pandas +def test_failed_cleanup(tmp_path: pathlib.Path): + data_path = tmp_path + clean_data_dir(data_path) + + # write 10 versions of the data + for i in range(10): + ids = range(i*10, i*10+10) + strings = [f"str_{i}" for i in range(10)] + df = pd.DataFrame({"id": ids, "value": strings}) + write_deltalake(data_path, df, mode="overwrite", configuration={"delta.logRetentionDuration": "interval 0 day"}) + + # checkpoint final version + table = DeltaTable(data_path) + table.create_checkpoint() + + # show log contents + print("log contents before cleanup:") + print_log_dir(data_path) + + # Call cleanup metadata + table = DeltaTable(data_path, version=5) + # table.create_checkpoint() # Workaround, manually create checkpoint needed to load version >= 5 + table.cleanup_metadata() + + # show log contents + print("\n################################################") + print("log contents after cleanup:") + print_log_dir(data_path) + + # Load old version + table = DeltaTable(data_path, version=5) + df2 = table.to_pandas() + print("\n################################################") + print("Version 5 of the data:") + print(df2) From fbc100c33f9602902dc08f00260abd2e2edab372 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Wed, 24 Sep 2025 16:21:50 -0700 Subject: [PATCH 3/8] Merge main and update tests as needed Signed-off-by: Corwin Joy --- crates/core/tests/checkpoint_writer.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs index 1d6b4f4f7c..94afa1bd86 100644 --- a/crates/core/tests/checkpoint_writer.rs +++ b/crates/core/tests/checkpoint_writer.rs @@ -188,7 +188,6 @@ mod delete_expired_delta_log_in_checkpoint { use ::object_store::path::Path as ObjectStorePath; use deltalake_core::table::config::TableProperty; use deltalake_core::*; - use maplit::hashmap; #[tokio::test] async fn test_delete_expired_logs() { @@ -276,10 +275,16 @@ mod delete_expired_delta_log_in_checkpoint { // let _ = pretty_env_logger::try_init(); let mut table = fs_common::create_table( "../test/tests/data/checkpoints_with_expired_logs/expired_with_checkpoint", - Some(hashmap! 
{ - TableProperty::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()), - TableProperty::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string()) - }), + Some(HashMap::from([ + ( + TableProperty::LogRetentionDuration.as_ref().into(), + Some("interval 10 minute".to_string()), + ), + ( + TableProperty::EnableExpiredLogCleanup.as_ref().into(), + Some("true".to_string()), + ), + ])), ) .await; From cabfd69c3bc322f5f88551efadbded142807c502 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Thu, 25 Sep 2025 01:21:39 -0700 Subject: [PATCH 4/8] cargo fmt Signed-off-by: Corwin Joy --- crates/core/tests/checkpoint_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs index 94afa1bd86..06753aff8c 100644 --- a/crates/core/tests/checkpoint_writer.rs +++ b/crates/core/tests/checkpoint_writer.rs @@ -286,7 +286,7 @@ mod delete_expired_delta_log_in_checkpoint { ), ])), ) - .await; + .await; let table_path = table.table_uri(); let set_file_last_modified = |version: usize, last_modified_millis: u64, suffix: &str| { From 160de9041010914e3d73704254875c42bdf6a22f Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Thu, 25 Sep 2025 01:27:22 -0700 Subject: [PATCH 5/8] Apply python ruff format Signed-off-by: Corwin Joy --- python/tests/test_cleanup.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/python/tests/test_cleanup.py b/python/tests/test_cleanup.py index f2502f2a6f..4235337161 100644 --- a/python/tests/test_cleanup.py +++ b/python/tests/test_cleanup.py @@ -9,6 +9,7 @@ import os import traceback + def clean_data_dir(data_path): if os.path.exists(data_path): try: @@ -25,6 +26,7 @@ def print_log_dir(path): for file in sorted(os.listdir(log_path)): print(f" {file}") + @pytest.mark.pandas def test_failed_cleanup(tmp_path: pathlib.Path): data_path = tmp_path @@ -32,10 +34,15 @@ def test_failed_cleanup(tmp_path: pathlib.Path): # write 10 versions of the data for i in range(10): - ids = range(i*10, i*10+10) + ids = range(i * 10, i * 10 + 10) strings = [f"str_{i}" for i in range(10)] df = pd.DataFrame({"id": ids, "value": strings}) - write_deltalake(data_path, df, mode="overwrite", configuration={"delta.logRetentionDuration": "interval 0 day"}) + write_deltalake( + data_path, + df, + mode="overwrite", + configuration={"delta.logRetentionDuration": "interval 0 day"}, + ) # checkpoint final version table = DeltaTable(data_path) From 6541c98a7043fa0d3de6a993bff7114d6c20fc28 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Thu, 25 Sep 2025 13:34:42 -0700 Subject: [PATCH 6/8] ruff check and fix Signed-off-by: Corwin Joy --- python/tests/test_cleanup.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/tests/test_cleanup.py b/python/tests/test_cleanup.py index 4235337161..d06227ae0f 100644 --- a/python/tests/test_cleanup.py +++ b/python/tests/test_cleanup.py @@ -1,13 +1,13 @@ #!/usr/bin/env python3 +import os import pathlib +import shutil + +import pandas as pd import pytest -import sys + from deltalake import DeltaTable, write_deltalake -import pandas as pd -import shutil -import os -import traceback def clean_data_dir(data_path): From cb186951cf0a1ef00481822dede01ef4accc641f Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Mon, 29 Sep 2025 14:20:42 -0700 Subject: [PATCH 7/8] Change test_cleanup.py to not use pandas to support CI test with no pandas Signed-off-by: Corwin Joy --- python/tests/test_cleanup.py | 27 
++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/python/tests/test_cleanup.py b/python/tests/test_cleanup.py index d06227ae0f..4c8d648b3c 100644 --- a/python/tests/test_cleanup.py +++ b/python/tests/test_cleanup.py @@ -4,8 +4,9 @@ import pathlib import shutil -import pandas as pd import pytest +from arro3.core import Array, DataType, Table +from arro3.core import Field as ArrowField from deltalake import DeltaTable, write_deltalake @@ -27,21 +28,33 @@ def print_log_dir(path): print(f" {file}") +@pytest.fixture +def valid_gc_data() -> Table: + id_col = ArrowField("id", DataType.int32(), nullable=True) + gc = ArrowField("gc", DataType.int32(), nullable=True).with_metadata( + {"delta.generationExpression": "10"} + ) + data = Table.from_pydict( + {"id": Array([1, 2], type=id_col), "gc": Array([10, 10], type=gc)}, + ) + return data + + @pytest.mark.pandas -def test_failed_cleanup(tmp_path: pathlib.Path): +def test_failed_cleanup(tmp_path: pathlib.Path, valid_gc_data): data_path = tmp_path clean_data_dir(data_path) # write 10 versions of the data for i in range(10): - ids = range(i * 10, i * 10 + 10) - strings = [f"str_{i}" for i in range(10)] - df = pd.DataFrame({"id": ids, "value": strings}) write_deltalake( data_path, - df, mode="overwrite", - configuration={"delta.logRetentionDuration": "interval 0 day"}, + data=valid_gc_data, + configuration={ + "delta.minWriterVersion": "7", + "delta.logRetentionDuration": "interval 0 day", + }, ) # checkpoint final version From 071147913f3d4b4a19eff31b7dee270585b7238e Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Mon, 29 Sep 2025 14:39:32 -0700 Subject: [PATCH 8/8] Modify test_cleanup.py to make table data correspond to version to make it clear the correct version is retrieved Signed-off-by: Corwin Joy --- python/tests/test_cleanup.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/tests/test_cleanup.py b/python/tests/test_cleanup.py index 4c8d648b3c..bdc98a31cf 100644 --- a/python/tests/test_cleanup.py +++ b/python/tests/test_cleanup.py @@ -28,29 +28,29 @@ def print_log_dir(path): print(f" {file}") -@pytest.fixture -def valid_gc_data() -> Table: +def valid_gc_data(version) -> Table: id_col = ArrowField("id", DataType.int32(), nullable=True) gc = ArrowField("gc", DataType.int32(), nullable=True).with_metadata( {"delta.generationExpression": "10"} ) data = Table.from_pydict( - {"id": Array([1, 2], type=id_col), "gc": Array([10, 10], type=gc)}, + {"id": Array([version, version], type=id_col), "gc": Array([10, 10], type=gc)}, ) return data @pytest.mark.pandas -def test_failed_cleanup(tmp_path: pathlib.Path, valid_gc_data): +def test_failed_cleanup(tmp_path: pathlib.Path): data_path = tmp_path clean_data_dir(data_path) # write 10 versions of the data for i in range(10): + data = valid_gc_data(i) write_deltalake( data_path, mode="overwrite", - data=valid_gc_data, + data=data, configuration={ "delta.minWriterVersion": "7", "delta.logRetentionDuration": "interval 0 day",