Skip to content

fix: ignore temp log entries #3423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ async fn list_log_files_with_checkpoint(
let version_prefix = format!("{:020}", cp.version);
let start_from = log_root.child(version_prefix.as_str());

// QUESTION: DOES THIS NEED TO BE FILTERED TO ELIMINATE temp subdirs?
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function, plus the one below, may need to be fixed as well to ignore temp subdirectories, but I could use some feedback here, plus pointers to the associated unit test.

let files = fs_client
.list_with_offset(Some(log_root), &start_from)
.try_collect::<Vec<_>>()
Expand Down Expand Up @@ -542,6 +543,7 @@ pub(super) async fn list_log_files(
let mut commit_files = Vec::with_capacity(25);
let mut checkpoint_files = Vec::with_capacity(10);

// QUESTION: Does this need to be filtered to exclude subdirs?
for meta in fs_client
.list_with_offset(Some(log_root), &start_from)
.try_collect::<Vec<_>>()
Expand Down
15 changes: 13 additions & 2 deletions crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,14 +534,25 @@ pub async fn get_latest_version(
// list files to find max version
let version = async {
let mut max_version: i64 = version_start;
let prefix = Some(log_store.log_path());
let prefix = log_store.log_path();
let offset_path = commit_uri_from_version(max_version);
let object_store = log_store.object_store(None);
let mut files = object_store.list_with_offset(prefix, &offset_path);
let mut files = object_store.list_with_offset(Some(prefix), &offset_path);
let mut empty_stream = true;

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
let location_path: &Path = &obj_meta.location;
let part_count = location_path.prefix_match(prefix).unwrap().count();
if part_count > 1 {
// Per the spec, ignore any files in subdirectories.
// Spark may create these as uncommitted transactions, which we don't want
//
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
// "Delta files are stored as JSON in a directory at the *root* of the table
// named _delta_log, and ... make up the log of all changes that have occurred to a table."
continue;
}
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
max_version = max(max_version, log_version);
// also cache timestamp for version, for faster time-travel
Expand Down
15 changes: 13 additions & 2 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,24 @@ impl DeltaTable {
) -> Result<(), DeltaTableError> {
let mut min_version: i64 = -1;
let log_store = self.log_store();
let prefix = Some(log_store.log_path());
let prefix = log_store.log_path();
let offset_path = commit_uri_from_version(min_version);
let object_store = log_store.object_store(None);
let mut files = object_store.list_with_offset(prefix, &offset_path);
let mut files = object_store.list_with_offset(Some(prefix), &offset_path);

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
let location_path: Path = obj_meta.location.clone();
let part_count = location_path.prefix_match(prefix).unwrap().count();
if part_count > 1 {
// Per the spec, ignore any files in subdirectories.
// Spark may create these as uncommitted transactions, which we don't want
//
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
// "Delta files are stored as JSON in a directory at the *root* of the table
// named _delta_log, and ... make up the log of all changes that have occurred to a table."
continue;
}
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
if min_version == -1 {
min_version = log_version
Expand Down
8 changes: 8 additions & 0 deletions crates/core/tests/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use std::time::SystemTime;

#[tokio::test]
async fn time_travel_by_ds() {
// test time travel on a table with an uncommitted delta in a .tmp subfolder

// git does not preserve mtime, so we need to manually set it in the test
let log_dir = "../test/tests/data/simple_table/_delta_log";
let log_mtime_pair = vec![
Expand All @@ -13,6 +15,11 @@ async fn time_travel_by_ds() {
("00000000000000000002.json", "2020-05-03T22:47:31-07:00"),
("00000000000000000003.json", "2020-05-04T22:47:31-07:00"),
("00000000000000000004.json", "2020-05-05T22:47:31-07:00"),
// Final file is uncommitted by Spark and is in a .tmp subdir

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this fix ignores any sub-directory, the test should also contain a testcase of a sub-directory not starting with a dot.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need a second test case? The name of the subdirectory does not matter, any and all files in subdirectories are ignored. I only used the name .tmp to clarify what is going on / better match what spark does.

(
".tmp/00000000000000000005.json",
"2020-05-06T22:47:31-07:00",
),
];
for (fname, ds) in log_mtime_pair {
let ts: SystemTime = ds_to_ts(ds).into();
Expand Down Expand Up @@ -79,6 +86,7 @@ async fn time_travel_by_ds() {
.unwrap();
assert_eq!(table.version(), 4);

// Final append in .tmp subdir is uncommitted and should be ignored
table = deltalake_core::open_table_with_ds(
"../test/tests/data/simple_table",
"2020-05-25T22:47:31-07:00",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1587968626637,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":4,"isBlindAppend":true}}
{"add":{"path":"part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c001.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968626600,"dataChange":true}}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the pending operation in the .tmp subfolder + parquet file below.
Essentially just copied from existing transactions.

Binary file not shown.