6 changes: 3 additions & 3 deletions crates/core/src/logstore/mod.rs
@@ -161,6 +161,9 @@ pub type LogStoreRef = Arc<dyn LogStore>;

static DELTA_LOG_PATH: LazyLock<Path> = LazyLock::new(|| Path::from("_delta_log"));

pub(crate) static DELTA_LOG_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());

/// Return the [LogStoreRef] for the provided [Url] location
///
/// This will use the built-in process global [crate::storage::ObjectStoreRegistry] by default
@@ -637,9 +640,6 @@ impl<'de> Deserialize<'de> for LogStoreConfig {
}
}

static DELTA_LOG_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"(\d{20})\.(json|checkpoint(\.\d+)?\.parquet)$").unwrap());

/// Extract version from a file name in the delta log
pub fn extract_version_from_filename(name: &str) -> Option<i64> {
DELTA_LOG_REGEX
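
For reference, a minimal sketch (not part of this PR) of the filenames the relocated `DELTA_LOG_REGEX` accepts through `extract_version_from_filename`, assuming the function is reachable at `deltalake_core::logstore::extract_version_from_filename` as suggested by the `pub fn` above:

use deltalake_core::logstore::extract_version_from_filename;

fn main() {
    // Commit files: a 20-digit, zero-padded version followed by ".json".
    assert_eq!(
        extract_version_from_filename("00000000000000000007.json"),
        Some(7)
    );
    // Single-file checkpoints.
    assert_eq!(
        extract_version_from_filename("00000000000000000007.checkpoint.parquet"),
        Some(7)
    );
    // The optional `(\.\d+)?` group also admits one numeric infix.
    assert_eq!(
        extract_version_from_filename("00000000000000000007.checkpoint.0000000001.parquet"),
        Some(7)
    );
    // Data files and other names are ignored.
    assert_eq!(
        extract_version_from_filename("part-00001.snappy.parquet"),
        None
    );
}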
198 changes: 129 additions & 69 deletions crates/core/src/protocol/checkpoints.rs
@@ -6,7 +6,7 @@ use url::Url;

use arrow::compute::filter_record_batch;
use arrow_array::{BooleanArray, RecordBatch};
use chrono::Utc;
use chrono::{TimeZone, Utc};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine_data::FilteredEngineData;
use delta_kernel::last_checkpoint_hint::LastCheckpointHint;
@@ -22,11 +22,14 @@ use tokio::task::spawn_blocking;
use tracing::{debug, error, warn};
use uuid::Uuid;

use crate::logstore::{LogStore, LogStoreExt};
use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
use crate::table::config::TablePropertiesExt as _;
use crate::{open_table_with_version, DeltaTable};
use crate::{DeltaResult, DeltaTableError};

static CHECKPOINT_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap());

/// Creates checkpoint for a given table version, table state and object store
pub(crate) async fn create_checkpoint_for(
version: u64,
@@ -181,92 +184,126 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
Ok(())
}

/// Deletes all delta log commits that are older than the cutoff time
/// and less than the specified version.
/// Delete expired Delta log files up to a safe checkpoint boundary.
///
/// This routine removes JSON commit files, in-progress JSON temp files, and
/// checkpoint files under `_delta_log/` that are both:
/// - older than the provided `cutoff_timestamp` (milliseconds since epoch), and
/// - strictly below a checkpoint version that is safe to clean up to.
///
/// Safety guarantee:
/// To avoid deleting files that might still be required to reconstruct the
/// table state at or before the requested cutoff, the function first lowers
/// `keep_version` to the smallest version whose file modification time is still
/// inside the retention window (i.e. `>= cutoff_timestamp`), then locates the
/// most recent checkpoint whose version is `<= keep_version`. Only files with a
/// version strictly below that checkpoint and a modification time
/// `<= cutoff_timestamp` are deleted. If no such checkpoint exists, the
/// function performs no deletions and returns `Ok(0)`.
///
/// See also: https://github.com/delta-io/delta-rs/issues/3692 for background on
/// why cleanup must align to an existing checkpoint.
pub async fn cleanup_expired_logs_for(
until_version: i64,
mut keep_version: i64,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
operation_id: Option<Uuid>,
) -> DeltaResult<usize> {
static DELTA_LOG_REGEX: LazyLock<Regex> = LazyLock::new(|| {
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap()
});

let object_store = log_store.object_store(None);
debug!("called cleanup_expired_logs_for");
let object_store = log_store.object_store(operation_id);
let log_path = log_store.log_path();
let maybe_last_checkpoint = read_last_checkpoint(&object_store, log_path).await?;

let Some(last_checkpoint) = maybe_last_checkpoint else {
// List all log entries under _delta_log
let log_entries: Vec<Result<crate::ObjectMeta, _>> =
object_store.list(Some(log_path)).collect().await;

debug!("starting keep_version: {:?}", keep_version);
debug!(
"starting cutoff_timestamp: {:?}",
Utc.timestamp_millis_opt(cutoff_timestamp).unwrap()
);

// Step 1: Find min_retention_version among DELTA_LOG files with ts >= cutoff_timestamp
let min_retention_version = log_entries
.iter()
.filter_map(|m| m.as_ref().ok())
.filter_map(|m| {
let path = m.location.as_ref();
DELTA_LOG_REGEX
.captures(path)
.and_then(|caps| caps.get(1))
.and_then(|v| v.as_str().parse::<i64>().ok())
.map(|ver| (ver, m.last_modified.timestamp_millis()))
})
.filter(|(_, ts)| *ts >= cutoff_timestamp)
.map(|(ver, _)| ver)
.min();

let min_retention_version = min_retention_version.unwrap_or(keep_version);

// Step 2: Move keep_version down to the minimum version inside the retention period to make sure
// every version inside the retention period is kept.
keep_version = keep_version.min(min_retention_version);

// Step 3: Find safe checkpoint with checkpoint_version <= keep_version (no ts restriction)
let safe_checkpoint_version_opt = log_entries
.iter()
.filter_map(|m| m.as_ref().ok())
.filter_map(|m| {
let path = m.location.as_ref();
CHECKPOINT_REGEX
.captures(path)
.and_then(|caps| caps.get(1))
.and_then(|v| v.as_str().parse::<i64>().ok())
})
.filter(|ver| *ver <= keep_version)
.max();

// Exit if no safe_checkpoint file was found.
let Some(safe_checkpoint_version) = safe_checkpoint_version_opt else {
debug!(
"Not cleaning metadata files, could not find a checkpoint with version <= keep_version ({})",
keep_version
);
return Ok(0);
};

let until_version = i64::min(until_version, last_checkpoint.version as i64);
debug!("safe_checkpoint_version: {}", safe_checkpoint_version);

// Step 4: Delete DELTA_LOG files where log_ver < safe_checkpoint_version && ts <= cutoff_timestamp
let locations = futures::stream::iter(log_entries.into_iter())
.filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
let meta = match meta {
Ok(m) => m,
Err(err) => {
error!("Error received while cleaning up expired logs: {err:?}");
return None;
}
};
let path_str = meta.location.as_ref();
let captures = DELTA_LOG_REGEX.captures(path_str)?;
let ts = meta.last_modified.timestamp_millis();
let log_ver_str = captures.get(1).unwrap().as_str();
let Ok(log_ver) = log_ver_str.parse::<i64>() else {
return None;
};
if log_ver < safe_checkpoint_version && ts <= cutoff_timestamp {
debug!("file to delete: {:?}", meta.location);
Some(Ok(meta.location))
} else {
None
}
})
.boxed();

// Feed a stream of candidate deletion files directly into the delete_stream
// function to try to improve the speed of cleanup and reduce the need for
// intermediate memory.
let object_store = log_store.object_store(operation_id);
let deleted = object_store
.delete_stream(
object_store
.list(Some(log_store.log_path()))
// This predicate function will filter out any locations that don't
// match the given timestamp range
.filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
if meta.is_err() {
error!("Error received while cleaning up expired logs: {meta:?}");
return None;
}
let meta = meta.unwrap();
let ts = meta.last_modified.timestamp_millis();

match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
Some(captures) => {
let log_ver_str = captures.get(1).unwrap().as_str();
let log_ver: i64 = log_ver_str.parse().unwrap();
if log_ver < until_version && ts <= cutoff_timestamp {
// This location is ready to be deleted
Some(Ok(meta.location))
} else {
None
}
}
None => None,
}
})
.boxed(),
)
.delete_stream(locations)
.try_collect::<Vec<_>>()
.await?;

debug!("Deleted {} expired logs", deleted.len());
Ok(deleted.len())
}
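
A hypothetical caller sketch, not taken from this PR, showing how the new signature might be driven from a retention setting; `cleanup_after_checkpoint`, its arguments, and the retention value are illustrative:

use chrono::Utc;
use uuid::Uuid;

// Illustrative wrapper: compute the cutoff from a retention duration and let
// cleanup_expired_logs_for lower keep_version and pick the safe checkpoint internally.
async fn cleanup_after_checkpoint(
    log_store: &dyn LogStore,
    checkpoint_version: i64,
    retention_millis: i64,
    operation_id: Option<Uuid>,
) -> DeltaResult<usize> {
    let cutoff_timestamp = Utc::now().timestamp_millis() - retention_millis;
    cleanup_expired_logs_for(checkpoint_version, log_store, cutoff_timestamp, operation_id).await
}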

/// Try reading the `_last_checkpoint` file.
///
/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
/// is invalid JSON. Unexpected/unrecoverable errors are returned as the `Err` case and are assumed to
/// cause failure.
async fn read_last_checkpoint(
storage: &dyn ObjectStore,
log_path: &Path,
) -> DeltaResult<Option<LastCheckpointHint>> {
const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
let maybe_data = storage.get(&file_path).await;
let data = match maybe_data {
Ok(data) => data.bytes().await?,
Err(Error::NotFound { .. }) => return Ok(None),
Err(err) => return Err(err.into()),
};
Ok(serde_json::from_slice(&data)
.inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok())
}
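
A small sketch (not part of this diff) of the tolerant parse described above, assuming that, per the Delta protocol, `version` and `size` are the only required fields of the hint:

use delta_kernel::last_checkpoint_hint::LastCheckpointHint;

// Invalid content is demoted to None rather than surfaced as an error.
fn parse_hint(data: &[u8]) -> Option<LastCheckpointHint> {
    serde_json::from_slice(data)
        .inspect_err(|e| eprintln!("invalid _last_checkpoint JSON: {e}"))
        .ok()
}

fn main() {
    // Hypothetical payload carrying the protocol's required fields.
    println!("{}", parse_hint(br#"{"version": 5, "size": 42}"#).is_some());
    // Garbage is tolerated and simply yields None.
    println!("{}", parse_hint(b"not json").is_none());
}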

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand All @@ -285,6 +322,29 @@ mod tests {
use crate::writer::test_utils::get_delta_schema;
use crate::DeltaResult;

/// Try reading the `_last_checkpoint` file.
///
/// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
/// the read. Thus, the semantics of this function are to return `None` if the file is not found or
/// is invalid JSON. Unexpected/unrecoverable errors are returned as the `Err` case and are assumed to
/// cause failure.
async fn read_last_checkpoint(
storage: &dyn ObjectStore,
log_path: &Path,
) -> DeltaResult<Option<LastCheckpointHint>> {
const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
let maybe_data = storage.get(&file_path).await;
let data = match maybe_data {
Ok(data) => data.bytes().await?,
Err(Error::NotFound { .. }) => return Ok(None),
Err(err) => return Err(err.into()),
};
Ok(serde_json::from_slice(&data)
.inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok())
}

#[tokio::test]
async fn test_create_checkpoint_for() {
let table_schema = get_delta_schema();
104 changes: 101 additions & 3 deletions crates/core/tests/checkpoint_writer.rs
@@ -196,7 +196,7 @@ mod delete_expired_delta_log_in_checkpoint {
Some(HashMap::from([
(
TableProperty::LogRetentionDuration.as_ref().into(),
Some("interval 10 minute".to_string()),
Some("interval 0 minute".to_string()),
),
(
TableProperty::EnableExpiredLogCleanup.as_ref().into(),
@@ -225,8 +225,8 @@

// set last_modified
set_file_last_modified(0, 25 * 60 * 1000); // 25 mins ago, should be deleted
set_file_last_modified(1, 15 * 60 * 1000); // 25 mins ago, should be deleted
set_file_last_modified(2, 5 * 60 * 1000); // 25 mins ago, should be kept
set_file_last_modified(1, 15 * 60 * 1000); // 15 mins ago, should be deleted
set_file_last_modified(2, 5 * 60 * 1000); // 5 mins ago, should be kept as last safe checkpoint

table.load_version(0).await.expect("Cannot load version 0");
table.load_version(1).await.expect("Cannot load version 1");
@@ -267,6 +267,104 @@
table.load_version(2).await.expect("Cannot load version 2");
}

// Test to verify that intermediate versions can still be loaded after the checkpoint is created.
// This is to verify the behavior of `cleanup_expired_logs_for` and its use of safe checkpoints.
#[tokio::test]
async fn test_delete_expired_logs_safe_checkpoint() {
// For additional tracing:
// let _ = pretty_env_logger::try_init();
let mut table = fs_common::create_table(
"../test/tests/data/checkpoints_with_expired_logs/expired_with_checkpoint",
Some(HashMap::from([
(
TableProperty::LogRetentionDuration.as_ref().into(),
Some("interval 10 minute".to_string()),
),
(
TableProperty::EnableExpiredLogCleanup.as_ref().into(),
Some("true".to_string()),
),
])),
)
.await;

let table_path = table.table_uri();
let set_file_last_modified = |version: usize, last_modified_millis: u64, suffix: &str| {
let path = format!("{table_path}_delta_log/{version:020}.{suffix}");
let file = OpenOptions::new().write(true).open(path).unwrap();
let last_modified = SystemTime::now().sub(Duration::from_millis(last_modified_millis));
let times = FileTimes::new()
.set_modified(last_modified)
.set_accessed(last_modified);
file.set_times(times).unwrap();
};

// create 4 commits
let a1 = fs_common::add(0);
let a2 = fs_common::add(0);
let a3 = fs_common::add(0);
let a4 = fs_common::add(0);
assert_eq!(1, fs_common::commit_add(&mut table, &a1).await);
assert_eq!(2, fs_common::commit_add(&mut table, &a2).await);
assert_eq!(3, fs_common::commit_add(&mut table, &a3).await);
assert_eq!(4, fs_common::commit_add(&mut table, &a4).await);

// set last_modified
set_file_last_modified(0, 25 * 60 * 1000, "json"); // 25 mins ago, should be deleted
set_file_last_modified(1, 20 * 60 * 1000, "json"); // 20 mins ago, last safe checkpoint, should be kept
set_file_last_modified(2, 15 * 60 * 1000, "json"); // 15 mins ago, fails retention cutoff, but needed by v3
set_file_last_modified(3, 6 * 60 * 1000, "json"); // 6 mins ago, should be kept by retention cutoff
set_file_last_modified(4, 5 * 60 * 1000, "json"); // 5 mins ago, should be kept by retention cutoff

table.load_version(0).await.expect("Cannot load version 0");
table.load_version(1).await.expect("Cannot load version 1");
table.load_version(2).await.expect("Cannot load version 2");
table.load_version(3).await.expect("Cannot load version 3");
table.load_version(4).await.expect("Cannot load version 4");

// Create checkpoint for version 1
checkpoints::create_checkpoint_from_table_uri_and_cleanup(
deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(),
1,
Some(false),
None,
)
.await
.unwrap();

// Update checkpoint time for version 1 to be just after version 1 data
set_file_last_modified(1, 20 * 60 * 1000 - 10, "checkpoint.parquet");

// Checkpoint final version
checkpoints::create_checkpoint_from_table_uri_and_cleanup(
deltalake_core::ensure_table_uri(&table.table_uri()).unwrap(),
table.version().unwrap(),
None,
None,
)
.await
.unwrap();

table.update().await.unwrap(); // make table to read the checkpoint
assert_eq!(
table
.snapshot()
.unwrap()
.file_paths_iter()
.collect::<Vec<_>>(),
vec![
ObjectStorePath::from(a4.path.as_ref()),
ObjectStorePath::from(a3.path.as_ref()),
ObjectStorePath::from(a2.path.as_ref()),
ObjectStorePath::from(a1.path.as_ref()),
]
);

// Without going back to a safe checkpoint, previously loading version 3 would fail.
table.load_version(3).await.expect("Cannot load version 3");
table.load_version(4).await.expect("Cannot load version 4");
}

#[tokio::test]
async fn test_not_delete_expired_logs() {
let mut table = fs_common::create_table(