[kernel-spark] Add getStartingVersion() for obtaining the initial offset for DSv2 streaming #5356
@@ -19,6 +19,7 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
@@ -28,26 +29,50 @@
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.DeltaStartingVersion;
import org.apache.spark.sql.delta.StartingVersion;
import org.apache.spark.sql.delta.StartingVersionLatest$;
import org.apache.spark.sql.delta.sources.DeltaSQLConf;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class SparkMicroBatchStream implements MicroBatchStream {

  private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);

  private static final Set<DeltaAction> ACTION_SET =
      Collections.unmodifiableSet(
          new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE)));

  private final Engine engine;
  private final String tablePath;
  private final Optional<DeltaOptions> options;
  private final StreamingHelper streamingHelper;
  private final SparkSession spark;

  public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) {
    this(tablePath, hadoopConf, SparkSession.active(), /* options= */ Optional.empty());
  }

  public SparkMicroBatchStream(
      String tablePath,
      Configuration hadoopConf,
      SparkSession spark,
      Optional<DeltaOptions> options) {
    this.spark = spark;
    this.tablePath = tablePath;
    this.engine = DefaultEngine.create(hadoopConf);
    this.options = options;
    this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);
  }

  ////////////
@@ -97,6 +122,108 @@ public void stop() {
    throw new UnsupportedOperationException("stop is not supported");
  }

  ///////////////////////
  // getStartingVersion //
  ///////////////////////

  /**
   * Extracts whether users provided the option to time travel a relation. If a query restarts from
   * a checkpoint and the checkpoint has recorded the offset, this method should never be called.
   *
   * <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
   */
  Optional<Long> getStartingVersion() {
    if (!options.isPresent()) {
      return Optional.empty();
    }

    DeltaOptions opts = options.get();

    // TODO(#5319): DeltaSource.scala uses `allowOutOfRange` parameter from
    // DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
    if (opts.startingVersion().isDefined()) {
      DeltaStartingVersion startingVersion = opts.startingVersion().get();
      if (startingVersion instanceof StartingVersionLatest$) {
        Snapshot latestSnapshot = streamingHelper.loadLatestSnapshot();
        // "latest": start reading from the next commit
        return Optional.of(latestSnapshot.getVersion() + 1);
      } else if (startingVersion instanceof StartingVersion) {
        long version = ((StartingVersion) startingVersion).version();
        if (!validateProtocolAt(spark, tablePath, engine, version)) {
          // When starting from a given version, we don't require that the snapshot of this
          // version can be reconstructed, even though the input table is technically in an
          // inconsistent state. If the snapshot cannot be reconstructed, then the protocol
          // check is skipped, so this is technically not safe, but we keep it this way for
          // historical reasons.
          try {
            streamingHelper.checkVersionExists(
                version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
          } catch (io.delta.kernel.spark.exception.VersionNotFoundException e) {
            // Re-throw as a DSv1 VersionNotFoundException wrapped in a RuntimeException.
            // This maintains the same error message and semantics as DSv1.
            org.apache.spark.sql.delta.VersionNotFoundException dsv1Exception =
                org.apache.spark.sql.delta.VersionNotFoundException$.MODULE$.apply(
                    e.getUserVersion(), e.getEarliest(), e.getLatest());
            throw new RuntimeException(dsv1Exception);
          }
        }
        return Optional.of(version);
      }
    }
    // TODO(#5319): Implement startingTimestamp support
    return Optional.empty();
  }
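
For context (not part of this diff): the `startingVersion` value inspected above comes from the standard Delta streaming read option that callers set on the reader. A minimal usage sketch, with an illustrative table path:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StartingVersionExample {
  // Opens a Delta streaming read; the "startingVersion" option set here is what
  // eventually surfaces to getStartingVersion() via DeltaOptions.startingVersion().
  public static Dataset<Row> openStream(SparkSession spark) {
    return spark
        .readStream()
        .format("delta")
        .option("startingVersion", "5") // or "latest" to start from the next commit
        .load("/tmp/delta/events"); // illustrative table path
  }
}
```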

  /**
   * Validate the protocol at a given version. If the snapshot reconstruction fails for any reason
   * other than an unsupported-feature exception, we suppress it. This allows falling back to the
   * previous behavior, where the starting version/timestamp was not required to point to a
   * reconstructable snapshot.
   *
   * <p>This is the DSv2 Kernel-based implementation of DeltaSource.validateProtocolAt.
   *
   * <p>Returns true when the validation was performed and succeeded.
   */

> Review comment: Where is this code?
> Reply: Please see …
  private static boolean validateProtocolAt(
      SparkSession spark, String tablePath, Engine engine, long version) {
    boolean alwaysValidateProtocol =
        (Boolean)
            spark
                .sessionState()
                .conf()
                .getConf(DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL());

    if (!alwaysValidateProtocol) {
      return false;
    }

    try {
      // Attempt to construct a snapshot at the startingVersion to validate the protocol.
      // If snapshot reconstruction fails, fall back to the old behavior where the only
      // requirement was for the commit to exist.
      TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);

> Review comment: Actually we should define a method in the streaming helper (maybe called `LoadSnapshotAt`).
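
A hypothetical sketch of that helper (not part of this diff): it assumes StreamingHelper keeps the table path and a Kernel `Engine` as fields, and simply wraps the same `TableManager` call used inline above, with `io.delta.kernel.Snapshot` as the return type.

```java
// Hypothetical addition to StreamingHelper; SparkMicroBatchStream would then call
// streamingHelper.loadSnapshotAt(version) instead of talking to TableManager directly.
public Snapshot loadSnapshotAt(long version) {
  return TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
}
```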

      return true;
    } catch (KernelException e) {
      // Check if it's an unsupported table feature exception.
      // Kernel throws a plain KernelException (not a subclass) for unsupported features,
      // so we must check the message. See DeltaErrors.unsupportedTableFeature,
      // DeltaErrors.unsupportedReaderFeatures, and DeltaErrors.unsupportedWriterFeatures.
      // TODO(#5369): Use specific exception types instead of message parsing.

> Review comment: I think there is an improvement task for Kernel to have finer-grained error throwing. cc @raveeram-db
> Reply: That would be great. Created an issue: #5369

      String exceptionMessage = e.getMessage();
      if (exceptionMessage != null
          && (exceptionMessage.contains("Unsupported Delta reader features")
              || exceptionMessage.contains("Unsupported Delta table feature"))) {
        throw e;
      }

> Review comment: Maybe only keep …
> Reply: Good call. What about "Unsupported Delta table feature"?
> Reply: Done.

      // Suppress other non-fatal KernelExceptions.
      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
      return false;
    } catch (Exception e) {
      // Suppress other non-fatal exceptions.
      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
      return false;
    }
  }

> Review comment: I don't think we want to suppress all other exceptions here, for example InterruptedException. We should follow what DSv1 is doing and only suppress non-fatal exceptions.
> Reply: What we are doing here is equivalent to the Scala …
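
If the catch-all were tightened as the reviewer suggests, one way to mirror Scala's `scala.util.control.NonFatal` in Java is a small guard like the following (a sketch under that assumption, not code in this PR):

```java
// Mirrors scala.util.control.NonFatal: these throwable types are considered fatal and
// should be rethrown rather than suppressed; anything else may be logged and swallowed.
private static boolean isNonFatal(Throwable t) {
  return !(t instanceof VirtualMachineError // includes OutOfMemoryError, StackOverflowError
      || t instanceof ThreadDeath
      || t instanceof InterruptedException
      || t instanceof LinkageError);
}
```

The `catch (Exception e)` block above could then rethrow when `isNonFatal(e)` returns false and keep the log-and-return-false path only for non-fatal failures.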

  ////////////////////
  // getFileChanges //
  ////////////////////