[Kernel][InCommitTimestamp][WIP] Support InCommitTimestamp-based time travel #4483

Closed
wants to merge 10 commits into from

dhruvarya-db
Collaborator

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

How was this patch tested?

Does this PR introduce any user-facing changes?

Collaborator

@scovich scovich left a comment

Approach looks reasonable... but we'll need a lot of good unit testing to validate something this complex (code inspection by reviewers won't be enough)

if (ictEnablementCommit.getTimestamp() <= timestamp) {
// The target commit is in the ICT range.
long latestSnapshotTimestamp = latestSnapshot.getTimestamp(engine);
if (latestSnapshotTimestamp <= timestamp) {
Collaborator

This is an illegal scenario, the query needs to fail.

Reason: It's unsafe to do timestamp as-of for timestamps later than the latest commit. We don't know when (or if) a later commit might land, and also don't know what ICT that commit might have. It's possible a later time travel request for the same timestamp might find a newer commit and we get a non-repeatable read.
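A minimal sketch of the fail-fast check this comment asks for. The class and method names are hypothetical and not taken from the PR; the only behavior assumed is that a requested timestamp strictly later than the latest commit's timestamp is rejected, while an exactly equal timestamp is allowed.

```java
final class TimestampBoundsCheck {
  /**
   * Hypothetical helper: rejects a time-travel request whose timestamp is
   * later than the latest commit. Such a request cannot be answered
   * repeatably, because a commit that lands later may still fall at or
   * before the requested timestamp.
   */
  static void requireNotAfterLatestCommit(
      long requestedTimestampMs, long latestCommitTimestampMs, long latestVersion) {
    if (requestedTimestampMs > latestCommitTimestampMs) {
      // Include the latest commit's timestamp so the caller can see the valid range.
      throw new IllegalArgumentException(
          String.format(
              "Requested timestamp %d is after the latest commit (version %d, timestamp %d)",
              requestedTimestampMs, latestVersion, latestCommitTimestampMs));
    }
  }
}
```

Failing here, rather than silently resolving to the latest version, keeps two identical queries from returning different snapshots when a new commit arrives between them.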

Collaborator Author

We explicitly handle this scenario later on here: https://github.com/delta-io/delta/pull/4483/files#diff-07b47ed7d50294a001f579299ebd0e7d2328a991babcf6ba23af3c9cacdf8cdcR148. This just ensures that we have a timestamp for the exception message.

Collaborator Author

@scovich I have addressed all the binary search-specific comments in https://github.com/delta-io/delta/pull/4581/files

Comment on lines +134 to +135
.orElse(
commits.get(0)); // This is only returned if canReturnEarliestCommit (see below)
Collaborator

nit: weird indentation?

Comment on lines +165 to +168
.takeWhile(
fs ->
FileNames.getFileVersionOpt(new Path(fs.getPath())).orElse(-1L)
< endCommit.getVersion())
Collaborator

nit:

Suggested change
.takeWhile(
fs ->
FileNames.getFileVersionOpt(new Path(fs.getPath())).orElse(-1L)
< endCommit.getVersion())
.takeWhile(fs -> {
  long version = FileNames.getFileVersionOpt(new Path(fs.getPath())).orElse(-1L);
  return version < endCommit.getVersion();
})

upperBoundVersion = narrowerBounds._2;
if (upperBoundVersion == lowerBoundVersion + 1) {
// We have a single commit in the range.
return pivotCommit;
Collaborator

Why is the pivot guaranteed to lie between the newly narrowed bounds?
Are we claiming the invariant pivotCommit.version() == lowerBoundVersion holds in this case?

Function<Long, Long> indexToValueMapper) {
long start = lowerBoundInclusive;
long end = upperBoundExclusive;
Tuple2<Long, Long> result = null;
Collaborator

How do we guarantee we never return null? The caller doesn't seem to be checking for that case?
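One way to make the "no match" case explicit is to return an `Optional` instead of a nullable tuple, so the caller has to handle it. The sketch below is an assumption about what the search does (greatest index in the range whose mapped value is at most a target, with values non-decreasing by index); the class name, the `target` parameter, and the use of `Map.Entry` in place of `Tuple2` are all hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class GreatestLowerBoundSearch {
  /**
   * Returns the (index, value) pair with the largest index in
   * [lowerBoundInclusive, upperBoundExclusive) whose value is <= target,
   * or Optional.empty() if no such index exists.
   * Assumes indexToValueMapper is non-decreasing over the range.
   */
  static Optional<Map.Entry<Long, Long>> search(
      long target,
      long lowerBoundInclusive,
      long upperBoundExclusive,
      Function<Long, Long> indexToValueMapper) {
    long start = lowerBoundInclusive;
    long end = upperBoundExclusive;
    Map.Entry<Long, Long> result = null;
    while (start < end) {
      long mid = start + (end - start) / 2; // avoids overflow of (start + end)
      long value = indexToValueMapper.apply(mid);
      if (value <= target) {
        // mid is a candidate; a later index may still qualify.
        result = new SimpleImmutableEntry<>(mid, value);
        start = mid + 1;
      } else {
        end = mid;
      }
    }
    // Empty when the range is empty or every value exceeds the target.
    return Optional.ofNullable(result);
  }
}
```

If the caller can prove the lower bound always satisfies the predicate, it can unwrap with `orElseThrow` and an explanatory message, which turns a silent `null` into a checked invariant.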

long curIdx = searchStartEnd + iterationDirection;
for (long i = 1;
curIdx > lowerBound && curIdx < upperBound;
curIdx = Math.round(searchStartEnd + iterationDirection * (Math.pow(2, ++i) - 1))) {
Collaborator

This seems really complex. I don't think we need pow and round here? Can we not start with i=0 and then:

curIdx += (iterationDirection << i++)

(-1 << 5 is -32, just like 1 << 5 is 32)
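To check that the shift form visits the same indices, here is a small sketch that collects the probe sequence both ways. `probesWithPow` copies the loop header quoted above; `probesWithShift` is one reading of the suggestion, in which `curIdx` starts at `searchStartEnd` and the increment is applied before the bounds check. The class and method names are hypothetical, and `iterationDirection` is assumed to be `+1` or `-1`.

```java
import java.util.ArrayList;
import java.util.List;

final class ExponentialProbe {
  /** Probe indices produced by the pow/round loop under review. */
  static List<Long> probesWithPow(
      long searchStartEnd, long iterationDirection, long lowerBound, long upperBound) {
    List<Long> probes = new ArrayList<>();
    long curIdx = searchStartEnd + iterationDirection;
    for (long i = 1;
        curIdx > lowerBound && curIdx < upperBound;
        curIdx = Math.round(searchStartEnd + iterationDirection * (Math.pow(2, ++i) - 1))) {
      probes.add(curIdx);
    }
    return probes;
  }

  /** Same offsets (1, 3, 7, 15, ...) built from integer shifts only. */
  static List<Long> probesWithShift(
      long searchStartEnd, long iterationDirection, long lowerBound, long upperBound) {
    List<Long> probes = new ArrayList<>();
    long curIdx = searchStartEnd;
    int i = 0;
    while (true) {
      // Adds +/-1, +/-2, +/-4, ...; the running sum of offsets is 2^(i+1) - 1.
      curIdx += iterationDirection << i++;
      if (curIdx <= lowerBound || curIdx >= upperBound) {
        break;
      }
      probes.add(curIdx);
    }
    return probes;
  }

  public static void main(String[] args) {
    System.out.println(probesWithPow(10, 1, 0, 100));
    System.out.println(probesWithShift(10, 1, 0, 100));
  }
}
```

Staying in integer arithmetic also sidesteps the precision loss of `double` once indices exceed 2^53, although commit version numbers are unlikely to get that large.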

@dhruvarya-db
Collaborator Author

Continued in #4581
