Skip to content

Commit 95e6a35

Browse files
corwinjoy and adamreeve
authored and committed
Upgrade load_with_datetime to ignore any uncommitted deltas in any subdirectory of _delta_log.
Signed-off-by: Corwin Joy <[email protected]> Co-authored-by: Adam Reeve <[email protected]>
1 parent e985f95 commit 95e6a35

File tree

5 files changed

+36
-4
lines changed

5 files changed

+36
-4
lines changed

crates/core/src/logstore/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,14 +513,25 @@ pub async fn get_latest_version(
513513
// list files to find max version
514514
let version = async {
515515
let mut max_version: i64 = version_start;
516-
let prefix = Some(log_store.log_path());
516+
let prefix = log_store.log_path();
517517
let offset_path = commit_uri_from_version(max_version);
518518
let object_store = log_store.object_store(None);
519-
let mut files = object_store.list_with_offset(prefix, &offset_path);
519+
let mut files = object_store.list_with_offset(Some(prefix), &offset_path);
520520
let mut empty_stream = true;
521521

522522
while let Some(obj_meta) = files.next().await {
523523
let obj_meta = obj_meta?;
524+
let location_path: &Path = &obj_meta.location;
525+
let part_count = location_path.prefix_match(prefix).unwrap().count();
526+
if part_count > 1 {
527+
// Per the spec, ignore any files in subdirectories.
528+
// Spark may create these as uncommitted transactions which we don't want
529+
//
530+
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
531+
// "Delta files are stored as JSON in a directory at the *root* of the table
532+
// named _delta_log, and ... make up the log of all changes that have occurred to a table."
533+
continue;
534+
}
524535
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
525536
max_version = max(max_version, log_version);
526537
// also cache timestamp for version, for faster time-travel

crates/core/src/table/mod.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,13 +561,24 @@ impl DeltaTable {
561561
) -> Result<(), DeltaTableError> {
562562
let mut min_version: i64 = -1;
563563
let log_store = self.log_store();
564-
let prefix = Some(log_store.log_path());
564+
let prefix = log_store.log_path();
565565
let offset_path = commit_uri_from_version(min_version);
566566
let object_store = log_store.object_store(None);
567-
let mut files = object_store.list_with_offset(prefix, &offset_path);
567+
let mut files = object_store.list_with_offset(Some(prefix), &offset_path);
568568

569569
while let Some(obj_meta) = files.next().await {
570570
let obj_meta = obj_meta?;
571+
let location_path: Path = obj_meta.location.clone();
572+
let part_count = location_path.prefix_match(prefix).unwrap().count();
573+
if part_count > 1 {
574+
// Per the spec, ignore any files in subdirectories.
575+
// Spark may create these as uncommitted transactions which we don't want
576+
//
577+
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
578+
// "Delta files are stored as JSON in a directory at the *root* of the table
579+
// named _delta_log, and ... make up the log of all changes that have occurred to a table."
580+
continue;
581+
}
571582
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
572583
if min_version == -1 {
573584
min_version = log_version

crates/core/tests/time_travel.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use std::time::SystemTime;
55

66
#[tokio::test]
77
async fn time_travel_by_ds() {
8+
// test time travel on a table with an uncommitted delta in a .tmp subfolder
9+
810
// git does not preserve mtime, so we need to manually set it in the test
911
let log_dir = "../test/tests/data/simple_table/_delta_log";
1012
let log_mtime_pair = vec![
@@ -13,6 +15,11 @@ async fn time_travel_by_ds() {
1315
("00000000000000000002.json", "2020-05-03T22:47:31-07:00"),
1416
("00000000000000000003.json", "2020-05-04T22:47:31-07:00"),
1517
("00000000000000000004.json", "2020-05-05T22:47:31-07:00"),
18+
// Final file is uncommitted by Spark and is in a .tmp subdir
19+
(
20+
".tmp/00000000000000000005.json",
21+
"2020-05-06T22:47:31-07:00",
22+
),
1623
];
1724
for (fname, ds) in log_mtime_pair {
1825
let ts: SystemTime = ds_to_ts(ds).into();
@@ -79,6 +86,7 @@ async fn time_travel_by_ds() {
7986
.unwrap();
8087
assert_eq!(table.version(), 4);
8188

89+
// Final append in .tmp subdir is uncommitted and should be ignored
8290
table = deltalake_core::open_table_with_ds(
8391
"../test/tests/data/simple_table",
8492
"2020-05-25T22:47:31-07:00",
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"commitInfo":{"timestamp":1587968626637,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"readVersion":4,"isBlindAppend":true}}
2+
{"add":{"path":"part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c001.snappy.parquet","partitionValues":{},"size":262,"modificationTime":1587968626600,"dataChange":true}}

0 commit comments

Comments
 (0)