Skip to content

[Kernel][InCommitTimestamp][WIP] Support InCommitTimestamp-based time travel #4483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
package io.delta.kernel.internal;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableConfig.*;
import static io.delta.kernel.internal.fs.Path.getName;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
Expand Down Expand Up @@ -63,6 +67,7 @@ private DeltaHistoryManager() {}
*/
public static Commit getActiveCommitAtTimestamp(
Engine engine,
SnapshotImpl latestSnapshot,
Path logPath,
long timestamp,
boolean mustBeRecreatable,
Expand All @@ -75,32 +80,175 @@ public static Commit getActiveCommitAtTimestamp(
? getEarliestRecreatableCommit(engine, logPath)
: getEarliestDeltaFile(engine, logPath);

// Search for the commit
List<Commit> commits = getCommits(engine, logPath, earliestVersion);
Commit commit =
lastCommitBeforeOrAtTimestamp(commits, timestamp)
.orElse(commits.get(0)); // This is only returned if canReturnEarliestCommit (see below)
Commit placeholderEarliestCommit = new Commit(earliestVersion, -1L /* timestamp */);
Commit ictEnablementCommit = getICTEnablementCommit(latestSnapshot, placeholderEarliestCommit);
Commit searchResult;
if (ictEnablementCommit.getTimestamp() <= timestamp) {
// The target commit is in the ICT range.
long latestSnapshotTimestamp = latestSnapshot.getTimestamp(engine);
if (latestSnapshotTimestamp <= timestamp) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an illegal scenario, the query needs to fail.

Reason: It's unsafe to do timestamp as-of for timestamps later than the latest commit. We don't know when (or if) a later commit might land, and also don't know what ICT that commit might have. It's possible a later time travel request for the same timestamp might find a newer commit and we get a non-repeatable read.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We explicitly handle this scenario later on here: https://github.com/delta-io/delta/pull/4483/files#diff-07b47ed7d50294a001f579299ebd0e7d2328a991babcf6ba23af3c9cacdf8cdcR148. This just ensures that we have a timestamp for the exception message.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scovich I have addressed all the binary search-specific comments in https://github.com/delta-io/delta/pull/4581/files

// We just proved we should use the latest snapshot
searchResult = new Commit(latestSnapshot.getVersion(), latestSnapshotTimestamp);
} else {
// start ICT search over [earliest available ICT version, latestVersion)
boolean ictEnabledForEntireWindow = (ictEnablementCommit.version <= earliestVersion);
long searchWindowLowerBound =
ictEnabledForEntireWindow
? placeholderEarliestCommit.getVersion()
: ictEnablementCommit.getVersion();
try {
searchResult =
getActiveCommitAtTimeFromICTRange(
timestamp,
searchWindowLowerBound,
new Commit(latestSnapshot.getVersion(), latestSnapshotTimestamp),
engine,
latestSnapshot.getLogPath());
} catch (IOException e) {
throw new RuntimeException(
"There was an error while reading a historical commit while performing a timestamp"
+ "based lookup. This can happen when the commit log is corrupted or when "
+ "there is a parallel operation like metadata cleanup that is deleting "
+ "commits. Please retry the query.",
e);
}
}
} else {
// ICT was NOT enabled as-of the requested time
if (ictEnablementCommit.version <= earliestVersion) {
// We're searching for a non-ICT time but the non-ICT commits are all missing.
// If `canReturnEarliestCommit` is `false`, we need the details of the
// earliest commit to populate the timestampBeforeFirstAvailableCommit
// error correctly.
// Else, when `canReturnEarliestCommit` is `true`, the earliest commit
// is the desired result.
long ict =
CommitInfo.getRequiredInCommitTimestampFromFile(
engine, logPath, placeholderEarliestCommit.getVersion());
searchResult = new Commit(placeholderEarliestCommit.getVersion(), ict);
} else {
// start non-ICT linear search over [earliestVersion, ictEnablementVersion)
List<Commit> commits = getCommits(engine, logPath, earliestVersion);
searchResult =
lastCommitBeforeOrAtTimestamp(commits, timestamp)
.orElse(
commits.get(0)); // This is only returned if canReturnEarliestCommit (see below)
Comment on lines +134 to +135
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: weird indentation?

}
}

// If timestamp is before the earliest commit
if (commit.timestamp > timestamp && !canReturnEarliestCommit) {
if (searchResult.timestamp > timestamp && !canReturnEarliestCommit) {
throw DeltaErrors.timestampBeforeFirstAvailableCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commits.get(0).timestamp,
commits.get(0).version);
searchResult.timestamp,
searchResult.version);
}
// If timestamp is after the last commit of the table
if (commit == commits.get(commits.size() - 1)
&& commit.timestamp < timestamp
&& !canReturnLastCommit) {
if (searchResult.timestamp < timestamp && !canReturnLastCommit) {
throw DeltaErrors.timestampAfterLatestCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commit.timestamp,
commit.version);
searchResult.timestamp,
searchResult.version);
}

return commit;
return searchResult;
}

private static Optional<Long> getInitialCommitVersionForICTSearch(
long searchTimestamp, long startCommitVersion, Commit endCommit, Engine engine, Path logPath)
throws IOException {
long listingStartVersion = Math.max(startCommitVersion, endCommit.getVersion() - 1000);
try (CloseableIterator<Commit> commits =
listFrom(engine, logPath, listingStartVersion)
.takeWhile(
fs ->
FileNames.getFileVersionOpt(new Path(fs.getPath())).orElse(-1L)
< endCommit.getVersion())
Comment on lines +165 to +168
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
.takeWhile(
fs ->
FileNames.getFileVersionOpt(new Path(fs.getPath())).orElse(-1L)
< endCommit.getVersion())
.takeWhile(fs -> {
long version = FileNames.getFileVersionOpt(new Path(fs.getPath())).orElse(-1L)
version < endCommit.getVersion()
})

.filter(fs -> FileNames.isCommitFile(getName(fs.getPath())))
.map(fs -> new Commit(FileNames.deltaVersion(fs.getPath()), fs.getModificationTime()))
.takeWhile(commit -> commit.getTimestamp() <= searchTimestamp)) {
if (commits.hasNext()) {
List<Commit> commitsList = commits.toInMemoryList();
return Optional.of(commitsList.get(commitsList.size() - 1).getVersion());
} else {
// All commits in the range have modTimes greater than the search timestamp.
return Optional.empty();
}
}
}

private static Commit getActiveCommitAtTimeFromICTRange(
long searchTimestamp,
long startCommitVersion,
Commit endCommitExclusive,
Engine engine,
Path logPath)
throws IOException {
// Find the pivot commit version. This should be pretty close to the target commit.
Optional<Long> pivotVersionOpt =
getInitialCommitVersionForICTSearch(
searchTimestamp, startCommitVersion, endCommitExclusive, engine, logPath);
long lowerBoundVersion = startCommitVersion,
upperBoundVersion = endCommitExclusive.getVersion();
if (pivotVersionOpt.isPresent()) {
// We have potentially narrowed down the search space using modTime.
long pivotVersion = pivotVersionOpt.get();
long pivotICT =
CommitInfo.getRequiredInCommitTimestampFromFile(engine, logPath, pivotVersion);
Commit pivotCommit = new Commit(pivotVersion, pivotICT);
// In most cases, the target commit should be pretty close to the pivot commit.
if (pivotICT == searchTimestamp) {
return pivotCommit;
}
boolean searchLeft = pivotICT > searchTimestamp;
Tuple2<Long, Long> narrowerBounds =
InCommitTimestampUtils.getNarrowSearchBoundsUsingExponentialSearch(
searchTimestamp,
pivotVersion,
searchLeft ? lowerBoundVersion : upperBoundVersion,
version -> CommitInfo.getRequiredInCommitTimestampFromFile(engine, logPath, version),
searchLeft /* reversed */);
lowerBoundVersion = narrowerBounds._1;
upperBoundVersion = narrowerBounds._2;
if (upperBoundVersion == lowerBoundVersion + 1) {
// We have a single commit in the range.
return pivotCommit;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the pivot guaranteed to lie between the newly narrowed bounds?
Are we claiming the invariant pivotCommit.version() == lowerBoundVersion holds in this case?

}
}
// Now we have a range of commits to search through. We can use binary search to find the
// commit that is closest to the search timestamp.
Tuple2<Long, Long> greatestLowerBound =
InCommitTimestampUtils.greatestLowerBound(
searchTimestamp,
lowerBoundVersion,
upperBoundVersion,
version -> CommitInfo.getRequiredInCommitTimestampFromFile(engine, logPath, version));
return new Commit(greatestLowerBound._1, greatestLowerBound._2);
}

private static Commit getICTEnablementCommit(
SnapshotImpl snapshot, Commit placeholderEarliestCommit) {
Metadata metadata = snapshot.getMetadata();
if (!IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
// Pretend ICT will be enabled after the latest version and requested timestamp.
// This will force us to use the non-ICT search path.
return new Commit(snapshot.getVersion() + 1, Long.MAX_VALUE);
}
Optional<Long> enablementTimestampOpt =
IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.fromMetadata(metadata);
Optional<Long> enablementVersionOpt =
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.fromMetadata(metadata);
if (enablementTimestampOpt.isPresent() && enablementVersionOpt.isPresent()) {
return new Commit(enablementVersionOpt.get(), enablementTimestampOpt.get());
} else if (!enablementTimestampOpt.isPresent() && !enablementVersionOpt.isPresent()) {
// This means that ICT has been enabled for the entire history.
return placeholderEarliestCommit;
} else {
throw new IllegalStateException(
"Both enablement version and timestamp should be present or absent together.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC
throws TableNotFoundException {
SnapshotQueryContext snapshotContext =
SnapshotQueryContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
SnapshotImpl latestSnapshot = (SnapshotImpl) getLatestSnapshot(engine);
try {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
return snapshotManager.getSnapshotForTimestamp(
engine, latestSnapshot, millisSinceEpochUTC, snapshotContext);
} catch (Exception e) {
recordSnapshotErrorReport(engine, snapshotContext, e);
throw e;
Expand Down Expand Up @@ -260,8 +262,10 @@ protected Path getLogPath() {
* @throws TableNotFoundException if no delta table is found
*/
public long getVersionBeforeOrAtTimestamp(Engine engine, long millisSinceEpochUTC) {
SnapshotImpl latestSnapshot = (SnapshotImpl) getLatestSnapshot(engine);
return DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
latestSnapshot,
getLogPath(),
millisSinceEpochUTC,
false, /* mustBeRecreatable */
Expand Down Expand Up @@ -295,9 +299,11 @@ public long getVersionBeforeOrAtTimestamp(Engine engine, long millisSinceEpochUT
* @throws TableNotFoundException if no delta table is found
*/
public long getVersionAtOrAfterTimestamp(Engine engine, long millisSinceEpochUTC) {
SnapshotImpl latestSnapshot = (SnapshotImpl) getLatestSnapshot(engine);
DeltaHistoryManager.Commit commit =
DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
latestSnapshot,
getLogPath(),
millisSinceEpochUTC,
false, /* mustBeRecreatable */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,10 @@ public static Optional<CommitInfo> getCommitInfoOpt(Engine engine, Path logPath,
logger.info("No commit info found for commit of version {}", version);
return Optional.empty();
}

public static long getRequiredInCommitTimestampFromFile(
Engine engine, Path logPath, long version) {
return getRequiredInCommitTimestamp(
getCommitInfoOpt(engine, logPath, version), String.valueOf(version), logPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ public SnapshotImpl getSnapshotAt(
* @throws InvalidTableException if the table is in an invalid state
*/
public Snapshot getSnapshotForTimestamp(
Engine engine, long millisSinceEpochUTC, SnapshotQueryContext snapshotContext)
Engine engine,
SnapshotImpl latestSnapshot,
long millisSinceEpochUTC,
SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
long versionToRead =
snapshotContext
Expand All @@ -121,6 +124,7 @@ public Snapshot getSnapshotForTimestamp(
() ->
DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
latestSnapshot,
logPath,
millisSinceEpochUTC,
true /* mustBeRecreatable */,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.delta.kernel.utils.FileStatus;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

public final class FileNames {
Expand Down Expand Up @@ -111,15 +112,24 @@ public static DeltaLogFileType determineFileType(FileStatus file) {
* upgrade.
*/
public static long getFileVersion(Path path) {
Optional<Long> fileVersionOpt = getFileVersionOpt(path);
if (fileVersionOpt.isPresent()) {
return fileVersionOpt.get();
} else {
throw new IllegalArgumentException(
String.format("Unexpected file type found in transaction log: %s", path));
}
}

public static Optional<Long> getFileVersionOpt(Path path) {
if (isCheckpointFile(path.getName())) {
return checkpointVersion(path);
return Optional.of(checkpointVersion(path));
} else if (isCommitFile(path.getName())) {
return deltaVersion(path);
return Optional.of(deltaVersion(path));
} else if (isChecksumFile(path.getName())) {
return checksumVersion(path);
return Optional.of(checksumVersion(path));
} else {
throw new IllegalArgumentException(
String.format("Unexpected file type found in transaction log: %s", path));
return Optional.empty();
}
}

Expand Down
Loading
Loading