
Conversation

@OussamaSaoudi-db OussamaSaoudi-db commented Nov 15, 2024

What changes are proposed in this pull request?

This PR introduces two methods to construct a LogSegment: LogSegment::for_snapshot builds the LogSegment for a Snapshot, and LogSegment::for_table_changes builds one for the upcoming TableChanges type.

This PR also refactors the log listing functions to reduce code duplication, introducing get_parsed_log_files_iter, a helper that lists, filters, and parses log files.
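A minimal sketch of such a list-filter-parse pipeline, over plain strings and version numbers (the helper name, input shape, and filtering rules here are illustrative assumptions — the real helper works on ParsedLogPath values from the filesystem client):

```rust
// Illustrative sketch of a list/filter/parse helper: parse commit file names
// into versions and keep only those within the requested version range.
// (Names and types are assumptions, not the kernel's actual API.)
fn parsed_commit_versions(file_names: &[&str], start: u64, end: Option<u64>) -> Vec<u64> {
    file_names
        .iter()
        // keep only names that parse as zero-padded commit versions
        .filter_map(|name| name.strip_suffix(".json")?.parse::<u64>().ok())
        // honor the requested range; a missing end means "no upper bound"
        .filter(|&v| v >= start && end.map_or(true, |e| v <= e))
        .collect()
}
```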

It also adds a test helper, delta_path_for_multipart_checkpoint, to test-utils; it constructs the path for one part of a multipart checkpoint.

This replaces the changes proposed in #457.

How was this change tested?

This change introduces tests for the following:

  • reading the log with an out-of-date checkpoint hint
  • reading the log with an up-to-date checkpoint hint
  • creating a snapshot log segment without a checkpoint hint
  • creating a snapshot with a multi-part checkpoint
  • failing when a multipart checkpoint has an incorrect number of parts
  • creating a snapshot with a start checkpoint and an end time travel version
  • creating a snapshot with a checkpoint hint higher than the time travel version
  • creating log segments for table changes
  • checking that contiguity of the log is always preserved
  • checking that for_table_changes fails when start_version > end_version

This PR also adds an ignored test that checks for desired behaviour. The test build_snapshot_with_missing_checkpoint_part_no_hint checks that an incomplete checkpoint is not used in a LogSegment. A checkpoint is incomplete if it does not have all the parts specified in LogPathFileType::MultiPartCheckpoint.num_parts.
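The completeness property that the ignored test describes can be sketched as follows. The tuple type here is an illustrative stand-in for the (part_num, num_parts) fields parsed from a multipart checkpoint file name, not the kernel's actual types:

```rust
// Sketch: a multi-part checkpoint is complete only if every part 1..=num_parts
// is present on disk. (Tuple stand-in for ParsedLogPath's multipart fields.)
fn checkpoint_is_complete(parts: &[(u64, u64)]) -> bool {
    match parts.first() {
        Some(&(_, num_parts)) => {
            // right count, and every expected part number actually appears
            parts.len() as u64 == num_parts
                && (1..=num_parts).all(|p| parts.iter().any(|&(part, _)| part == p))
        }
        None => false,
    }
}
```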

Member

@zachschuermann zachschuermann left a comment

flushing comments

Comment on lines 111 to 114
// Commit file versions must be greater than the most recent checkpoint version if it exists
if let Some(checkpoint_file) = checkpoint_parts.first() {
sorted_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
}
Member

should we put this in LogSegment::try_new

Collaborator

The CDF constructor doesn't even have checkpoint files, does it?

Collaborator Author

I was thinking of try_new as a function that simply validates instead of processing commit/checkpoint files.

Collaborator Author

@scovich You're right, we filter out all checkpoints.
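The filtering discussed in this thread can be sketched with plain version numbers (a simplification — the real code retains ParsedLogPath entries, and the CDF path never has checkpoint parts to begin with):

```rust
// Sketch: drop any commit whose version is not strictly greater than the most
// recent checkpoint version, mirroring the quoted `retain` call.
fn retain_commits_after_checkpoint(
    sorted_commit_versions: &mut Vec<u64>,
    checkpoint_version: Option<u64>,
) {
    if let Some(cp) = checkpoint_version {
        sorted_commit_versions.retain(|&v| cp < v);
    }
}
```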

Collaborator

@scovich scovich left a comment

This looks much cleaner than the old code.

let version_eff = sorted_commit_files
.last()
.or(checkpoint_parts.first())
.ok_or(Error::MissingVersion)? // TODO: A more descriptive error
Collaborator

This means there are no files at all... so not just a missing version.

Collaborator Author

Would it make more sense to use an Error::generic here, or create a new error variant?

Member

seems fine to do generic error for now?

Member

"no files in log segment" or EmptyLogSegment new error?
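With plain version numbers, the fallback chain and the more descriptive error being discussed might look like this (a sketch, not the kernel's actual types or error enum):

```rust
// Sketch: the effective version is the last commit file's version, falling
// back to the checkpoint's; an empty segment is reported as "no files in log
// segment" rather than a generic missing-version error.
fn effective_version(
    sorted_commit_versions: &[u64],
    checkpoint_parts: &[u64],
) -> Result<u64, String> {
    sorted_commit_versions
        .last()
        .or_else(|| checkpoint_parts.first())
        .copied()
        .ok_or_else(|| "no files in log segment".to_string())
}
```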


latest_checkpoint.version
);
}
if checkpoint_parts.len() != checkpoint_metadata.parts.unwrap_or(1) {
Collaborator

This is an incorrect check (somebody noticed and filed a bug I think). It wrongly fails if a stale checkpoint hint gives a different checkpoint part count than the checkpoint we actually found.

Collaborator

That said, the whole checkpoint listing logic is buggy and needs a rewrite, so maybe just leave it alone and we can land the fixes separately, e.g. #322

Collaborator Author

This particular one is actually on me. I'd wanted to check that the checkpoint parts are correct, but the case you point out fails. Thanks for flagging it!

I do agree we should revisit LogSegment, give it a more thorough look, and give it the time it deserves.

/// Sorted commit files in the log segment
pub commit_files: Vec<ParsedLogPath>,
/// Sorted commit files in the log segment (ascending)
pub sorted_commit_files: Vec<ParsedLogPath>,
Collaborator

I'm pretty sure these used to be sorted in reverse order; did log replay get updated to use a reverse iterator?
(and do we have any tests that could catch such a bug?)

Collaborator

Update: yes, the action iterator was updated with a rev call.

Collaborator Author

Reverse iterator is in use. I don't have a test for checking that it's reverse sorted because that would require that we replay. This needs a read schema, engine, and some engine data wrangling. This seems out of scope for this PR.
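The ascending-storage / reverse-replay relationship is simple to illustrate over plain version numbers (a sketch; real replay walks ParsedLogPath entries):

```rust
// Sketch: commit files are stored ascending; log replay visits them
// newest-first by reversing the iterator, as the `rev` call mentioned
// above does.
fn replay_order(ascending_commit_versions: &[u64]) -> Vec<u64> {
    ascending_commit_versions.iter().rev().copied().collect()
}
```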

/// checkpoint files in the log segment.
pub checkpoint_files: Vec<ParsedLogPath>,
/// Sorted commit files in the log segment (ascending)
pub sorted_commit_files: Vec<ParsedLogPath>,
Collaborator

qq: Would it be more self-describing to call this ascending_commit_files?
(your call, just a wild late-night idea)

Member

hm I agree more self-describing but knee-jerk reaction is it's generally less readable? no strong opinion

Collaborator

@scovich scovich left a comment

Code looks quite good in general.

Two main questions:

  1. Should we avoid moving code between files in this PR (either do it before or after)?
  2. How to handle ParsedLogPath "error" cases (see comment)


// Check that the provided version is less than or equal to the end version if it exists
let lte_end_version = move |version: u64| {
end_version.is_none() || end_version.is_some_and(|end_version| version <= end_version)
Collaborator

Yes, we probably don't want to take a hard dep on 1.82 so soon (and not for something so minor)
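For context, `Option::is_none_or` stabilized in Rust 1.82; on older toolchains the same predicate is commonly written with `map_or` (a sketch, extracted here as a free function rather than the closure in the diff):

```rust
// Sketch: equivalent predicate without `Option::is_none_or` (which requires
// Rust 1.82+): a missing end_version means "no upper bound".
fn lte_end_version(version: u64, end_version: Option<u64>) -> bool {
    end_version.map_or(true, |end| version <= end)
}
```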

pub end_version: Version,
pub log_root: Url,
/// Sorted commit files in the log segment (ascending)
pub sorted_commit_files: Vec<ParsedLogPath>,
Collaborator

Rescuing #495 (comment)

qq: Would it be more self-describing to call this ascending_commit_files?

Collaborator Author

I can rename it to that 👍

Comment on lines 69 to 70
require!(checkpoint_file.version + 1 == commit_file.version,
Error::generic(format!("Expected commit file version {} to be next version to checkpoint file version {}", commit_file.version, checkpoint_file.version )))
Collaborator

weird indent? cargo fmt?

Collaborator

Also, maybe the message can be less wordy, e.g

"Gap between checkpoint version {} and next commit {}"
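The adjacency requirement behind that error message can be sketched as a contiguity check over versions (illustrative — the actual code uses the require! macro on ParsedLogPath pairs):

```rust
// Sketch: a log segment is contiguous when the first commit immediately
// follows the checkpoint (if any) and each later commit follows its
// predecessor with no gaps.
fn is_contiguous(checkpoint_version: Option<u64>, commit_versions: &[u64]) -> bool {
    let mut expected = checkpoint_version.map(|v| v + 1);
    for &v in commit_versions {
        if expected.is_some_and(|e| e != v) {
            return false; // gap between checkpoint/commit and the next commit
        }
        expected = Some(v + 1);
    }
    true
}
```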

let version_eff = sorted_commit_files
.last()
.or(checkpoint_parts.first())
.ok_or(Error::MissingVersion)? // TODO: A more descriptive error
Collaborator

Rescuing #495 (comment)

Would it make more sense to use an Error::generic here, or create a new error variant?

Generic seems fine for now.

We may want a tracking issue to make a new error class to cover all missing file situations we might encounter during log listing?

Ok(fs_client
.list_from(&start_from)?
.map(|meta| ParsedLogPath::try_from(meta?))
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
Collaborator

rescuing #495 (comment)

Do we even use or care about .crc files? They seem to be something pyspark associates with every write? Would we ever try to read one?

Comment on lines 288 to 291
.filter_map_ok(identity)
.take_while(move |path_res| match path_res {
Ok(path) => end_version.is_none_or(|end_version| path.version <= end_version),
Err(_) => true,
Collaborator

Rescuing #495 (comment)

Hmm is it a good idea to be filtering out errors though?

This is a good question we should probably track as a separate discussion/issue:

The ParsedLogPath code recognizes four general classes of files:

  1. "Known" versioned files, e.g. Ok(Some(SinglePartCheckpoint))
  2. "Unknown" versioned files that should be ignored, Ok(Some(Unknown))
  3. Non-versioned files that should be ignored, Ok(None)
  4. Versioned files that resemble a known type but which fail to parse, Err(_)

The Delta spec doesn't specifically say what to do with unrecognized file types, but it is pretty clear that we should ignore unrecognized things in general:

Since breaking changes must be accompanied by an increase in the protocol version recorded in a table or by the addition of a table feature, clients can assume that unrecognized actions, fields, and/or metadata domains are never required in order to correctly interpret the transaction log. Clients must ignore such unrecognized fields, and should not produce an error when reading a table that contains unrecognized fields.

... and some Delta table features like v2 checkpoints and log compaction rely on older clients to ignore unrecognized files/dirs.

So then the question is whether file names like these:

# hex instead of decimal
00000000deadbeef.commit.json  

# bogus part numbering
00000000000000000000.checkpoint.0000000010.0000000000.parquet 

# v2 checkpoint, as seen by a client that doesn't understand that feature
00000000000000000010.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json

# compacted log file, as seen by a client that doesn't understand that feature
00000000000000000004.00000000000000000006.compacted.json

... are simply "unrecognized files" or actual errors to care about. Your code is treating them like actual errors; @zachschuermann is suggesting to just filter them out.

Maybe we should take the middle ground and filter them out with a warn! so there's at least a possibility to know they happened?
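That middle ground — skip but warn — might look roughly like this (a sketch; the input type stands in for the iterator of ParsedLogPath results, and the warning is shown as eprintln! rather than the tracing warn! macro):

```rust
// Sketch: keep known parsed paths, silently drop recognized-but-ignorable
// ones, and warn (instead of failing) on names that resemble a known type
// but fail to parse — matching the four classes listed above.
fn keep_known(paths: Vec<Result<Option<String>, String>>) -> Vec<String> {
    paths
        .into_iter()
        .filter_map(|r| match r {
            Ok(Some(path)) => Some(path), // known versioned file
            Ok(None) => None,             // ignorable file, e.g. starts with "."
            Err(e) => {
                eprintln!("warning: ignoring unparseable log path: {e}");
                None
            }
        })
        .collect()
}
```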

Collaborator

@scovich scovich Nov 15, 2024

Note: The original code (before this PR) was propagating file name parsing errors, so we should probably keep that behavior for now and file an issue for separate follow-up?

Member

yea agree with the last comment -- let's keep behavior and file a follow up (and if we want to add a warn! for some other cases that's fine too)

Collaborator Author

Thanks for the context @scovich!! Agreed that it's something to address. No need to fail if we don't have to. I'll talk to @zachschuermann and get an issue up.

Member

@zachschuermann zachschuermann left a comment

few comments but LGTM

/// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
/// is specified it will be the most recent version by default.
#[allow(unused)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
Member

Suggested change
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]

Collaborator

?

Ok(fs_client
.list_from(&start_from)?
.map(|meta| ParsedLogPath::try_from(meta?))
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
Member

@OussamaSaoudi-db can you create an issue for follow-up?

Ok(fs_client
.list_from(&start_from)?
.map(|meta| ParsedLogPath::try_from(meta?))
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
Member

looks like #496 ? thanks!

#[ignore]
#[test]
fn build_snapshot_with_missing_checkpoint_part_no_hint() {
// TODO(Oussam): Handle checkpoints correctly so that this test passes
Member

follow up issue?

Comment on lines 91 to 96
// get an ObjectStore path for a checkpoint file, based on version, part number, and total number of parts
pub fn delta_path_for_multipart_checkpoint(version: u64, part_num: u64, num_parts: u64) -> Path {
let path =
format!("_delta_log/{version:020}.checkpoint.{part_num:010}.{num_parts:010}.parquet");
Path::from(path.as_str())
}
Member

not sure if this should go here or just in some test-only code right now? can always pull it out if we need it here?

Collaborator

Agree test-only code is best for now. If we did want it in prod code, it would belong in path.rs, not here? And it should probably involve LogFileType, e.g.

fn delta_file_name(version: Version, ty: LogFileType) -> String {
    let suffix = match ty {
          ...
        MultiPartCheckpoint { part_num, num_parts } => {
            format!("checkpoint.{part_num:010}.{num_parts:010}.parquet")
        }
          ...
    };
    format!("{version:020}.{suffix}")
}

(best done as a follow-up)

Collaborator

@scovich scovich left a comment

Few nits, but pretty much ready to merge!


@OussamaSaoudi-db OussamaSaoudi-db merged commit 3e7ad45 into delta-io:main Nov 15, 2024
19 checks passed

Labels

breaking-change Change that require a major version bump
