diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/exception/VersionNotFoundException.java b/kernel-spark/src/main/java/io/delta/kernel/spark/exception/VersionNotFoundException.java
index cf6f59ac1d5..d0110468d32 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/exception/VersionNotFoundException.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/exception/VersionNotFoundException.java
@@ -18,10 +18,29 @@
 /** Exception thrown when a requested version is not available in the Delta log. */
 public class VersionNotFoundException extends Exception {
+  private final long userVersion;
+  private final long earliest;
+  private final long latest;
+
   public VersionNotFoundException(long userVersion, long earliest, long latest) {
     super(
         String.format(
             "Cannot time travel Delta table to version %d. Available versions: [%d, %d].",
             userVersion, earliest, latest));
+    this.userVersion = userVersion;
+    this.earliest = earliest;
+    this.latest = latest;
+  }
+
+  public long getUserVersion() {
+    return userVersion;
+  }
+
+  public long getEarliest() {
+    return earliest;
+  }
+
+  public long getLatest() {
+    return latest;
   }
 }
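With the bounds carried as fields, callers can translate or rewrap the error without parsing the exception message. A minimal sketch of a hypothetical caller (checkVersionExists and the accessor names are the ones introduced in this patch):

    try {
      streamingHelper.checkVersionExists(
          version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
    } catch (VersionNotFoundException e) {
      // Structured access instead of message parsing; the bounds are inclusive.
      throw new IllegalArgumentException(
          String.format(
              "Requested version %d is outside [%d, %d]",
              e.getUserVersion(), e.getEarliest(), e.getLatest()));
    }

SparkMicroBatchStream below uses the same accessors to rebuild the DSv1 VersionNotFoundException.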
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
index a8b85db29d1..920c71ecc47 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
@@ -19,6 +19,7 @@
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
 import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
 import io.delta.kernel.internal.actions.AddFile;
 import io.delta.kernel.internal.actions.RemoveFile;
@@ -28,26 +29,50 @@
 import java.io.IOException;
 import java.util.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.delta.DeltaOptions;
+import org.apache.spark.sql.delta.DeltaStartingVersion;
+import org.apache.spark.sql.delta.StartingVersion;
+import org.apache.spark.sql.delta.StartingVersionLatest$;
+import org.apache.spark.sql.delta.sources.DeltaSQLConf;
 import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Option;
 
 public class SparkMicroBatchStream implements MicroBatchStream {
 
+  private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);
+
   private static final Set<DeltaAction> ACTION_SET =
       Collections.unmodifiableSet(
           new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE)));
 
   private final Engine engine;
   private final String tablePath;
+  private final Optional<DeltaOptions> options;
+  private final StreamingHelper streamingHelper;
+  private final SparkSession spark;
 
   public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) {
+    this(tablePath, hadoopConf, SparkSession.active(), /* options= */ Optional.empty());
+  }
+
+  public SparkMicroBatchStream(
+      String tablePath,
+      Configuration hadoopConf,
+      SparkSession spark,
+      Optional<DeltaOptions> options) {
+    this.spark = spark;
     this.tablePath = tablePath;
     this.engine = DefaultEngine.create(hadoopConf);
+    this.options = options;
+    this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);
   }
 
   ////////////
@@ -97,6 +122,108 @@ public void stop() {
     throw new UnsupportedOperationException("stop is not supported");
   }
 
+  ///////////////////////
+  // getStartingVersion //
+  ///////////////////////
+
+  /**
+   * Extracts whether users provided the option to time travel a relation. If a query restarts from
+   * a checkpoint and the checkpoint has recorded the offset, this method should never be called.
+   *
+   * <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
+   */
+  Optional<Long> getStartingVersion() {
+    if (!options.isPresent()) {
+      return Optional.empty();
+    }
+
+    DeltaOptions opts = options.get();
+
+    // TODO(#5319): DeltaSource.scala uses the `allowOutOfRange` parameter from
+    // DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
+    if (opts.startingVersion().isDefined()) {
+      DeltaStartingVersion startingVersion = opts.startingVersion().get();
+      if (startingVersion instanceof StartingVersionLatest$) {
+        Snapshot latestSnapshot = streamingHelper.loadLatestSnapshot();
+        // "latest": start reading from the next commit
+        return Optional.of(latestSnapshot.getVersion() + 1);
+      } else if (startingVersion instanceof StartingVersion) {
+        long version = ((StartingVersion) startingVersion).version();
+        if (!validateProtocolAt(spark, tablePath, engine, version)) {
+          // When starting from a given version, we don't require that the snapshot of this
+          // version can be reconstructed, even though the input table is technically in an
+          // inconsistent state. If the snapshot cannot be reconstructed, then the protocol
+          // check is skipped, so this is technically not safe, but we keep it this way for
+          // historical reasons.
+          try {
+            streamingHelper.checkVersionExists(
+                version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
+          } catch (io.delta.kernel.spark.exception.VersionNotFoundException e) {
+            // Re-throw as the DSv1 VersionNotFoundException wrapped in a RuntimeException.
+            // This preserves the same error message and semantics as DSv1.
+            org.apache.spark.sql.delta.VersionNotFoundException dsv1Exception =
+                org.apache.spark.sql.delta.VersionNotFoundException$.MODULE$.apply(
+                    e.getUserVersion(), e.getEarliest(), e.getLatest());
+            throw new RuntimeException(dsv1Exception);
+          }
+        }
+        return Optional.of(version);
+      }
+    }
+    // TODO(#5319): Implement startingTimestamp support
+    return Optional.empty();
+  }
+
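For context, the value interpreted here comes from the user-facing startingVersion stream option; an illustrative invocation that exercises this path (standard Delta streaming read API):

    // "latest" resolves to latestSnapshot.getVersion() + 1, i.e. only new commits;
    // a numeric value such as "3" resolves to that version itself.
    Dataset<Row> stream =
        spark
            .readStream()
            .format("delta")
            .option("startingVersion", "latest")
            .load(tablePath);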
+  /**
+   * Validates the protocol at a given version. If snapshot reconstruction fails for any reason
+   * other than an unsupported-table-feature exception, the failure is suppressed. This allows
+   * falling back to the previous behavior, where the starting version/timestamp was not required
+   * to point to a reconstructable snapshot.
+   *
+   * <p>This is the DSv2 Kernel-based implementation of DeltaSource.validateProtocolAt.
+   *
+   * <p>Returns true when the validation was performed and succeeded.
+   */
+  private static boolean validateProtocolAt(
+      SparkSession spark, String tablePath, Engine engine, long version) {
+    boolean alwaysValidateProtocol =
+        (Boolean)
+            spark
+                .sessionState()
+                .conf()
+                .getConf(DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL());
+    if (!alwaysValidateProtocol) {
+      return false;
+    }
+
+    try {
+      // Attempt to construct a snapshot at the startingVersion to validate the protocol.
+      // If snapshot reconstruction fails, fall back to the old behavior, where the only
+      // requirement was for the commit to exist.
+      TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
+      return true;
+    } catch (KernelException e) {
+      // Check whether this is an unsupported-table-feature exception.
+      // Kernel throws a plain KernelException (not a subclass) for unsupported features,
+      // so we must check the message. See DeltaErrors.unsupportedTableFeature,
+      // DeltaErrors.unsupportedReaderFeatures, and DeltaErrors.unsupportedWriterFeatures.
+      // TODO(#5369): Use specific exception types instead of message parsing
+      String exceptionMessage = e.getMessage();
+      if (exceptionMessage != null
+          && (exceptionMessage.contains("Unsupported Delta reader features")
+              || exceptionMessage.contains("Unsupported Delta table feature"))) {
+        throw e;
+      }
+      // Suppress other non-fatal KernelExceptions
+      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
+      return false;
+    } catch (Exception e) {
+      // Suppress other non-fatal exceptions
+      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
+      return false;
+    }
+  }
+
   ////////////////////
   // getFileChanges //
   ////////////////////
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
index 656d364e70d..69f26ea4792 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkScan.java
@@ -39,6 +39,7 @@
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.read.*;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.delta.DeltaOptions;
 import org.apache.spark.sql.execution.datasources.*;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.sources.Filter;
@@ -126,7 +127,9 @@ public Batch toBatch() {
 
   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    return new SparkMicroBatchStream(tablePath, hadoopConf);
+    DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf);
+    return new SparkMicroBatchStream(
+        tablePath, hadoopConf, SparkSession.active(), Optional.of(deltaOptions));
   }
 
   @Override
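toMicroBatchStream hands the scan's options to DeltaOptions, which validates values such as startingVersion at construction time. An equivalent standalone construction, useful for seeing what the constructor expects (mirrors the pattern used in the tests below; scalaOptions and sqlConf are existing SparkScan fields):

    // DeltaOptions takes an immutable Scala Map[String, String] plus the session's SQLConf.
    scala.collection.immutable.Map<String, String> opts =
        scala.collection.immutable.Map$.MODULE$.<String, String>empty()
            .updated("startingVersion", "latest");
    DeltaOptions deltaOptions = new DeltaOptions(opts, spark.sessionState().conf());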
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java
new file mode 100644
index 00000000000..097b9abc67b
--- /dev/null
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright (2025) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.spark.read;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.delta.kernel.spark.SparkDsv2TestBase;
+import java.io.File;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.delta.CheckpointInstance;
+import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.DeltaOptions;
+import org.apache.spark.sql.delta.Snapshot;
+import org.apache.spark.sql.delta.sources.DeltaSQLConf;
+import org.apache.spark.sql.delta.sources.DeltaSource;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import scala.Option;
+import scala.collection.immutable.Map$;
+
+/** Tests for SparkMicroBatchStream.getStartingVersion parity between DSv1 and DSv2. */
+public class SparkMicroBatchStreamGetStartingVersionTest extends SparkDsv2TestBase {
+
+  /**
+   * Parameterized test that verifies parity between DSv1 DeltaSource.getStartingVersion and DSv2
+   * SparkMicroBatchStream.getStartingVersion.
+   */
+  @ParameterizedTest
+  @MethodSource("getStartingVersionParameters")
+  public void testGetStartingVersion(
+      String startingVersion, Optional<Long> expectedVersion, @TempDir File tempDir)
+      throws Exception {
+    String testTablePath = tempDir.getAbsolutePath();
+    String testTableName = "test_starting_version_" + System.nanoTime();
+    createEmptyTestTable(testTablePath, testTableName);
+
+    // Create 5 versions (version 0 = CREATE TABLE, versions 1-5 = INSERTs)
+    createVersions(testTableName, 5);
+
+    testAndCompareStartingVersion(
+        testTablePath, startingVersion, expectedVersion, "startingVersion=" + startingVersion);
+  }
+
+  /** Provides test parameters for the parameterized getStartingVersion test. */
+  private static Stream<Arguments> getStartingVersionParameters() {
+    return Stream.of(
+        Arguments.of(/* startingVersion= */ "0", /* expectedVersion= */ Optional.of(0L)),
+        Arguments.of(/* startingVersion= */ "1", /* expectedVersion= */ Optional.of(1L)),
+        Arguments.of(/* startingVersion= */ "3", /* expectedVersion= */ Optional.of(3L)),
+        Arguments.of(/* startingVersion= */ "5", /* expectedVersion= */ Optional.of(5L)),
+        Arguments.of(/* startingVersion= */ "latest", /* expectedVersion= */ Optional.of(6L)),
+        Arguments.of(/* startingVersion= */ null, /* expectedVersion= */ Optional.empty()));
+  }
+ */ + @Test + public void testGetStartingVersion_NoOptions(@TempDir File tempDir) throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = "test_no_options_" + System.nanoTime(); + createEmptyTestTable(testTablePath, testTableName); + + // Create 5 versions (version 0 = CREATE TABLE, versions 1-5 = INSERTs) + createVersions(testTableName, 5); + + // dsv1 + DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath)); + DeltaOptions emptyOptions = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf()); + DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath, emptyOptions); + scala.Option dsv1Result = deltaSource.getStartingVersion(); + + // dsv2 + SparkMicroBatchStream dsv2Stream = + new SparkMicroBatchStream( + testTablePath, new Configuration(), spark, /* options= */ Optional.empty()); + Optional dsv2Result = dsv2Stream.getStartingVersion(); + + compareStartingVersionResults(dsv1Result, dsv2Result, Optional.empty(), "No options provided"); + } + + /** Test that verifies both DSv1 and DSv2 handle negative startingVersion values identically. */ + @Test + public void testGetStartingVersion_NegativeVersion_throwsError(@TempDir File tempDir) + throws Exception { + // Negative values are rejected during DeltaOptions parsing, before getStartingVersion is + // called. + assertThrows(IllegalArgumentException.class, () -> createDeltaOptions("-1")); + } + + /** + * Parameterized test that verifies both DSv1 and DSv2 handle the protocol validation behavior + * identically with the validation flag on/off. + * + *

When protocol validation is enabled, validateProtocolAt is called and must succeed. When + * disabled, the code immediately falls back to checkVersionExists without protocol validation. + */ + @ParameterizedTest + @MethodSource("protocolValidationParameters") + public void testGetStartingVersion_ProtocolValidationFlag( + boolean enableProtocolValidation, + String startingVersion, + String testDescription, + @TempDir File tempDir) + throws Exception { + String testTablePath = tempDir.getAbsolutePath(); + String testTableName = + "test_protocol_fallback_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime(); + createEmptyTestTable(testTablePath, testTableName); + + // Create 5 versions (version 0 = CREATE TABLE, versions 1-5 = INSERTs) + createVersions(testTableName, 5); + + // Test with protocol validation enabled/disabled + String configKey = DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL().key(); + try { + spark.conf().set(configKey, String.valueOf(enableProtocolValidation)); + testAndCompareStartingVersion( + testTablePath, + startingVersion, + Optional.of(Long.parseLong(startingVersion)), + testDescription); + } finally { + spark.conf().unset(configKey); + } + } + + /** Provides test parameters for protocol validation scenarios. */ + private static Stream protocolValidationParameters() { + return Stream.of( + Arguments.of( + /* enableProtocolValidation= */ true, + /* startingVersion= */ "2", + "Protocol validation enabled"), + Arguments.of( + /* enableProtocolValidation= */ false, + /* startingVersion= */ "3", + "Protocol validation disabled")); + } + + // TODO(#5320): Add test for unsupported table feature + // Test case where protocol validation encounters an unsupported table feature and throws + // (does NOT fall back to checkVersionExists). This is difficult to test reliably as it + // requires creating a table with features that Kernel doesn't support, which Spark SQL + // validates upfront. This scenario is tested through integration tests. + + /** + * Test case where protocol validation fails with a non-feature exception (snapshot cannot be + * recreated), but checkVersionExists succeeds (commit logically exists). + * + *
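The flag exercised by these cases is an ordinary session conf; toggling it outside the test harness looks like this (illustrative, using the same key the tests read from DeltaSQLConf):

    // Enable eager protocol validation for streaming starting versions.
    String key = DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL().key();
    spark.conf().set(key, "true");
    try {
      // Start the stream; getStartingVersion will attempt snapshot reconstruction.
    } finally {
      spark.conf().unset(key); // restore the default
    }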
+  /**
+   * Test case where protocol validation fails with a non-feature exception (snapshot cannot be
+   * recreated), but checkVersionExists succeeds (the commit logically exists).
+   *
+   * <p>Scenario: After creating a checkpoint at version 10, old log files 0-5 are deleted
+   * (simulating log cleanup by timestamp). This makes version 7 non-recreatable (it sits between
+   * the deleted logs and the checkpoint). Protocol validation fails when trying to build the
+   * snapshot at version 7, but checkVersionExists succeeds because the commit still logically
+   * exists.
+   */
+  @Test
+  public void testGetStartingVersion_ProtocolValidationNonFeatureExceptionFallback(
+      @TempDir File tempDir) throws Exception {
+    String testTablePath = tempDir.getAbsolutePath();
+    String testTableName = "test_non_recreatable_" + System.nanoTime();
+    createEmptyTestTable(testTablePath, testTableName);
+
+    // Create 10 versions (version 0 = CREATE TABLE, versions 1-10 = INSERTs)
+    createVersions(testTableName, /* numVersions= */ 10);
+
+    // Create checkpoint at version 10
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
+    Snapshot snapshotV10 =
+        deltaLog.getSnapshotAt(10, Option.empty(), Option.empty(), false);
+    deltaLog.checkpoint(snapshotV10, Option.empty());
+
+    // Simulate log cleanup by timestamp: delete logs 0-5.
+    // This makes version 7 non-recreatable while allowing DeltaLog to load the latest snapshot.
+    Path logPath = new Path(testTablePath, "_delta_log");
+    for (long version = 0; version <= 5; version++) {
+      Path logFile = new Path(logPath, String.format("%020d.json", version));
+      File file = new File(logFile.toUri().getPath());
+      if (file.exists()) {
+        file.delete();
+      }
+    }
+
+    // Test with startingVersion=7 (a version that's no longer recreatable but logically exists)
+    String startingVersion = "7";
+
+    // dsv1
+    DeltaLog freshDeltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
+    DeltaSource deltaSource =
+        createDeltaSource(freshDeltaLog, testTablePath, createDeltaOptions(startingVersion));
+    scala.Option<Object> dsv1Result = deltaSource.getStartingVersion();
+
+    // dsv2
+    SparkMicroBatchStream dsv2Stream =
+        new SparkMicroBatchStream(
+            testTablePath,
+            new Configuration(),
+            spark,
+            Optional.ofNullable(createDeltaOptions(startingVersion)));
+    Optional<Long> dsv2Result = dsv2Stream.getStartingVersion();
+
+    compareStartingVersionResults(
+        dsv1Result,
+        dsv2Result,
+        Optional.of(Long.parseLong(startingVersion)),
+        "Protocol validation fallback with non-recreatable version");
+  }
+
+  // ================================================================================================
+  // Helper methods
+  // ================================================================================================
+
+  /** Helper method to create multiple versions by inserting rows. */
+  private void createVersions(String testTableName, int numVersions) {
+    for (int i = 1; i <= numVersions; i++) {
+      sql("INSERT INTO %s VALUES (%d, 'User%d')", testTableName, i, i);
+    }
+  }
+
+  /** Helper method to test and compare getStartingVersion results from DSv1 and DSv2. */
+  private void testAndCompareStartingVersion(
+      String testTablePath,
+      String startingVersion,
+      Optional<Long> expectedVersion,
+      String testDescription)
+      throws Exception {
+    // DSv1: Create DeltaSource and get starting version
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
+    DeltaSource deltaSource =
+        createDeltaSource(deltaLog, testTablePath, createDeltaOptions(startingVersion));
+    scala.Option<Object> dsv1Result = deltaSource.getStartingVersion();
+
+    // DSv2: Create SparkMicroBatchStream and get starting version
+    SparkMicroBatchStream dsv2Stream =
+        new SparkMicroBatchStream(
+            testTablePath,
+            new Configuration(),
+            spark,
+            Optional.ofNullable(createDeltaOptions(startingVersion)));
+    Optional<Long> dsv2Result = dsv2Stream.getStartingVersion();
+
+    compareStartingVersionResults(dsv1Result, dsv2Result, expectedVersion, testDescription);
+  }
+
+  /** Helper method to execute SQL with String.format. */
+  private static void sql(String query, Object... args) {
+    SparkDsv2TestBase.spark.sql(String.format(query, args));
+  }
+
+  /** Helper method to create a DeltaSource instance with custom options for testing. */
+  private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath, DeltaOptions options) {
+    scala.collection.immutable.Seq<org.apache.spark.sql.catalyst.expressions.Expression> emptySeq =
+        scala.collection.JavaConverters.asScalaBuffer(
+                new java.util.ArrayList<org.apache.spark.sql.catalyst.expressions.Expression>())
+            .toList();
+    Snapshot snapshot = deltaLog.update(false, Option.empty(), Option.empty());
+    return new DeltaSource(
+        spark,
+        deltaLog,
+        /* catalogTableOpt= */ Option.empty(),
+        options,
+        /* snapshotAtSourceInit= */ snapshot,
+        /* metadataPath= */ tablePath + "/_checkpoint",
+        /* metadataTrackingLog= */ Option.empty(),
+        /* filters= */ emptySeq);
+  }
+
+  /** Helper method to create DeltaOptions with startingVersion for testing. */
+  private DeltaOptions createDeltaOptions(String startingVersionValue) {
+    if (startingVersionValue == null) {
+      // Empty options
+      return new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
+    } else {
+      // Create Scala Map with startingVersion
+      scala.collection.immutable.Map<String, String> scalaMap =
+          Map$.MODULE$.<String, String>empty().updated("startingVersion", startingVersionValue);
+      return new DeltaOptions(scalaMap, spark.sessionState().conf());
+    }
+  }
+
+  /** Helper method to compare getStartingVersion results from DSv1 and DSv2. */
+  private void compareStartingVersionResults(
+      scala.Option<Object> dsv1Result,
+      Optional<Long> dsv2Result,
+      Optional<Long> expectedVersion,
+      String testDescription) {
+
+    Optional<Long> dsv1Optional;
+    if (dsv1Result.isEmpty()) {
+      dsv1Optional = Optional.empty();
+    } else {
+      dsv1Optional = Optional.of((Long) dsv1Result.get());
+    }
+
+    assertEquals(
+        dsv1Optional,
+        dsv2Result,
+        String.format("DSv1 and DSv2 getStartingVersion should match for %s", testDescription));
+
+    assertEquals(
+        expectedVersion,
+        dsv2Result,
+        String.format("DSv2 getStartingVersion should match for %s", testDescription));
+  }
+}
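The scala.Option-to-java.util.Optional bridging inside compareStartingVersionResults is a pattern the suite could factor out; a possible helper (a sketch, not part of this patch):

    /** Converts a scala.Option holding a boxed long into a java.util.Optional<Long>. */
    private static Optional<Long> toJavaOptional(scala.Option<Object> opt) {
      return opt.isEmpty() ? Optional.empty() : Optional.of((Long) opt.get());
    }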