6 changes: 3 additions & 3 deletions crates/core/src/logstore/mod.rs
@@ -161,6 +161,9 @@ pub type LogStoreRef = Arc<dyn LogStore>;

static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));

pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());

/// Return the [LogStoreRef] for the provided [Url] location
///
/// This will use the built-in process global [crate::storage::ObjectStoreRegistry] by default
@@ -637,9 +640,6 @@ impl<'de> Deserialize<'de> for LogStoreConfig {
}
}

static DELTA_LOG_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());

/// Extract version from a file name in the delta log
pub fn extract_version_from_filename(name: &str) -> Option<i64> {
DELTA_LOG_REGEX
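
For reference, a minimal standalone sketch of how the now crate-visible `DELTA_LOG_REGEX` behaves against typical `_delta_log` file names (illustration only, not part of this change; it assumes just the `regex` crate, and the version is the first capture group, which is what `extract_version_from_filename` parses):

```rust
use regex::Regex;

fn main() {
    // Same pattern as DELTA_LOG_REGEX above.
    let re = Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap();

    // Commit files and checkpoint parquet files match; the version is capture group 1.
    for name in [
        "00000000000000000005.json",
        "00000000000000000010.checkpoint.parquet",
    ] {
        let version: i64 = re.captures(name).unwrap()[1].parse().unwrap();
        println!("{name} -> version {version}");
    }

    // In-progress temp commits do not match this pattern.
    assert!(re.captures("00000000000000000005.json.tmp").is_none());
}
```
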
198 changes: 129 additions & 69 deletions crates/core/src/protocol/checkpoints.rs
@@ -6,7 +6,7 @@ use url::Url;

use arrow::compute::filter_record_batch;
use arrow_array::{BooleanArray, RecordBatch};
use chrono::Utc;
use chrono::{TimeZone, Utc};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine_data::FilteredEngineData;
use delta_kernel::last_checkpoint_hint::LastCheckpointHint;
@@ -22,11 +22,14 @@ use tokio::task::spawn_blocking;
use tracing::{debug, error, warn};
use uuid::Uuid;

use crate::logstore::{LogStore, LogStoreExt};
use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
use crate::table::config::TablePropertiesExt as _;
use crate::{open_table_with_version, DeltaTable};
use crate::{DeltaResult, DeltaTableError};

static CHECKPOINT_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap());

/// Creates checkpoint for a given table version, table state and object store
pub(crate) async fn create_checkpoint_for(
version: u64,
@@ -181,92 +184,126 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
Ok(())
}

/// Deletes all delta log commits that are older than the cutoff time
/// and less than the specified version.
/// Delete expired Delta log files up to a safe checkpoint boundary.
///
/// This routine removes JSON commit files and checkpoint files under
/// `_delta_log/` that are both:
/// - older than the provided `cutoff_timestamp` (milliseconds since epoch), and
/// - strictly older than a safe checkpoint version derived from `keep_version`.
///
/// Safety guarantee:
/// To avoid deleting files that might still be required to reconstruct table
/// state inside the retention window, `keep_version` is first lowered to the
/// oldest version whose log file is newer than `cutoff_timestamp`. The most
/// recent checkpoint at or below that version then becomes the deletion
/// boundary: only files strictly older than this checkpoint version and no
/// newer than `cutoff_timestamp` are deleted. If no such checkpoint exists,
/// the function performs no deletions and returns `Ok(0)`.
///
/// See also: https://github.com/delta-io/delta-rs/issues/3692 for background on
/// why cleanup must align to an existing checkpoint.
pub async fn cleanup_expired_logs_for(
until_version: i64,
mut keep_version: i64,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
operation_id: Option<Uuid>,
) -> DeltaResult<usize> {
static DELTA_LOG_REGEX: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap()
});

let object_store = log_store.object_store(None);
debug!("called cleanup_expired_logs_for");
let object_store = log_store.object_store(operation_id);
let log_path = log_store.log_path();
let maybe_last_checkpoint = read_last_checkpoint(&object_store, log_path).await?;

let Some(last_checkpoint) = maybe_last_checkpoint else {
// List all log entries under _delta_log
let log_entries: Vec<Result<crate::ObjectMeta, _>> =
object_store.list(Some(log_path)).collect().await;

debug!("starting keep_version: {:?}", keep_version);
debug!(
"starting cutoff_timestamp: {:?}",
Utc.timestamp_millis_opt(cutoff_timestamp).unwrap()
);

// Step 1: Find min_retention_version among DELTA_LOG files with ts >= cutoff_timestamp
let min_retention_version = log_entries
.iter()
.filter_map(|m| m.as_ref().ok())
.filter_map(|m| {
let path = m.location.as_ref();
DELTA_LOG_REGEX
.captures(path)
.and_then(|caps| caps.get(1))
.and_then(|v| v.as_str().parse::<i64>().ok())
.map(|ver| (ver, m.last_modified.timestamp_millis()))
})
.filter(|(_, ts)| *ts >= cutoff_timestamp)
.map(|(ver, _)| ver)
.min();

let min_retention_version = min_retention_version.unwrap_or(keep_version);

// Step 2: Move keep_version down to the minimum version inside the retention period to make sure
// every version inside the retention period is kept.
keep_version = keep_version.min(min_retention_version);

// Step 3: Find safe checkpoint with checkpoint_version <= keep_version (no ts restriction)
let safe_checkpoint_version_opt = log_entries
.iter()
.filter_map(|m| m.as_ref().ok())
.filter_map(|m| {
let path = m.location.as_ref();
CHECKPOINT_REGEX
.captures(path)
.and_then(|caps| caps.get(1))
.and_then(|v| v.as_str().parse::<i64>().ok())
})
.filter(|ver| *ver <= keep_version)
.max();

// Exit if no safe_checkpoint file was found.
let Some(safe_checkpoint_version) = safe_checkpoint_version_opt else {
debug!(
"Not cleaning metadata files, could not find a checkpoint with version <= keep_version ({})",
keep_version
);
return Ok(0);
};

let until_version = i64::min(until_version, last_checkpoint.version as i64);
debug!("safe_checkpoint_version: {}", safe_checkpoint_version);

// Step 4: Delete DELTA_LOG files where log_ver < safe_checkpoint_version && ts <= cutoff_timestamp
let locations = futures::stream::iter(log_entries.into_iter())
.filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
let meta = match meta {
Ok(m) => m,
Err(err) => {
error!("Error received while cleaning up expired logs: {err:?}");
return None;
}
};
let path_str = meta.location.as_ref();
let captures = DELTA_LOG_REGEX.captures(path_str)?;
let ts = meta.last_modified.timestamp_millis();
let log_ver_str = captures.get(1).unwrap().as_str();
let Ok(log_ver) = log_ver_str.parse::<i64>() else {
return None;
};
if log_ver < safe_checkpoint_version && ts <= cutoff_timestamp {
debug!("file to delete: {:?}", meta.location);
Some(Ok(meta.location))
} else {
None
}
})
.boxed();

// Feed a stream of candidate deletion files directly into the delete_stream
// function to try to improve the speed of cleanup and reduce the need for
// intermediate memory.
let object_store = log_store.object_store(operation_id);
let deleted = object_store
.delete_stream(
object_store
.list(Some(log_store.log_path()))
// This predicate function will filter out any locations that don't
// match the given timestamp range
.filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
if meta.is_err() {
error!("Error received while cleaning up expired logs: {meta:?}");
return None;
}
let meta = meta.unwrap();
let ts = meta.last_modified.timestamp_millis();

match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
Some(captures) => {
let log_ver_str = captures.get(1).unwrap().as_str();
let log_ver: i64 = log_ver_str.parse().unwrap();
if log_ver < until_version && ts <= cutoff_timestamp {
// This location is ready to be deleted
Some(Ok(meta.location))
} else {
None
}
}
None => None,
}
})
.boxed(),
)
.delete_stream(locations)
.try_collect::<Vec<_>>()
.await?;

debug!("Deleted {} expired logs", deleted.len());
Ok(deleted.len())
}
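
To make the retention logic above easier to follow, here is a minimal self-contained sketch of selection steps 1–3 (illustration only; `select_safe_checkpoint` is a hypothetical helper, not part of this change, and the timestamps use arbitrary units):

```rust
/// Given (version, last_modified) pairs for commit files under `_delta_log/` and
/// the versions that have checkpoint files, pick the checkpoint version that
/// cleanup may safely delete up to, mirroring steps 1-3 above. Returns `None`
/// when no safe checkpoint exists, in which case nothing is deleted.
fn select_safe_checkpoint(
    mut keep_version: i64,
    cutoff_ts: i64,
    commit_files: &[(i64, i64)],
    checkpoint_versions: &[i64],
) -> Option<i64> {
    // Step 1: oldest version whose commit file is still inside the retention window.
    let min_retention_version = commit_files
        .iter()
        .filter(|(_, ts)| *ts >= cutoff_ts)
        .map(|(ver, _)| *ver)
        .min()
        .unwrap_or(keep_version);

    // Step 2: never delete past a version that is still inside the retention window.
    keep_version = keep_version.min(min_retention_version);

    // Step 3: newest checkpoint at or below the adjusted keep_version.
    checkpoint_versions
        .iter()
        .copied()
        .filter(|ver| *ver <= keep_version)
        .max()
}

fn main() {
    // Versions 0-4, "last modified" 25, 20, 15, 6 and 5 time units ago; the
    // retention cutoff is 10 units ago, so only versions 3 and 4 are inside it.
    let commits = [(0, -25), (1, -20), (2, -15), (3, -6), (4, -5)];
    let checkpoints = [1, 4];
    // keep_version drops from 4 to 3, and the newest checkpoint <= 3 is version 1:
    // only files strictly older than version 1 are candidates for deletion.
    assert_eq!(select_safe_checkpoint(4, -10, &commits, &checkpoints), Some(1));
}
```
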

/// Try reading the `_last_checkpoint` file.
///
/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
/// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
/// cause failure.
async fn read_last_checkpoint(
storage: &dyn ObjectStore,
log_path: &Path,
) -> DeltaResult<Option<LastCheckpointHint>> {
const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
let maybe_data = storage.get(&file_path).await;
let data = match maybe_data {
Ok(data) => data.bytes().await?,
Err(Error::NotFound { .. }) => return Ok(None),
Err(err) => return Err(err.into()),
};
Ok(serde_json::from_slice(&data)
.inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok())
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand All @@ -285,6 +322,29 @@ mod tests {
use crate::writer::test_utils::get_delta_schema;
use crate::DeltaResult;

/// Try reading the `_last_checkpoint` file.
///
/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
/// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
/// cause failure.
async fn read_last_checkpoint(
storage: &dyn ObjectStore,
log_path: &Path,
) -> DeltaResult<Option<LastCheckpointHint>> {
const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
let maybe_data = storage.get(&file_path).await;
let data = match maybe_data {
Ok(data) => data.bytes().await?,
Err(Error::NotFound { .. }) => return Ok(None),
Err(err) => return Err(err.into()),
};
Ok(serde_json::from_slice(&data)
.inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok())
}

#[tokio::test]
async fn test_create_checkpoint_for() {
let table_schema = get_delta_schema();
104 changes: 101 additions & 3 deletions crates/core/tests/checkpoint_writer.rs
@@ -196,7 +196,7 @@ mod delete_expired_delta_log_in_checkpoint {
Some(HashMap::from([
(
TableProperty::LogRetentionDuration.as_ref().into(),
Some("interval 10 minute".to_string()),
Some("interval 0 minute".to_string()),
),
(
TableProperty::EnableExpiredLogCleanup.as_ref().into(),
@@ -225,8 +225,8 @@

// set last_modified
set_file_last_modified(0, 25 * 60 * 1000); // 25 mins ago, should be deleted
set_file_last_modified(1, 15 * 60 * 1000); // 25 mins ago, should be deleted
set_file_last_modified(2, 5 * 60 * 1000); // 25 mins ago, should be kept
set_file_last_modified(1, 15 * 60 * 1000); // 15 mins ago, should be deleted
set_file_last_modified(2, 5 * 60 * 1000); // 5 mins ago, should be kept as last safe checkpoint

table.load_version(0).await.expect("Cannot load version 0");
table.load_version(1).await.expect("Cannot load version 1");
@@ -267,6 +267,104 @@
table.load_version(2).await.expect("Cannot load version 2");
}

// Test to verify that intermediate versions can still be loaded after the checkpoint is created.
// This is to verify the behavior of `cleanup_expired_logs_for` and its use of safe checkpoints.
#[tokio::test]
async fn test_delete_expired_logs_safe_checkpoint() {
// For additional tracing:
// let _ = pretty_env_logger::try_init();
let mut table = fs_common::create_table(
"../test/tests/data/checkpoints_with_expired_logs/expired_with_checkpoint",
Some(HashMap::from([
(
TableProperty::LogRetentionDuration.as_ref().into(),
Some("interval 10 minute".to_string()),
),
(
TableProperty::EnableExpiredLogCleanup.as_ref().into(),
Some("true".to_string()),
),
])),
)
.await;

let table_path = table.table_uri();
let set_file_last_modified = |version: usize, last_modified_millis: u64, suffix: &str| {
let path = format!("{table_path}_delta_log/{version:020}.{suffix}");
let file = OpenOptions::new().write(true).open(path).unwrap();
let last_modified = SystemTime::now().sub(Duration::from_millis(last_modified_millis));
let times = FileTimes::new()
.set_modified(last_modified)
.set_accessed(last_modified);
file.set_times(times).unwrap();
};

// create 4 commits
let a1 = fs_common::add(0);
let a2 = fs_common::add(0);
let a3 = fs_common::add(0);
let a4 = fs_common::add(0);
assert_eq!(1, fs_common::commit_add(&mut table, &a1).await);
assert_eq!(2, fs_common::commit_add(&mut table, &a2).await);
assert_eq!(3, fs_common::commit_add(&mut table, &a3).await);
assert_eq!(4, fs_common::commit_add(&mut table, &a4).await);

// set last_modified
set_file_last_modified(0, 25 * 60 * 1000, "json"); // 25 mins ago, should be deleted
set_file_last_modified(1, 20 * 60 * 1000, "json"); // 20 mins ago, last safe checkpoint, should be kept
set_file_last_modified(2, 15 * 60 * 1000, "json"); // 15 mins ago, fails retention cutoff, but needed by v3
set_file_last_modified(3, 6 * 60 * 1000, "json"); // 6 mins ago, should be kept by retention cutoff
set_file_last_modified(4, 5 * 60 * 1000, "json"); // 5 mins ago, should be kept by retention cutoff

table.load_version(0).await.expect("Cannot load version 0");
table.load_version(1).await.expect("Cannot load version 1");
table.load_version(2).await.expect("Cannot load version 2");
table.load_version(3).await.expect("Cannot load version 3");
table.load_version(4).await.expect("Cannot load version 4");

// Create checkpoint for version 1
checkpoints::create_checkpoint_from_table_uri_and_cleanup(
deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(),
1,
Some(false),
None,
)
.await
.unwrap();

// Update checkpoint time for version 1 to be just after version 1 data
set_file_last_modified(1, 20 * 60 * 1000 - 10, "checkpoint.parquet");

// Checkpoint final version
checkpoints::create_checkpoint_from_table_uri_and_cleanup(
deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(),
table.version().unwrap(),
None,
None,
)
.await
.unwrap();

table.update().await.unwrap(); // make the table read the checkpoint
assert_eq!(
table
.snapshot()
.unwrap()
.file_paths_iter()
.collect::<Vec<_>>(),
vec![
ObjectStorePath::from(a4.path.as_ref()),
ObjectStorePath::from(a3.path.as_ref()),
ObjectStorePath::from(a2.path.as_ref()),
ObjectStorePath::from(a1.path.as_ref()),
]
);

// Without falling back to a safe checkpoint, loading version 3 here would previously have failed.
table.load_version(3).await.expect("Cannot load version 3");
table.load_version(4).await.expect("Cannot load version 4");
}

#[tokio::test]
async fn test_not_delete_expired_logs() {
let mut table = fs_common::create_table(