io/delta/kernel/spark/exception/VersionNotFoundException.java
@@ -18,10 +18,29 @@
/** Exception thrown when a requested version is not available in the Delta log. */
public class VersionNotFoundException extends Exception {

  private final long userVersion;
  private final long earliest;
  private final long latest;

  public VersionNotFoundException(long userVersion, long earliest, long latest) {
    super(
        String.format(
            "Cannot time travel Delta table to version %d. Available versions: [%d, %d].",
            userVersion, earliest, latest));
    this.userVersion = userVersion;
    this.earliest = earliest;
    this.latest = latest;
  }

  public long getUserVersion() {
    return userVersion;
  }

  public long getEarliest() {
    return earliest;
  }

  public long getLatest() {
    return latest;
  }
}
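
For reference, a quick sketch of the message this exception produces (values hypothetical):

    // Hypothetical values, to illustrate the formatted message and the accessors:
    VersionNotFoundException e = new VersionNotFoundException(42, 10, 20);
    // e.getMessage() ==
    //   "Cannot time travel Delta table to version 42. Available versions: [10, 20]."
    // e.getUserVersion() == 42, e.getEarliest() == 10, e.getLatest() == 20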
SparkMicroBatchStream.java
@@ -19,6 +19,7 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
@@ -28,26 +29,50 @@
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.DeltaStartingVersion;
import org.apache.spark.sql.delta.StartingVersion;
import org.apache.spark.sql.delta.StartingVersionLatest$;
import org.apache.spark.sql.delta.sources.DeltaSQLConf;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class SparkMicroBatchStream implements MicroBatchStream {

  private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);

  private static final Set<DeltaAction> ACTION_SET =
      Collections.unmodifiableSet(
          new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE)));

  private final Engine engine;
  private final String tablePath;
  private final Optional<DeltaOptions> options;
  private final StreamingHelper streamingHelper;
  private final SparkSession spark;

  public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) {
    this(tablePath, hadoopConf, SparkSession.active(), /* options= */ Optional.empty());
  }

  public SparkMicroBatchStream(
      String tablePath,
      Configuration hadoopConf,
      SparkSession spark,
      Optional<DeltaOptions> options) {
    this.spark = spark;
    this.tablePath = tablePath;
    this.engine = DefaultEngine.create(hadoopConf);
    this.options = options;
    this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);
  }
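
A minimal construction sketch (the table path is hypothetical; as shown above, the
two-argument constructor falls back to SparkSession.active() and empty options):

    Configuration hadoopConf = new Configuration();
    SparkMicroBatchStream stream =
        new SparkMicroBatchStream("/tmp/delta/events", hadoopConf);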

  ////////////

@@ -97,6 +122,108 @@ public void stop() {
    throw new UnsupportedOperationException("stop is not supported");
  }

  ////////////////////////
  // getStartingVersion //
  ////////////////////////

  /**
   * Extracts whether users provided the option to time travel a relation. If a query restarts from
   * a checkpoint and the checkpoint has recorded the offset, this method should never be called.
   *
   * <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
   */
  Optional<Long> getStartingVersion() {
    if (!options.isPresent()) {
      return Optional.empty();
    }

    DeltaOptions opts = options.get();

    // TODO(#5319): DeltaSource.scala uses `allowOutOfRange` parameter from
    // DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
    if (opts.startingVersion().isDefined()) {
      DeltaStartingVersion startingVersion = opts.startingVersion().get();
      if (startingVersion instanceof StartingVersionLatest$) {
        Snapshot latestSnapshot = streamingHelper.loadLatestSnapshot();
        // "latest": start reading from the next commit
        return Optional.of(latestSnapshot.getVersion() + 1);
      } else if (startingVersion instanceof StartingVersion) {
        long version = ((StartingVersion) startingVersion).version();
        if (!validateProtocolAt(spark, tablePath, engine, version)) {
          // When starting from a given version, we don't require that the snapshot of this
          // version can be reconstructed, even though the input table is technically in an
          // inconsistent state. If the snapshot cannot be reconstructed, then the protocol
          // check is skipped, so this is technically not safe, but we keep it this way for
          // historical reasons.
          try {
            streamingHelper.checkVersionExists(
                version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
          } catch (io.delta.kernel.spark.exception.VersionNotFoundException e) {
            // Re-throw as DSv1 VersionNotFoundException wrapped in RuntimeException.
            // This maintains the same error message and semantics as DSv1.
            org.apache.spark.sql.delta.VersionNotFoundException dsv1Exception =
                org.apache.spark.sql.delta.VersionNotFoundException$.MODULE$.apply(
                    e.getUserVersion(), e.getEarliest(), e.getLatest());
            throw new RuntimeException(dsv1Exception);
          }
        }
        return Optional.of(version);
      }
    }
    // TODO(#5319): Implement startingTimestamp support
    return Optional.empty();
  }
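
As an illustration of the mapping implemented above (version numbers hypothetical,
assuming the table's earliest and latest available versions are 0 and 10):

    // no startingVersion option   -> Optional.empty()
    // startingVersion = "latest"  -> Optional.of(11L)  // next commit after 10
    // startingVersion = "5"       -> Optional.of(5L)   // after checkVersionExists passes
    // startingVersion = "99"      -> RuntimeException wrapping the DSv1
    //                                VersionNotFoundException (99 is outside [0, 10])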

    [Review thread]
    Contributor:
      > This allows fallback to previous behavior where the starting version/timestamp
      > was not mandatory to point to reconstructable snapshot.
      Where is this code?
    Author:
      Please see:

          if (!validateProtocolAt(spark, tablePath, engine, version)) {
            // When starting from a given version, we don't require that the snapshot of this
            // version can be reconstructed, even though the input table is technically in an
            // inconsistent state. If the snapshot cannot be reconstructed, then the protocol
            // check is skipped, so this is technically not safe, but we keep it this way for
            // historical reasons.
            try {
              streamingHelper.checkVersionExists(
                  version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);

  /**
   * Validate the protocol at a given version. If the snapshot reconstruction fails for any reason
   * other than an unsupported feature exception, we suppress it. This allows fallback to previous
   * behavior, where the starting version/timestamp was not mandatory to point to a reconstructable
   * snapshot.
   *
   * <p>This is the DSv2 Kernel-based implementation of DeltaSource.validateProtocolAt.
   *
   * <p>Returns true when the validation was performed and succeeded.
   */
  private static boolean validateProtocolAt(
      SparkSession spark, String tablePath, Engine engine, long version) {
    boolean alwaysValidateProtocol =
        (Boolean)
            spark
                .sessionState()
                .conf()
                .getConf(DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL());
    if (!alwaysValidateProtocol) {
      return false;
    }

    try {
      // Attempt to construct a snapshot at the startingVersion to validate the protocol.
      // If snapshot reconstruction fails, fall back to old behavior where the only
      // requirement was for the commit to exist.
      TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
      [Review thread]
      Collaborator: Actually we should define a method in the streaming helper (maybe called
      LoadSnapshotAt).
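      A sketch of the suggested helper (hypothetical; assumes StreamingHelper keeps the
      tablePath and an Engine, which this diff does not show):

          // In StreamingHelper (hypothetical method, per the review suggestion above):
          public Snapshot loadSnapshotAt(long version) {
            return TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
          }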
      return true;
    } catch (KernelException e) {
      // Check if it's an unsupported table feature exception.
      // Kernel throws plain KernelException (not a subclass) for unsupported features,
      // so we must check the message. See DeltaErrors.unsupportedTableFeature,
      // DeltaErrors.unsupportedReaderFeatures, and DeltaErrors.unsupportedWriterFeatures.
      // TODO(#5369): Use specific exception types instead of message parsing
      String exceptionMessage = e.getMessage();
      if (exceptionMessage != null
          && (exceptionMessage.contains("Unsupported Delta reader features")
              || exceptionMessage.contains("Unsupported Delta table feature"))) {
        throw e;
      }
      // Suppress other non-fatal KernelExceptions
      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
      return false;
    } catch (Exception e) {
      // Suppress other non-fatal exceptions
      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
      return false;
    }
  }

    [Review thread] On the KernelException handling:
    Collaborator: I think there is an improvement task for kernel to have finer-grained error
    throwing. cc @raveeram-db
    Author: That would be great. Created an issue: #5369

    [Review thread] On the message-matching condition:
    Collaborator: Maybe only keep exceptionMessage.contains("Unsupported Delta reader features")?
    Author: Good call. What about "Unsupported Delta table feature"?
    Collaborator: "Unsupported Delta table feature" -- let's add that, good catch.
    Author: Done.

    [Review thread] On catch (Exception e):
    Contributor: I don't think we want to suppress all other exceptions here, for example
    InterruptedException. We should just follow what DSv1 is doing and only suppress NonFatal
    exceptions.
    Author: What we are doing here is equivalent to the Scala NonFatal:
      • We won't catch VirtualMachineError, ThreadDeath, LinkageError (extends Error)
      • We won't catch ControlThrowable (extends Throwable)
      • We don't need to catch InterruptedException because it won't be thrown by the kernel.
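
To make the NonFatal argument above concrete, a minimal sketch (illustrative only;
IllegalStateException stands in for a kernel failure):

    // Java `catch (Exception e)` vs. Scala's NonFatal:
    try {
      throw new IllegalStateException("non-fatal kernel failure (stand-in)");
    } catch (Exception e) {
      // Caught: runtime exceptions are non-fatal, matching NonFatal.
    }
    // An Error such as OutOfMemoryError would NOT be caught by `catch (Exception e)`
    // and would propagate, also matching NonFatal. The one divergence is
    // InterruptedException, which IS an Exception and would be suppressed here;
    // the author's position is that the kernel never throws it on this path.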

  ////////////////////
  // getFileChanges //
  ////////////////////
@@ -39,6 +39,7 @@
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.execution.datasources.*;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
@@ -126,7 +127,9 @@ public Batch toBatch() {

   @Override
   public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-    return new SparkMicroBatchStream(tablePath, hadoopConf);
+    DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf);
+    return new SparkMicroBatchStream(
+        tablePath, hadoopConf, SparkSession.active(), Optional.of(deltaOptions));
   }

@Override
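
For context, a hedged sketch of how this path is exercised end to end (the source name
"delta" and the table path are assumptions, not confirmed by this diff):

    Dataset<Row> stream =
        spark.readStream()
            .format("delta")                      // assumed source name for this connector
            .option("startingVersion", "latest")  // parsed into DeltaOptions above
            .load("/tmp/delta/events");           // hypothetical table path
    // Spark then calls toMicroBatchStream(checkpointLocation), which builds
    // SparkMicroBatchStream with Optional.of(deltaOptions).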