Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
package io.delta.kernel.internal;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.TableConfig.*;
import static io.delta.kernel.internal.fs.Path.getName;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.CommitInfo;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.InCommitTimestampUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
Expand Down Expand Up @@ -63,6 +67,7 @@ private DeltaHistoryManager() {}
*/
public static Commit getActiveCommitAtTimestamp(
Engine engine,
SnapshotImpl latestSnapshot,
Path logPath,
long timestamp,
boolean mustBeRecreatable,
Expand All @@ -75,32 +80,154 @@ public static Commit getActiveCommitAtTimestamp(
? getEarliestRecreatableCommit(engine, logPath)
: getEarliestDeltaFile(engine, logPath);

// Search for the commit
List<Commit> commits = getCommits(engine, logPath, earliestVersion);
Commit commit =
lastCommitBeforeOrAtTimestamp(commits, timestamp)
.orElse(commits.get(0)); // This is only returned if canReturnEarliestCommit (see below)
Commit placeholderEarliestCommit = new Commit(earliestVersion, -1L /* timestamp */);
Commit ictEnablementCommit = getICTEnablementCommit(latestSnapshot, placeholderEarliestCommit);
Commit searchResult;
if (ictEnablementCommit.getTimestamp() <= timestamp) {
// The target commit is in the ICT range.
long latestSnapshotTimestamp = latestSnapshot.getTimestamp(engine);
if (latestSnapshotTimestamp <= timestamp) {
// We just proved we should use the latest snapshot
// Note that if `latestSnapshotTimestamp` is less than `timestamp`, we only
// return this search result if `canReturnLastCommit` is true.
// If `canReturnLastCommit` is false, we still need this commit to
// throw the timestampAfterLatestCommit error.
searchResult = new Commit(latestSnapshot.getVersion(), latestSnapshotTimestamp);
} else {
// start ICT search over [earliest available ICT version, latestVersion)
boolean ictEnabledForEntireWindow = (ictEnablementCommit.version <= earliestVersion);
long searchWindowLowerBound =
ictEnabledForEntireWindow
? placeholderEarliestCommit.getVersion()
: ictEnablementCommit.getVersion();
try {
searchResult =
getActiveCommitAtTimeFromICTRange(
timestamp,
searchWindowLowerBound,
latestSnapshot.getVersion(),
engine,
latestSnapshot.getLogPath());
} catch (IOException e) {
throw new RuntimeException(
"There was an error while reading a historical commit while performing a timestamp-"
+ "based lookup. This usually happens when there is a parallel operation like "
+ "metadata cleanup that is deleting commits. Please retry the query.",
e);
}
}
} else {
// ICT was NOT enabled as-of the requested time
if (ictEnablementCommit.version <= earliestVersion) {
// We're searching for a non-ICT time but the non-ICT commits are all missing.
// If `canReturnEarliestCommit` is `false`, we need the details of the
// earliest commit to populate the timestampBeforeFirstAvailableCommit
// error correctly.
// Else, when `canReturnEarliestCommit` is `true`, the earliest commit
// is the desired result.
long ict =
CommitInfo.getRequiredInCommitTimestampFromFile(
engine, logPath, placeholderEarliestCommit.getVersion());
searchResult = new Commit(placeholderEarliestCommit.getVersion(), ict);
} else {
// start non-ICT linear search over [earliestVersion, )
List<Commit> commits = getCommits(engine, logPath, earliestVersion);
searchResult =
lastCommitBeforeOrAtTimestamp(commits, timestamp)
.orElse(
commits.get(0)); // This is only returned if canReturnEarliestCommit (see below)
}
}

// If timestamp is before the earliest commit
if (commit.timestamp > timestamp && !canReturnEarliestCommit) {
if (searchResult.timestamp > timestamp && !canReturnEarliestCommit) {
throw DeltaErrors.timestampBeforeFirstAvailableCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commits.get(0).timestamp,
commits.get(0).version);
searchResult.timestamp,
searchResult.version);
}
// If timestamp is after the last commit of the table
if (commit == commits.get(commits.size() - 1)
&& commit.timestamp < timestamp
if (searchResult.version == latestSnapshot.getVersion()
&& searchResult.timestamp < timestamp
&& !canReturnLastCommit) {
throw DeltaErrors.timestampAfterLatestCommit(
logPath.getParent().toString(), /* use dataPath */
timestamp,
commit.timestamp,
commit.version);
searchResult.timestamp,
searchResult.version);
}

return commit;
return searchResult;
}

/**
* Finds the commit with the latest in-commit timestamp that is less than or equal to the
* searchTimestamp. All commits from `startCommitVersionInclusive` till
* `endCommitVersionInclusive` must have ICT enabled. Also, this method assumes that we have
* already proven that `searchTimestamp` is in the given range.
*/
private static Commit getActiveCommitAtTimeFromICTRange(
long searchTimestamp,
long startCommitVersionInclusive,
long endCommitVersionInclusive,
Engine engine,
Path logPath)
throws IOException {
// Now we have a range of commits to search through. We can use binary search to find the
// commit that is closest to the search timestamp.
Optional<Tuple2<Long, Long>> greatestLowerBoundOpt =
InCommitTimestampUtils.greatestLowerBound(
searchTimestamp,
startCommitVersionInclusive,
endCommitVersionInclusive,
version -> CommitInfo.getRequiredInCommitTimestampFromFile(engine, logPath, version));
// This indicates that the search timestamp is less than the earliest commit.
if (!greatestLowerBoundOpt.isPresent()) {
long startIct =
CommitInfo.getRequiredInCommitTimestampFromFile(
engine, logPath, startCommitVersionInclusive);
return new Commit(startCommitVersionInclusive, startIct);
}
Tuple2<Long, Long> greatestLowerBound = greatestLowerBoundOpt.get();
return new Commit(greatestLowerBound._1, greatestLowerBound._2);
}

/**
* Gets the commit that enabled in-commit timestamps.
*
* @param snapshot The latest snapshot of the table. This is used to determine when in-commit
* timestamps were enabled.
* @param earliestCommit The earliest commit under consideration. If in-commit timestamps were
* enabled for the entire history, this function will return this commit.
* @return The commit that enabled in-commit timestamps. If the table does not have in-commit
* timestamps enabled, this will be the commit after the latest version. If in-commit
* timestamps were enabled for the entire history, this will be `earliestCommit`.
*/
private static Commit getICTEnablementCommit(SnapshotImpl snapshot, Commit earliestCommit) {
Metadata metadata = snapshot.getMetadata();
if (!IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata)) {
// Pretend ICT will be enabled after the latest version and requested timestamp.
// This will force us to use the non-ICT search path.
return new Commit(snapshot.getVersion() + 1, Long.MAX_VALUE);
}
Optional<Long> enablementTimestampOpt =
IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.fromMetadata(metadata);
Optional<Long> enablementVersionOpt =
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.fromMetadata(metadata);
if (enablementTimestampOpt.isPresent() && enablementVersionOpt.isPresent()) {
return new Commit(enablementVersionOpt.get(), enablementTimestampOpt.get());
} else if (!enablementTimestampOpt.isPresent() && !enablementVersionOpt.isPresent()) {
// This means that ICT has been enabled for the entire history.
return earliestCommit;
} else {
throw new IllegalStateException(
String.format(
"Both %s and %s should be present or absent together"
+ "when inCommitTimestamp is enabled.",
IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.getKey(),
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.getKey()));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC
throws TableNotFoundException {
SnapshotQueryContext snapshotContext =
SnapshotQueryContext.forTimestampSnapshot(tablePath, millisSinceEpochUTC);
SnapshotImpl latestSnapshot = (SnapshotImpl) getLatestSnapshot(engine);
try {
return snapshotManager.getSnapshotForTimestamp(engine, millisSinceEpochUTC, snapshotContext);
return snapshotManager.getSnapshotForTimestamp(
engine, latestSnapshot, millisSinceEpochUTC, snapshotContext);
} catch (Exception e) {
recordSnapshotErrorReport(engine, snapshotContext, e);
throw e;
Expand Down Expand Up @@ -260,8 +262,10 @@ protected Path getLogPath() {
* @throws TableNotFoundException if no delta table is found
*/
public long getVersionBeforeOrAtTimestamp(Engine engine, long millisSinceEpochUTC) {
SnapshotImpl latestSnapshot = (SnapshotImpl) getLatestSnapshot(engine);
return DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
latestSnapshot,
getLogPath(),
millisSinceEpochUTC,
false, /* mustBeRecreatable */
Expand Down Expand Up @@ -295,9 +299,11 @@ public long getVersionBeforeOrAtTimestamp(Engine engine, long millisSinceEpochUT
* @throws TableNotFoundException if no delta table is found
*/
public long getVersionAtOrAfterTimestamp(Engine engine, long millisSinceEpochUTC) {
SnapshotImpl latestSnapshot = (SnapshotImpl) getLatestSnapshot(engine);
DeltaHistoryManager.Commit commit =
DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
latestSnapshot,
getLogPath(),
millisSinceEpochUTC,
false, /* mustBeRecreatable */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,15 @@ public static Optional<CommitInfo> getCommitInfoOpt(Engine engine, Path logPath,
logger.info("No commit info found for commit of version {}", version);
return Optional.empty();
}

/**
* Returns the `inCommitTimestamp` of delta file at the requested version. Throws an exception if
* the delta file does not exist or does not have a commitInfo action or if the commitInfo action
* contains an empty `inCommitTimestamp`.
*/
public static long getRequiredInCommitTimestampFromFile(
Engine engine, Path logPath, long version) {
return getRequiredInCommitTimestamp(
getCommitInfoOpt(engine, logPath, version), String.valueOf(version), logPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ public SnapshotImpl getSnapshotAt(
* @throws InvalidTableException if the table is in an invalid state
*/
public Snapshot getSnapshotForTimestamp(
Engine engine, long millisSinceEpochUTC, SnapshotQueryContext snapshotContext)
Engine engine,
SnapshotImpl latestSnapshot,
long millisSinceEpochUTC,
SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
long versionToRead =
snapshotContext
Expand All @@ -121,6 +124,7 @@ public Snapshot getSnapshotForTimestamp(
() ->
DeltaHistoryManager.getActiveCommitAtTimestamp(
engine,
latestSnapshot,
logPath,
millisSinceEpochUTC,
true /* mustBeRecreatable */,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class InCommitTimestampUtils {

Expand Down Expand Up @@ -101,4 +102,52 @@ private static boolean didCurrentTransactionEnableICT(
&& IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(readSnapshot.getMetadata());
return isICTCurrentlyEnabled && !wasICTEnabledInReadSnapshot;
}

/**
* Finds the greatest lower bound of the target value in the range [lowerBoundInclusive,
* upperBoundInclusive] using binary search. The indexToValueMapper function is used to map the
* index to the corresponding value. Note that this function assumes that the values are sorted in
* ascending order.
*
* @param target The target value to find the greatest lower bound for.
* @param lowerBoundInclusive The lower bound of the search range (inclusive).
* @param upperBoundInclusive The upper bound of the search range (inclusive).
* @param indexToValueMapper A function that maps an index to its corresponding value.
* @return An optional which contains a tuple containing the index and the value of the greatest
* lower bound when found, or an empty optional if not found.
*/
public static Optional<Tuple2<Long, Long>> greatestLowerBound(
long target,
long lowerBoundInclusive,
long upperBoundInclusive,
Function<Long, Long> indexToValueMapper) {
if (lowerBoundInclusive > upperBoundInclusive) {
return Optional.empty();
}

long start = lowerBoundInclusive;
long end = upperBoundInclusive;
long resultIndex = -1;
long resultValue = 0;

while (start <= end) {
long mid = start + (end - start) / 2;
long midValue = indexToValueMapper.apply(mid);
if (midValue == target) {
return Optional.of(new Tuple2<>(mid, midValue));
} else if (midValue < target) {
resultIndex = mid;
resultValue = midValue;
start = mid + 1;
} else {
end = mid - 1;
}
}

if (resultIndex == -1) {
return Optional.empty();
} else {
return Optional.of(new Tuple2<>(resultIndex, resultValue));
}
}
}
Loading
Loading