diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java index 0dc13b484f5..cc7100fb31d 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/catalog/SparkTable.java @@ -18,8 +18,10 @@ import static io.delta.kernel.spark.utils.ScalaUtils.toScalaMap; import static java.util.Objects.requireNonNull; -import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.Snapshot; import io.delta.kernel.spark.read.SparkScanBuilder; +import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; +import io.delta.kernel.spark.snapshot.SnapshotManager; import io.delta.kernel.spark.utils.SchemaUtils; import java.util.*; import org.apache.hadoop.conf.Configuration; @@ -40,10 +42,9 @@ public class SparkTable implements Table, SupportsRead { EnumSet.of(TableCapability.BATCH_READ, TableCapability.MICRO_BATCH_READ)); private final Identifier identifier; - private final String tablePath; private final Map options; - // TODO: [delta-io/delta#5029] Add getProperties() in snapshot to avoid using Impl class. - private final SnapshotImpl snapshot; + private final SnapshotManager snapshotManager; + private final Snapshot snapshot; private final Configuration hadoopConf; private final StructType schema; @@ -54,12 +55,12 @@ public class SparkTable implements Table, SupportsRead { private final Transform[] partitionTransforms; /** - * Creates a SparkTable backed by a Delta Kernel snapshot and initializes Spark-facing metadata - * (schemas, partitioning, capabilities). + * Creates a SparkTable backed by a Delta Kernel snapshot manager and initializes Spark-facing + * metadata (schemas, partitioning, capabilities). * - *
Side effects: - Loads the latest snapshot for the given tablePath. - Builds Hadoop - * configuration from options for subsequent I/O. - Derives data schema, partition schema, and - * full table schema from the snapshot. + *
Side effects: - Initializes a SnapshotManager for the given tablePath. - Loads the latest + * snapshot via the manager. - Builds Hadoop configuration from options for subsequent I/O. - + * Derives data schema, partition schema, and full table schema from the snapshot. * *
Notes: - Partition column order from the snapshot is preserved for partitioning and appended * after data columns in the public Spark schema, per Spark conventions. - Read-time scan options @@ -72,14 +73,12 @@ public class SparkTable implements Table, SupportsRead { */ public SparkTable(Identifier identifier, String tablePath, Map options) { this.identifier = requireNonNull(identifier, "identifier is null"); - this.tablePath = requireNonNull(tablePath, "snapshot is null"); this.options = options; this.hadoopConf = SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options)); - this.snapshot = - (SnapshotImpl) - io.delta.kernel.TableManager.loadSnapshot(tablePath) - .build(io.delta.kernel.defaults.engine.DefaultEngine.create(hadoopConf)); + this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf); + // Load the initial snapshot through the manager + this.snapshot = snapshotManager.loadLatestSnapshot(); this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(snapshot.getSchema()); this.partColNames = @@ -154,7 +153,7 @@ public Transform[] partitioning() { @Override public Map properties() { - Map props = new HashMap<>(snapshot.getMetadata().getConfiguration()); + Map props = new HashMap<>(snapshot.getTableProperties()); props.putAll(this.options); return Collections.unmodifiableMap(props); } @@ -169,7 +168,7 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap scanOptions) { Map combined = new HashMap<>(this.options); combined.putAll(scanOptions.asCaseSensitiveMap()); CaseInsensitiveStringMap merged = new CaseInsensitiveStringMap(combined); - return new SparkScanBuilder(name(), tablePath, dataSchema, partitionSchema, snapshot, merged); + return new SparkScanBuilder(name(), snapshotManager, dataSchema, partitionSchema, merged); } @Override diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java index db0961d08e0..642d8ed5995 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkBatch.java @@ -66,7 +66,7 @@ public SparkBatch( scala.collection.immutable.Map scalaOptions, Configuration hadoopConf) { - this.tablePath = Objects.requireNonNull(tablePath, "tableName is null"); + this.tablePath = Objects.requireNonNull(tablePath, "tablePath is null"); this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null"); this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null"); this.readDataSchema = Objects.requireNonNull(readDataSchema, "readDataSchema is null"); @@ -127,6 +127,7 @@ public boolean equals(Object obj) { SparkBatch that = (SparkBatch) obj; return Objects.equals(this.tablePath, that.tablePath) && Objects.equals(this.readDataSchema, that.readDataSchema) + && Objects.equals(this.dataSchema, that.dataSchema) && Objects.equals(this.partitionSchema, that.partitionSchema) && Arrays.equals(this.pushedToKernelFilters, that.pushedToKernelFilters) && Arrays.equals(this.dataFilters, that.dataFilters) @@ -137,6 +138,7 @@ public boolean equals(Object obj) { public int hashCode() { int result = tablePath.hashCode(); result = 31 * result + readDataSchema.hashCode(); + result = 31 * result + dataSchema.hashCode(); result = 31 * result + partitionSchema.hashCode(); result = 31 * result + Arrays.hashCode(pushedToKernelFilters); result = 31 * result + Arrays.hashCode(dataFilters); diff --git 
a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java index a8b85db29d1..04d0e9ca887 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java @@ -15,7 +15,8 @@ */ package io.delta.kernel.spark.read; -import io.delta.kernel.*; +import io.delta.kernel.CommitRange; +import io.delta.kernel.Snapshot; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; @@ -23,6 +24,7 @@ import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.RemoveFile; import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.spark.snapshot.SnapshotManager; import io.delta.kernel.spark.utils.StreamingHelper; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; @@ -43,10 +45,10 @@ public class SparkMicroBatchStream implements MicroBatchStream { new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE))); private final Engine engine; - private final String tablePath; + private final SnapshotManager snapshotManager; - public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) { - this.tablePath = tablePath; + public SparkMicroBatchStream(SnapshotManager snapshotManager, Configuration hadoopConf) { + this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null"); this.engine = DefaultEngine.create(hadoopConf); } @@ -153,21 +155,11 @@ CloseableIterator getFileChanges( private CloseableIterator filterDeltaLogs( long startVersion, Option endOffset) { List allIndexedFiles = new ArrayList<>(); - // StartBoundary (inclusive) - CommitRangeBuilder builder = - TableManager.loadCommitRange(tablePath) - .withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion)); - if (endOffset.isDefined()) { - // EndBoundary (inclusive) - builder = - builder.withEndBoundary( - CommitRangeBuilder.CommitBoundary.atVersion(endOffset.get().reservoirVersion())); - } - CommitRange commitRange = builder.build(engine); + Optional endVersionOpt = + endOffset.isDefined() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty(); + CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt); // Required by kernel: perform protocol validation by creating a snapshot at startVersion. 
- // TODO(#5318): This is not working with ccv2 table - Snapshot startSnapshot = - TableManager.loadSnapshot(tablePath).atVersion(startVersion).build(engine); + Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion); try (CloseableIterator actionsIter = commitRange.getActions(engine, startSnapshot, ACTION_SET)) { // Each ColumnarBatch belongs to a single commit version, @@ -263,7 +255,8 @@ private void validateCommit( if (removeOpt.isPresent()) { RemoveFile removeFile = removeOpt.get(); Throwable error = - DeltaErrors.deltaSourceIgnoreDeleteError(version, removeFile.getPath(), tablePath); + DeltaErrors.deltaSourceIgnoreDeleteError( + version, removeFile.getPath(), snapshotManager.unsafeVolatileSnapshot().getPath()); if (error instanceof RuntimeException) { throw (RuntimeException) error; } else { diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java index 656d364e70d..3e27f28c654 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java @@ -17,12 +17,15 @@ import static io.delta.kernel.spark.utils.ExpressionUtils.dsv2PredicateToCatalystExpression; +import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.MapValue; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.data.ScanStateRow; +import io.delta.kernel.spark.snapshot.SnapshotManager; import io.delta.kernel.spark.utils.ScalaUtils; import io.delta.kernel.utils.CloseableIterator; import java.io.IOException; @@ -50,7 +53,7 @@ /** Spark DSV2 Scan implementation backed by Delta Kernel. */ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering { - private final String tablePath; + private final SnapshotManager snapshotManager; private final StructType readDataSchema; private final StructType dataSchema; private final StructType partitionSchema; @@ -69,7 +72,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntim private volatile boolean planned = false; public SparkScan( - String tablePath, + SnapshotManager snapshotManager, StructType dataSchema, StructType partitionSchema, StructType readDataSchema, @@ -78,9 +81,7 @@ public SparkScan( io.delta.kernel.Scan kernelScan, CaseInsensitiveStringMap options) { - final String normalizedTablePath = Objects.requireNonNull(tablePath, "tablePath is null"); - this.tablePath = - normalizedTablePath.endsWith("/") ? 
normalizedTablePath : normalizedTablePath + "/"; + this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null"); this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null"); this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null"); this.readDataSchema = Objects.requireNonNull(readDataSchema, "readDataSchema is null"); @@ -112,7 +113,7 @@ public StructType readSchema() { public Batch toBatch() { ensurePlanned(); return new SparkBatch( - tablePath, + getTablePath(), dataSchema, partitionSchema, readDataSchema, @@ -126,7 +127,7 @@ public Batch toBatch() { @Override public MicroBatchStream toMicroBatchStream(String checkpointLocation) { - return new SparkMicroBatchStream(tablePath, hadoopConf); + return new SparkMicroBatchStream(snapshotManager, hadoopConf); } @Override @@ -157,6 +158,18 @@ public OptionalLong numRows() { }; } + /** + * Get the table path from the scan state. + * + * @return the table path with trailing slash + */ + private String getTablePath() { + final Engine tableEngine = DefaultEngine.create(hadoopConf); + final Row scanState = kernelScan.getScanState(tableEngine); + final String tableRoot = ScanStateRow.getTableRoot(scanState).toUri().toString(); + return tableRoot.endsWith("/") ? tableRoot : tableRoot + "/"; + } + /** * Build the partition {@link InternalRow} from kernel partition values by casting them to the * desired Spark types using the session time zone for temporal types. @@ -202,15 +215,15 @@ private InternalRow getPartitionRow(MapValue partitionValues) { */ private void planScanFiles() { final Engine tableEngine = DefaultEngine.create(hadoopConf); - final Iterator scanFileBatches = - kernelScan.getScanFiles(tableEngine); + final String tablePath = getTablePath(); + final Iterator scanFileBatches = kernelScan.getScanFiles(tableEngine); final String[] locations = new String[0]; final scala.collection.immutable.Map otherConstantMetadataColumnValues = scala.collection.immutable.Map$.MODULE$.empty(); while (scanFileBatches.hasNext()) { - final io.delta.kernel.data.FilteredColumnarBatch batch = scanFileBatches.next(); + final FilteredColumnarBatch batch = scanFileBatches.next(); try (CloseableIterator addFileRowIter = batch.getRows()) { while (addFileRowIter.hasNext()) { diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java index 719c0f12259..fb6243b2545 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScanBuilder.java @@ -19,7 +19,7 @@ import io.delta.kernel.expressions.And; import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.spark.snapshot.SnapshotManager; import io.delta.kernel.spark.utils.ExpressionUtils; import java.util.*; import java.util.stream.Collectors; @@ -39,7 +39,7 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters { private io.delta.kernel.ScanBuilder kernelScanBuilder; - private final String tablePath; + private final SnapshotManager snapshotManager; private final StructType dataSchema; private final StructType partitionSchema; private final CaseInsensitiveStringMap options; @@ -55,13 +55,12 @@ public class SparkScanBuilder public SparkScanBuilder( String tableName, - String tablePath, + SnapshotManager snapshotManager, StructType 
dataSchema, StructType partitionSchema, - SnapshotImpl snapshot, CaseInsensitiveStringMap options) { - this.kernelScanBuilder = requireNonNull(snapshot, "snapshot is null").getScanBuilder(); - this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.kernelScanBuilder = snapshotManager.unsafeVolatileSnapshot().getScanBuilder(); + this.snapshotManager = requireNonNull(snapshotManager, "snapshotManager is null"); this.dataSchema = requireNonNull(dataSchema, "dataSchema is null"); this.partitionSchema = requireNonNull(partitionSchema, "partitionSchema is null"); this.options = requireNonNull(options, "options is null"); @@ -146,7 +145,7 @@ public Filter[] pushedFilters() { @Override public org.apache.spark.sql.connector.read.Scan build() { return new SparkScan( - tablePath, + snapshotManager, dataSchema, partitionSchema, requiredDataSchema, diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java new file mode 100644 index 00000000000..a4391e215bb --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManager.java @@ -0,0 +1,166 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.CommitRange; +import io.delta.kernel.CommitRangeBuilder; +import io.delta.kernel.Snapshot; +import io.delta.kernel.TableManager; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaHistoryManager; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.spark.exception.VersionNotFoundException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.annotation.Experimental; + +/** Implementation of SnapshotManager for managing Delta snapshots for Path-based Table. */ +@Experimental +public class PathBasedSnapshotManager implements SnapshotManager { + + private final String tablePath; + private final AtomicReference snapshotAtomicReference; + private final Engine kernelEngine; + + public PathBasedSnapshotManager(String tablePath, Configuration hadoopConf) { + this.tablePath = requireNonNull(tablePath, "tablePath is null"); + this.snapshotAtomicReference = new AtomicReference<>(); + this.kernelEngine = DefaultEngine.create(requireNonNull(hadoopConf, "hadoopConf is null")); + } + + /** + * Returns the cached snapshot without guaranteeing its freshness. + * + *
This method uses atomic operations to ensure thread safety when initializing the cached + * snapshot. If multiple threads call this method concurrently when the cache is empty, only one + * will load the snapshot and all others will use that loaded snapshot. + * + * @return the cached snapshot, or a newly loaded snapshot if none exists + */ + @Override + public Snapshot unsafeVolatileSnapshot() { + return snapshotAtomicReference.updateAndGet( + current -> + current != null ? current : TableManager.loadSnapshot(tablePath).build(kernelEngine)); + } + + /** + * Loads and caches the latest snapshot of the Delta table. + * + * @return the newly loaded snapshot + */ + @Override + public Snapshot loadLatestSnapshot() { + Snapshot snapshot = TableManager.loadSnapshot(tablePath).build(kernelEngine); + snapshotAtomicReference.set(snapshot); + return snapshot; + } + + /** + * Loads a specific version of the Delta table. + * + * @param version the version to load + * @return the snapshot at the specified version + */ + @Override + public Snapshot loadSnapshotAt(long version) { + return TableManager.loadSnapshot(tablePath).atVersion(version).build(kernelEngine); + } + + /** + * Finds the active commit at a specific timestamp. + * + *
This method searches the Delta table's commit history to find the commit that was active at + * the specified timestamp. + * + * @param timestamp the timestamp to query + * @param canReturnLastCommit if true, returns the last commit if the timestamp is after all + * commits + * @param mustBeRecreatable if true, only considers commits that can be recreated (i.e., all + * necessary log files are available) + * @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before + * all commits + * @return the commit that was active at the specified timestamp + */ + @Override + public DeltaHistoryManager.Commit getActiveCommitAtTime( + Timestamp timestamp, + Boolean canReturnLastCommit, + Boolean mustBeRecreatable, + Boolean canReturnEarliestCommit) { + SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); + return DeltaHistoryManager.getActiveCommitAtTimestamp( + kernelEngine, + snapshot, + snapshot.getLogPath(), + timestamp.getTime(), + mustBeRecreatable, + canReturnLastCommit, + canReturnEarliestCommit, + new ArrayList<>()); + } + + /** + * Checks if a specific version of the Delta table exists and is accessible. + * + * @param version the version to check + * @param mustBeRecreatable if true, requires that the version can be fully recreated from + * available log files + * @param allowOutOfRange if true, allows versions greater than the latest version without + * throwing an exception + * @throws VersionNotFoundException if the version is not available + */ + @Override + public void checkVersionExists(Long version, Boolean mustBeRecreatable, Boolean allowOutOfRange) + throws VersionNotFoundException { + SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); + long earliest = + mustBeRecreatable + ? DeltaHistoryManager.getEarliestRecreatableCommit( + kernelEngine, + snapshot.getLogPath(), + Optional.empty() /*earliestRatifiedCommitVersion*/) + : DeltaHistoryManager.getEarliestDeltaFile( + kernelEngine, + snapshot.getLogPath(), + Optional.empty() /*earliestRatifiedCommitVersion*/); + + long latest = snapshot.getVersion(); + if (version < earliest || ((version > latest) && !allowOutOfRange)) { + throw new VersionNotFoundException(version, earliest, latest); + } + } + + @Override + public CommitRange getTableChanges(Engine engine, long startVersion, Optional endVersion) { + CommitRangeBuilder builder = + TableManager.loadCommitRange(tablePath) + .withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion)); + + if (endVersion.isPresent()) { + builder = + builder.withEndBoundary(CommitRangeBuilder.CommitBoundary.atVersion(endVersion.get())); + } + + return builder.build(engine); + } +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManager.java b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManager.java new file mode 100644 index 00000000000..a0dda58f177 --- /dev/null +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/SnapshotManager.java @@ -0,0 +1,128 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import io.delta.kernel.CommitRange; +import io.delta.kernel.Snapshot; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.DeltaHistoryManager; +import io.delta.kernel.spark.exception.VersionNotFoundException; +import java.sql.Timestamp; +import java.util.Optional; +import org.apache.spark.annotation.Experimental; + +/** + * Interface for managing Delta table snapshots. + * + *
This interface provides methods for loading, caching, and querying Delta table snapshots. It + * supports both current snapshot access and historical snapshot queries based on version or + * timestamp. + * + *
Implementations of this interface are responsible for managing snapshot lifecycle, including + * loading snapshots from storage and maintaining any necessary caching. + */ +@Experimental +public interface SnapshotManager { + + /** + * Returns a cached snapshot without guaranteeing its freshness. + * + *
Expected Behavior: + * + *
+ * + * @return the cached snapshot, or a newly loaded latest snapshot if no cached snapshot exists + */ + Snapshot unsafeVolatileSnapshot(); + + /** + * Loads and returns the latest snapshot of the Delta table. + * + * @return the latest snapshot of the Delta table + */ + Snapshot loadLatestSnapshot(); + + /** + * Loads and returns a snapshot at a specific version of the Delta table. + * + * @param version the version number to load (must be >= 0) + * @return the snapshot at the specified version + * @throws io.delta.kernel.exceptions.KernelException if the version cannot be loaded + */ + Snapshot loadSnapshotAt(long version); + + /** + * Finds and returns the commit that was active at a specific timestamp. + * + * @param timestamp the timestamp to query + * @param canReturnLastCommit if true, returns the last commit if the timestamp is after all + * commits; if false, throws an exception + * @param mustBeRecreatable if true, only considers commits that can be fully recreated from + * available log files; if false, considers all commits + * @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before + * all commits; if false, throws an exception + * @return the commit that was active at the specified timestamp + * @throws io.delta.kernel.exceptions.KernelException if no suitable commit is found based on the + * provided flags + */ + DeltaHistoryManager.Commit getActiveCommitAtTime( + Timestamp timestamp, + Boolean canReturnLastCommit, + Boolean mustBeRecreatable, + Boolean canReturnEarliestCommit); + + /** + * Checks if a specific version of the Delta table exists and is accessible. + * + * @param version the version to check + * @param mustBeRecreatable if true, requires that the version can be fully recreated from + * available log files; if false, only requires that the version's log file exists + * @param allowOutOfRange if true, allows versions greater than the latest version without + * throwing an exception; if false, throws exception for out-of-range versions + * @throws VersionNotFoundException if the version is not available or does not meet the specified + * criteria + */ + void checkVersionExists(Long version, Boolean mustBeRecreatable, Boolean allowOutOfRange) + throws VersionNotFoundException; + + /** + * Gets a range of table changes (commits) between start and end versions. + * + *
Expected Behavior: + * + *
+ * + *
Use Case: Use this method for streaming queries, incremental processing, or CDC + * scenarios where you need to process changes between versions. + * + * @param engine the engine implementation for executing operations + * @param startVersion the starting version (inclusive) + * @param endVersion optional ending version (inclusive); if not provided, extends to latest + * @return a CommitRange representing the specified range of commits + */ + CommitRange getTableChanges(Engine engine, long startVersion, Optional endVersion); +} diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java index 47fe3da77b2..2864e66d974 100644 --- a/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java +++ b/kernel-spark/src/main/java/io/delta/kernel/spark/utils/StreamingHelper.java @@ -18,137 +18,24 @@ import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static io.delta.kernel.internal.util.Preconditions.checkState; -import io.delta.kernel.Snapshot; -import io.delta.kernel.TableManager; import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; -import io.delta.kernel.defaults.engine.DefaultEngine; -import io.delta.kernel.engine.Engine; -import io.delta.kernel.internal.DeltaHistoryManager; -import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.RemoveFile; import io.delta.kernel.internal.data.StructRow; -import io.delta.kernel.spark.exception.VersionNotFoundException; -import java.sql.Timestamp; -import java.util.ArrayList; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; import org.apache.spark.annotation.Experimental; /** - * Helper class for managing Delta table snapshots in streaming scenarios. + * Helper class providing utilities for working with Delta table data in streaming scenarios. * - *
This class provides utilities to load, update, and query Delta table snapshots using the Delta - * Kernel API. It maintains a cached snapshot that can be accessed and updated as needed. + *
This class provides static utility methods for extracting information from Delta table + * batches, such as version numbers and data change actions. */ @Experimental public class StreamingHelper { - private final String tablePath; - private final AtomicReference snapshotAtomicReference; - private final Engine kernelEngine; - - /** - * Constructs a new StreamingHelper for the specified Delta table. - * - * @param tablePath the path to the Delta table - * @param hadoopConf the Hadoop configuration to use for file system access - */ - public StreamingHelper(String tablePath, Configuration hadoopConf) { - this.tablePath = tablePath; - this.snapshotAtomicReference = new AtomicReference<>(); - this.kernelEngine = DefaultEngine.create(hadoopConf); - } - - /** - * Returns the cached snapshot without guaranteeing its freshness. - * - * @return the cached snapshot, or a newly loaded snapshot if none exists - */ - public Snapshot unsafeVolatileSnapshot() { - Snapshot unsafeVolatileSnapshot = snapshotAtomicReference.get(); - if (unsafeVolatileSnapshot == null) { - return loadLatestSnapshot(); - } - return unsafeVolatileSnapshot; - } - - /** - * Loads and caches the latest snapshot of the Delta table. - * - * @return the newly loaded snapshot - */ - public Snapshot loadLatestSnapshot() { - Snapshot snapshot = TableManager.loadSnapshot(tablePath).build(kernelEngine); - snapshotAtomicReference.set(snapshot); - return snapshot; - } - - /** - * Finds the active commit at a specific timestamp. - * - *
This method searches the Delta table's commit history to find the commit that was active at - * the specified timestamp. - * - * @param timeStamp the timestamp to query - * @param canReturnLastCommit if true, returns the last commit if the timestamp is after all - * commits - * @param mustBeRecreatable if true, only considers commits that can be recreated (i.e., all - * necessary log files are available) - * @param canReturnEarliestCommit if true, returns the earliest commit if the timestamp is before - * all commits - * @return the commit that was active at the specified timestamp - */ - public DeltaHistoryManager.Commit getActiveCommitAtTime( - Timestamp timeStamp, - Boolean canReturnLastCommit, - Boolean mustBeRecreatable, - Boolean canReturnEarliestCommit) { - SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); - return DeltaHistoryManager.getActiveCommitAtTimestamp( - kernelEngine, - snapshot, - snapshot.getLogPath(), - timeStamp.getTime(), - mustBeRecreatable, - canReturnLastCommit, - canReturnEarliestCommit, - new ArrayList<>()); - } - - /** - * Checks if a specific version of the Delta table exists and is accessible. - * - * @param version the version to check - * @param mustBeRecreatable if true, requires that the version can be fully recreated from - * available log files - * @param allowOutOfRange if true, allows versions greater than the latest version without - * throwing an exception - * @throws VersionNotFoundException if the version is not available - */ - public void checkVersionExists(Long version, Boolean mustBeRecreatable, Boolean allowOutOfRange) - throws VersionNotFoundException { - SnapshotImpl snapshot = (SnapshotImpl) loadLatestSnapshot(); - long earliest = - mustBeRecreatable - ? DeltaHistoryManager.getEarliestRecreatableCommit( - kernelEngine, - snapshot.getLogPath(), - Optional.empty() /*earliestRatifiedCommitVersion*/) - : DeltaHistoryManager.getEarliestDeltaFile( - kernelEngine, - snapshot.getLogPath(), - Optional.empty() /*earliestRatifiedCommitVersion*/); - - long latest = snapshot.getVersion(); - if (version < earliest || ((version > latest) && !allowOutOfRange)) { - throw new VersionNotFoundException(version, earliest, latest); - } - } - /** * Returns the index of the field with the given name in the schema of the batch. Throws an {@link * IllegalArgumentException} if the field is not found. @@ -203,4 +90,7 @@ public static Optional getDataChangeRemove(ColumnarBatch batch, int RemoveFile removeFile = new RemoveFile(removeFileRow); return removeFile.getDataChange() ? Optional.of(removeFile) : Optional.empty(); } + + /** Private constructor to prevent instantiation of this utility class. 
*/ + private StreamingHelper() {} } diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java index 6cc21158141..b364782ba2c 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import io.delta.kernel.spark.SparkDsv2TestBase; +import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; import io.delta.kernel.utils.CloseableIterator; import java.io.File; import java.util.ArrayList; @@ -26,7 +27,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Stream; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.delta.DeltaLog; @@ -47,8 +47,12 @@ public class SparkMicroBatchStreamTest extends SparkDsv2TestBase { private SparkMicroBatchStream microBatchStream; @BeforeEach - void setUp() { - microBatchStream = new SparkMicroBatchStream(null, new Configuration()); + void setUp(@TempDir File tempDir) { + String testPath = tempDir.getAbsolutePath(); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(testPath, spark.sessionState().newHadoopConf()); + microBatchStream = + new SparkMicroBatchStream(snapshotManager, spark.sessionState().newHadoopConf()); } @Test @@ -147,7 +151,10 @@ public void testGetFileChanges( } sql("INSERT INTO %s VALUES %s", testTableName, insertValues.toString()); } - SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration()); + SparkMicroBatchStream stream = + new SparkMicroBatchStream( + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()), + spark.sessionState().newHadoopConf()); // dsv1 DeltaSource DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); @@ -325,7 +332,10 @@ public void testGetFileChanges_EmptyVersions( deltaChanges.close(); // Test DSv2 SparkMicroBatchStream - SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration()); + SparkMicroBatchStream stream = + new SparkMicroBatchStream( + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()), + spark.sessionState().newHadoopConf()); try (CloseableIterator kernelChanges = stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffset)) { List kernelFilesList = new ArrayList<>(); @@ -414,7 +424,10 @@ public void testGetFileChanges_OnRemoveFile_throwError( String.format("DSv1 should throw on REMOVE for scenario: %s", testDescription)); // Test DSv2 SparkMicroBatchStream - SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration()); + SparkMicroBatchStream stream = + new SparkMicroBatchStream( + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()), + spark.sessionState().newHadoopConf()); UnsupportedOperationException dsv2Exception = assertThrows( UnsupportedOperationException.class, diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkScanBuilderTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkScanBuilderTest.java index cd32ea01b87..1a38b10e350 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkScanBuilderTest.java +++ 
b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkScanBuilderTest.java @@ -20,12 +20,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import io.delta.kernel.Snapshot; -import io.delta.kernel.TableManager; import io.delta.kernel.expressions.Column; import io.delta.kernel.expressions.Literal; import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.spark.SparkDsv2TestBase; +import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; import java.io.File; import java.lang.reflect.Field; import java.util.Arrays; @@ -51,7 +50,9 @@ public void testBuild_returnsScanWithExpectedSchema(@TempDir File tempDir) { String.format( "CREATE TABLE %s (id INT, name STRING, dep_id INT) USING delta PARTITIONED BY (dep_id) LOCATION '%s'", tableName, path)); - Snapshot snapshot = TableManager.loadSnapshot(path).build(defaultEngine); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf()); + Snapshot snapshot = snapshotManager.loadLatestSnapshot(); StructType dataSchema = DataTypes.createStructType( new StructField[] { @@ -65,10 +66,9 @@ public void testBuild_returnsScanWithExpectedSchema(@TempDir File tempDir) { SparkScanBuilder builder = new SparkScanBuilder( tableName, - path, + snapshotManager, dataSchema, partitionSchema, - (SnapshotImpl) snapshot, CaseInsensitiveStringMap.empty()); StructType expectedSparkSchema = @@ -93,7 +93,9 @@ public void testToMicroBatchStream_returnsSparkMicroBatchStream(@TempDir File te String.format( "CREATE TABLE %s (id INT, name STRING, dep_id INT) USING delta PARTITIONED BY (dep_id) LOCATION '%s'", tableName, path)); - Snapshot snapshot = TableManager.loadSnapshot(path).build(defaultEngine); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf()); + Snapshot snapshot = snapshotManager.loadLatestSnapshot(); StructType dataSchema = DataTypes.createStructType( new StructField[] { @@ -107,10 +109,9 @@ public void testToMicroBatchStream_returnsSparkMicroBatchStream(@TempDir File te SparkScanBuilder builder = new SparkScanBuilder( tableName, - path, + snapshotManager, dataSchema, partitionSchema, - (SnapshotImpl) snapshot, CaseInsensitiveStringMap.empty()); Scan scan = builder.build(); @@ -669,7 +670,9 @@ private SparkScanBuilder createTestScanBuilder(File tempDir) { String.format( "CREATE OR REPLACE TABLE %s (id INT, name STRING, dep_id INT) USING delta PARTITIONED BY (dep_id) LOCATION '%s'", tableName, path)); - Snapshot snapshot = TableManager.loadSnapshot(path).build(defaultEngine); + PathBasedSnapshotManager snapshotManager = + new PathBasedSnapshotManager(path, spark.sessionState().newHadoopConf()); + Snapshot snapshot = snapshotManager.loadLatestSnapshot(); StructType dataSchema = DataTypes.createStructType( new StructField[] { @@ -681,12 +684,7 @@ private SparkScanBuilder createTestScanBuilder(File tempDir) { DataTypes.createStructType( new StructField[] {DataTypes.createStructField("dep_id", DataTypes.IntegerType, true)}); return new SparkScanBuilder( - tableName, - path, - dataSchema, - partitionSchema, - (SnapshotImpl) snapshot, - CaseInsensitiveStringMap.empty()); + tableName, snapshotManager, dataSchema, partitionSchema, CaseInsensitiveStringMap.empty()); } private Predicate[] getPushedKernelPredicates(SparkScanBuilder builder) throws Exception { diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManagerTest.java 
b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManagerTest.java new file mode 100644 index 00000000000..a81242a2322 --- /dev/null +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/PathBasedSnapshotManagerTest.java @@ -0,0 +1,403 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.spark.snapshot; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.internal.DeltaHistoryManager; +import io.delta.kernel.spark.SparkDsv2TestBase; +import io.delta.kernel.spark.exception.VersionNotFoundException; +import java.io.File; +import java.sql.Timestamp; +import java.util.stream.Stream; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.delta.DeltaLog; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import scala.Option; + +public class PathBasedSnapshotManagerTest extends SparkDsv2TestBase { + + private PathBasedSnapshotManager snapshotManager; + + @Test + public void testUnsafeVolatileSnapshot(@TempDir File tempDir) { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_volatile_snapshot"; + createEmptyTestTable(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + org.apache.spark.sql.delta.Snapshot deltaSnapshot = deltaLog.unsafeVolatileSnapshot(); + Snapshot kernelSnapshot = snapshotManager.unsafeVolatileSnapshot(); + + spark.sql(String.format("INSERT INTO %s VALUES (4, 'David')", testTableName)); + + assertEquals(0L, deltaSnapshot.version()); + assertEquals(deltaSnapshot.version(), kernelSnapshot.getVersion()); + } + + @Test + public void testLoadLatestSnapshot(@TempDir File tempDir) { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_update"; + createEmptyTestTable(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + + Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); + assertEquals(0L, initialSnapshot.getVersion()); + + spark.sql(String.format("INSERT INTO %s VALUES (4, 'David')", testTableName)); + + org.apache.spark.sql.delta.Snapshot deltaSnapshot = + deltaLog.update(false, Option.empty(), Option.empty()); + Snapshot updatedSnapshot = snapshotManager.loadLatestSnapshot(); + org.apache.spark.sql.delta.Snapshot cachedSnapshot 
= deltaLog.unsafeVolatileSnapshot(); + Snapshot kernelcachedSnapshot = snapshotManager.unsafeVolatileSnapshot(); + + assertEquals(1L, updatedSnapshot.getVersion()); + assertEquals(deltaSnapshot.version(), updatedSnapshot.getVersion()); + assertEquals(1L, kernelcachedSnapshot.getVersion()); + assertEquals(cachedSnapshot.version(), kernelcachedSnapshot.getVersion()); + } + + @Test + public void testMultipleLoadLatestSnapshot(@TempDir File tempDir) { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_multiple_updates"; + createEmptyTestTable(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + + assertEquals(0L, snapshotManager.loadLatestSnapshot().getVersion()); + + for (int i = 0; i < 3; i++) { + spark.sql( + String.format("INSERT INTO %s VALUES (%d, 'User%d')", testTableName, 20 + i, 20 + i)); + + org.apache.spark.sql.delta.Snapshot deltaSnapshot = + deltaLog.update(false, Option.empty(), Option.empty()); + Snapshot kernelSnapshot = snapshotManager.loadLatestSnapshot(); + + long expectedVersion = i + 1; + assertEquals(expectedVersion, deltaSnapshot.version()); + assertEquals(expectedVersion, kernelSnapshot.getVersion()); + } + } + + @Test + public void testLoadSnapshotAt(@TempDir File tempDir) { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_load_at_version"; + createEmptyTestTable(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + // Create multiple versions + for (int i = 0; i < 3; i++) { + spark.sql( + String.format("INSERT INTO %s VALUES (%d, 'User%d')", testTableName, 10 + i, 10 + i)); + } + + // Load specific versions + Snapshot snapshot0 = snapshotManager.loadSnapshotAt(0L); + assertEquals(0L, snapshot0.getVersion()); + + Snapshot snapshot1 = snapshotManager.loadSnapshotAt(1L); + assertEquals(1L, snapshot1.getVersion()); + + Snapshot snapshot2 = snapshotManager.loadSnapshotAt(2L); + assertEquals(2L, snapshot2.getVersion()); + + Snapshot snapshot3 = snapshotManager.loadSnapshotAt(3L); + assertEquals(3L, snapshot3.getVersion()); + + // Note: loadSnapshotAt does not update the cached snapshot + } + + private void setupTableWithDeletedVersions(String testTablePath, String testTableName) { + createEmptyTestTable(testTablePath, testTableName); + for (int i = 0; i < 10; i++) { + spark.sql( + String.format("INSERT INTO %s VALUES (%d, 'User%d')", testTableName, 100 + i, 100 + i)); + } + File deltaLogDir = new File(testTablePath, "_delta_log"); + File version0File = new File(deltaLogDir, "00000000000000000000.json"); + File version1File = new File(deltaLogDir, "00000000000000000001.json"); + assertTrue(version0File.exists()); + assertTrue(version1File.exists()); + version0File.delete(); + version1File.delete(); + assertFalse(version0File.exists()); + assertFalse(version1File.exists()); + } + + @Test + public void testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_commit_past"; + setupTableWithDeletedVersions(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + Thread.sleep(100); + Timestamp timestamp = new Timestamp(System.currentTimeMillis()); + spark.sql(String.format("INSERT INTO 
%s VALUES (200, 'NewUser')", testTableName)); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + org.apache.spark.sql.delta.DeltaHistoryManager.Commit deltaCommit = + deltaLog + .history() + .getActiveCommitAtTime( + timestamp, + Option.empty() /* catalogTable */, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */); + + DeltaHistoryManager.Commit kernelCommit = + snapshotManager.getActiveCommitAtTime( + timestamp, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */); + + assertEquals(deltaCommit.version(), kernelCommit.getVersion()); + assertEquals(deltaCommit.timestamp(), kernelCommit.getTimestamp()); + } + + @Test + public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_commit_future_last"; + setupTableWithDeletedVersions(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + Timestamp futureTimestamp = new Timestamp(System.currentTimeMillis() + 10000); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + org.apache.spark.sql.delta.DeltaHistoryManager.Commit deltaCommit = + deltaLog + .history() + .getActiveCommitAtTime( + futureTimestamp, + Option.empty() /* catalogTable */, + true /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */); + + DeltaHistoryManager.Commit kernelCommit = + snapshotManager.getActiveCommitAtTime( + futureTimestamp, + true /* canReturnLastCommit */, + true /* mustBeRecreatable */, + false /* canReturnEarliestCommit */); + + assertEquals(deltaCommit.version(), kernelCommit.getVersion()); + assertEquals(deltaCommit.timestamp(), kernelCommit.getTimestamp()); + } + + @Test + public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_commit_future_not_recreatable"; + setupTableWithDeletedVersions(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + Timestamp futureTimestamp = new Timestamp(System.currentTimeMillis() + 10000); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + org.apache.spark.sql.delta.DeltaHistoryManager.Commit deltaCommit = + deltaLog + .history() + .getActiveCommitAtTime( + futureTimestamp, + Option.empty() /* catalogTable */, + true /* canReturnLastCommit */, + false /* mustBeRecreatable */, + false /* canReturnEarliestCommit */); + + DeltaHistoryManager.Commit kernelCommit = + snapshotManager.getActiveCommitAtTime( + futureTimestamp, + true /* canReturnLastCommit */, + false /* mustBeRecreatable */, + false /* canReturnEarliestCommit */); + + assertEquals(deltaCommit.version(), kernelCommit.getVersion()); + assertEquals(deltaCommit.timestamp(), kernelCommit.getTimestamp()); + } + + @Test + public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_commit_early"; + setupTableWithDeletedVersions(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + Timestamp 
earlyTimestamp = new Timestamp(0); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + org.apache.spark.sql.delta.DeltaHistoryManager.Commit deltaCommit = + deltaLog + .history() + .getActiveCommitAtTime( + earlyTimestamp, + Option.empty() /* catalogTable */, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + true /* canReturnEarliestCommit */); + + DeltaHistoryManager.Commit kernelCommit = + snapshotManager.getActiveCommitAtTime( + earlyTimestamp, + false /* canReturnLastCommit */, + true /* mustBeRecreatable */, + true /* canReturnEarliestCommit */); + + assertEquals(deltaCommit.version(), kernelCommit.getVersion()); + assertEquals(deltaCommit.timestamp(), kernelCommit.getTimestamp()); + } + + @Test + public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canReturnEarliest( + @TempDir File tempDir) throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_commit_early_not_recreatable"; + setupTableWithDeletedVersions(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + + Timestamp earlyTimestamp = new Timestamp(0); + + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + org.apache.spark.sql.delta.DeltaHistoryManager.Commit deltaCommit = + deltaLog + .history() + .getActiveCommitAtTime( + earlyTimestamp, + Option.empty() /* catalogTable */, + false /* canReturnLastCommit */, + false /* mustBeRecreatable */, + true /* canReturnEarliestCommit */); + + DeltaHistoryManager.Commit kernelCommit = + snapshotManager.getActiveCommitAtTime( + earlyTimestamp, + false /* canReturnLastCommit */, + false /* mustBeRecreatable */, + true /* canReturnEarliestCommit */); + + assertEquals(deltaCommit.version(), kernelCommit.getVersion()); + assertEquals(deltaCommit.timestamp(), kernelCommit.getTimestamp()); + } + + private static Stream checkVersionExistsTestCases() { + return Stream.of( + Arguments.of( + "current", + 10L /* versionToCheck */, + true /* mustBeRecreatable */, + false /* allowOutOfRange */, + false /* shouldThrow */), + Arguments.of( + "notAllowOutOfRange", + 21L /* versionToCheck */, + true /* mustBeRecreatable */, + false /* allowOutOfRange */, + true /* shouldThrow */), + Arguments.of( + "allowOutOfRange", + 21L /* versionToCheck */, + true /* mustBeRecreatable */, + true /* allowOutOfRange */, + false /* shouldThrow */), + Arguments.of( + "belowEarliest", + 1L /* versionToCheck */, + true /* mustBeRecreatable */, + false /* allowOutOfRange */, + true /* shouldThrow */), + Arguments.of( + "mustBeRecreatable_false", + 2L /* versionToCheck */, + false /* mustBeRecreatable */, + false /* allowOutOfRange */, + false /* shouldThrow */), + Arguments.of( + "mustBeRecreatable_true", + 2L /* versionToCheck */, + true /* mustBeRecreatable */, + false /* allowOutOfRange */, + true /* shouldThrow */)); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("checkVersionExistsTestCases") + public void testCheckVersionExists( + String testName, + long versionToCheck, + boolean mustBeRecreatable, + boolean allowOutOfRange, + boolean shouldThrow, + @TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_version_" + testName; + setupTableWithDeletedVersions(testTablePath, testTableName); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); + DeltaLog deltaLog = 
DeltaLog.forTable(spark, new Path(testTablePath)); + + if (shouldThrow) { + assertThrows( + VersionNotFoundException.class, + () -> + snapshotManager.checkVersionExists( + versionToCheck, mustBeRecreatable, allowOutOfRange)); + + assertThrows( + org.apache.spark.sql.delta.VersionNotFoundException.class, + () -> + deltaLog + .history() + .checkVersionExists( + versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange)); + } else { + snapshotManager.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange); + deltaLog + .history() + .checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange); + } + } +} diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java index d1991ee1edb..4ac1531752b 100644 --- a/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java +++ b/kernel-spark/src/test/java/io/delta/kernel/spark/utils/StreamingHelperTest.java @@ -24,6 +24,7 @@ import io.delta.kernel.internal.DeltaHistoryManager; import io.delta.kernel.spark.SparkDsv2TestBase; import io.delta.kernel.spark.exception.VersionNotFoundException; +import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager; import java.io.File; import java.sql.Timestamp; import java.util.stream.Stream; @@ -38,17 +39,18 @@ public class StreamingHelperTest extends SparkDsv2TestBase { - private StreamingHelper streamingHelper; + private PathBasedSnapshotManager snapshotManager; @Test public void testUnsafeVolatileSnapshot(@TempDir File tempDir) { String testTablePath = tempDir.getAbsolutePath(); String testTableName = "test_volatile_snapshot"; createEmptyTestTable(testTablePath, testTableName); - streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf()); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); org.apache.spark.sql.delta.Snapshot deltaSnapshot = deltaLog.unsafeVolatileSnapshot(); - Snapshot kernelSnapshot = streamingHelper.unsafeVolatileSnapshot(); + Snapshot kernelSnapshot = snapshotManager.unsafeVolatileSnapshot(); spark.sql(String.format("INSERT INTO %s VALUES (4, 'David')", testTableName)); @@ -61,19 +63,20 @@ public void testLoadLatestSnapshot(@TempDir File tempDir) { String testTablePath = tempDir.getAbsolutePath(); String testTableName = "test_update"; createEmptyTestTable(testTablePath, testTableName); - streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf()); + snapshotManager = + new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf()); DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); - Snapshot initialSnapshot = streamingHelper.loadLatestSnapshot(); + Snapshot initialSnapshot = snapshotManager.loadLatestSnapshot(); assertEquals(0L, initialSnapshot.getVersion()); spark.sql(String.format("INSERT INTO %s VALUES (4, 'David')", testTableName)); org.apache.spark.sql.delta.Snapshot deltaSnapshot = deltaLog.update(false, Option.empty(), Option.empty()); - Snapshot updatedSnapshot = streamingHelper.loadLatestSnapshot(); + Snapshot updatedSnapshot = snapshotManager.loadLatestSnapshot(); org.apache.spark.sql.delta.Snapshot cachedSnapshot = deltaLog.unsafeVolatileSnapshot(); - Snapshot kernelcachedSnapshot = streamingHelper.unsafeVolatileSnapshot(); + Snapshot kernelcachedSnapshot = 
 
     assertEquals(1L, updatedSnapshot.getVersion());
     assertEquals(deltaSnapshot.version(), updatedSnapshot.getVersion());
@@ -86,11 +89,12 @@ public void testMultipleLoadLatestSnapshot(@TempDir File tempDir) {
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_multiple_updates";
     createEmptyTestTable(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
     DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
 
-    assertEquals(0L, streamingHelper.loadLatestSnapshot().getVersion());
+    assertEquals(0L, snapshotManager.loadLatestSnapshot().getVersion());
 
     for (int i = 0; i < 3; i++) {
       spark.sql(
@@ -98,7 +102,7 @@ public void testMultipleLoadLatestSnapshot(@TempDir File tempDir) {
 
       org.apache.spark.sql.delta.Snapshot deltaSnapshot =
           deltaLog.update(false, Option.empty(), Option.empty());
-      Snapshot kernelSnapshot = streamingHelper.loadLatestSnapshot();
+      Snapshot kernelSnapshot = snapshotManager.loadLatestSnapshot();
 
       long expectedVersion = i + 1;
       assertEquals(expectedVersion, deltaSnapshot.version());
@@ -128,7 +132,8 @@ public void testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throw
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_commit_past";
     setupTableWithDeletedVersions(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
 
     Thread.sleep(100);
     Timestamp timestamp = new Timestamp(System.currentTimeMillis());
@@ -146,7 +151,7 @@ public void testGetActiveCommitAtTime_pastTimestamp(@TempDir File tempDir) throw
                 false /* canReturnEarliestCommit */);
 
     DeltaHistoryManager.Commit kernelCommit =
-        streamingHelper.getActiveCommitAtTime(
+        snapshotManager.getActiveCommitAtTime(
             timestamp,
             false /* canReturnLastCommit */,
             true /* mustBeRecreatable */,
@@ -162,7 +167,8 @@ public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir Fil
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_commit_future_last";
     setupTableWithDeletedVersions(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
 
     Timestamp futureTimestamp = new Timestamp(System.currentTimeMillis() + 10000);
 
@@ -178,7 +184,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_canReturnLast(@TempDir Fil
                 false /* canReturnEarliestCommit */);
 
     DeltaHistoryManager.Commit kernelCommit =
-        streamingHelper.getActiveCommitAtTime(
+        snapshotManager.getActiveCommitAtTime(
             futureTimestamp,
             true /* canReturnLastCommit */,
             true /* mustBeRecreatable */,
@@ -194,7 +200,8 @@ public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@Temp
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_commit_future_not_recreatable";
     setupTableWithDeletedVersions(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
 
     Timestamp futureTimestamp = new Timestamp(System.currentTimeMillis() + 10000);
 
@@ -210,7 +217,7 @@ public void testGetActiveCommitAtTime_futureTimestamp_notMustBeRecreatable(@Temp
                 false /* canReturnEarliestCommit */);
 
     DeltaHistoryManager.Commit kernelCommit =
-        streamingHelper.getActiveCommitAtTime(
+        snapshotManager.getActiveCommitAtTime(
             futureTimestamp,
             true /* canReturnLastCommit */,
             false /* mustBeRecreatable */,
@@ -226,7 +233,8 @@ public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_commit_early";
     setupTableWithDeletedVersions(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
 
     Timestamp earlyTimestamp = new Timestamp(0);
 
@@ -242,7 +250,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_canReturnEarliest(@TempDir
                 true /* canReturnEarliestCommit */);
 
     DeltaHistoryManager.Commit kernelCommit =
-        streamingHelper.getActiveCommitAtTime(
+        snapshotManager.getActiveCommitAtTime(
             earlyTimestamp,
             false /* canReturnLastCommit */,
             true /* mustBeRecreatable */,
@@ -258,7 +266,8 @@ public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canRet
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_commit_early_not_recreatable";
     setupTableWithDeletedVersions(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
 
     Timestamp earlyTimestamp = new Timestamp(0);
 
@@ -274,7 +283,7 @@ public void testGetActiveCommitAtTime_earlyTimestamp_notMustBeRecreatable_canRet
                 true /* canReturnEarliestCommit */);
 
     DeltaHistoryManager.Commit kernelCommit =
-        streamingHelper.getActiveCommitAtTime(
+        snapshotManager.getActiveCommitAtTime(
             earlyTimestamp,
             false /* canReturnLastCommit */,
             false /* mustBeRecreatable */,
@@ -337,14 +346,15 @@ public void testCheckVersionExists(
     String testTablePath = tempDir.getAbsolutePath();
     String testTableName = "test_version_" + testName;
     setupTableWithDeletedVersions(testTablePath, testTableName);
-    streamingHelper = new StreamingHelper(testTablePath, spark.sessionState().newHadoopConf());
+    snapshotManager =
+        new PathBasedSnapshotManager(testTablePath, spark.sessionState().newHadoopConf());
     DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
 
     if (shouldThrow) {
       assertThrows(
           VersionNotFoundException.class,
           () ->
-              streamingHelper.checkVersionExists(
+              snapshotManager.checkVersionExists(
                   versionToCheck, mustBeRecreatable, allowOutOfRange));
 
       assertThrows(
@@ -355,7 +365,7 @@ public void testCheckVersionExists(
                   .checkVersionExists(
                       versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange));
     } else {
-      streamingHelper.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange);
+      snapshotManager.checkVersionExists(versionToCheck, mustBeRecreatable, allowOutOfRange);
       deltaLog
           .history()
           .checkVersionExists(versionToCheck, Option.empty(), mustBeRecreatable, allowOutOfRange);