[kernel-spark] Add getStartingVersion() for obtaining the initial offset for DSv2 streaming #5356
base: master
Conversation
Hi, could I get a review from @huan233usc @gengliangwang @jerrypeng @tdas please? Thanks!
```java
 * <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
 */
Optional<Long> getStartingVersion() {
  if (options == null) {
```
Can we check for non-null in the constructor? Objects.requireNonNull
I'm leaning toward continuing to support null options -- it should represent optional configurations, not required (e.g. some tests pass in zero options). wdyt?
What's the difference between null options and new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf())?
Was trying to avoid assigning null if possible, to reduce the risk of NPEs.
When would "options" be null?
it should represent optional configurations, not required (e.g. some tests pass in zero options)
Java supports an "Optional" API. Why not use that so it is clear to the reader this variable is optional.
Done.
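For reference, a minimal sketch of the Optional-based field this thread converged on. DeltaOptions stands in for the delta-spark DeltaOptions class, and the class shape is illustrative, not the PR's actual code:

```java
import java.util.Objects;
import java.util.Optional;

// Sketch: hold options as Optional<DeltaOptions> instead of a nullable field,
// so "no options were configured" is explicit at the type level.
final class OptionalOptionsSketch {
  private final Optional<DeltaOptions> options;

  OptionalOptionsSketch(Optional<DeltaOptions> options) {
    // The Optional wrapper itself must never be null.
    this.options = Objects.requireNonNull(options);
  }

  Optional<Long> getStartingVersion() {
    // Empty options means no startingVersion was configured.
    return options.flatMap(this::resolveStartingVersion);
  }

  private Optional<Long> resolveStartingVersion(DeltaOptions opts) {
    // ... inspect opts.startingVersion() here ...
    return Optional.empty();
  }
}
```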
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java (resolved)
```java
// so we must check the message. See DeltaErrors.unsupportedTableFeature,
// DeltaErrors.unsupportedReaderFeatures, and DeltaErrors.unsupportedWriterFeatures.
String exceptionMessage = e.getMessage();
if (exceptionMessage != null
```
Maybe only keep exceptionMessage.contains("Unsupported Delta reader features")?
Good call. What about "Unsupported Delta table feature"?
"Unsupported Delta table feature"
Let's added that, good catch
Done.
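For context, a hedged sketch of the message matching this thread settled on; the helper name is hypothetical, while the matched strings and the plain-KernelException behavior come from the discussion above:

```java
import io.delta.kernel.exceptions.KernelException;

// Kernel currently throws a plain KernelException for unsupported table
// features, so the message text is the only available signal (finer-grained
// error types are tracked separately).
static boolean isUnsupportedFeatureException(KernelException e) {
  String message = e.getMessage();
  return message != null
      && (message.contains("Unsupported Delta reader features")
          || message.contains("Unsupported Delta table feature"));
}
```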
```java
  return true;
} catch (KernelException e) {
  // Check if it's an unsupported table feature exception
  // Kernel throws plain KernelException (not a subclass) for unsupported features,
```
I think there is an improvement task for Kernel to have finer-grained error throwing cc @raveeram-db
That would be great. Created an issue: #5369
...rk/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java (resolved)
...rk/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java (outdated, resolved)
...rk/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java (outdated, resolved)
kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java (resolved)
```java
  streamingHelper.checkVersionExists(
      version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
} catch (Exception e) {
  throw new RuntimeException("Failed to validate starting version: " + version, e);
```
Wondering if it helps to avoid this try-catch block if we make the not-found exception extend AnalysisException.
Created #5369
Can we throw the same exception class as DSv1, which would be AnalysisException?
- @jerrypeng yes, we can throw the DSv1 version of this exception.
- Although we should still wrap it with RuntimeException, because (a) the caller should not have to handle this exception again, and (b) AnalysisException is also checked, unfortunately.
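A minimal sketch of the agreed pattern, assuming checkVersionExists surfaces a DSv1-style AnalysisException (which is a checked exception when called from Java); streamingHelper and the argument names are taken from the hunk quoted above:

```java
// Sketch only: keep the DSv1 AnalysisException as the cause, but wrap it in an
// unchecked RuntimeException so callers of getStartingVersion() don't have to
// re-handle a checked exception.
private void validateStartingVersion(long version) {
  try {
    streamingHelper.checkVersionExists(
        version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
  } catch (org.apache.spark.sql.AnalysisException e) {
    throw new RuntimeException("Failed to validate starting version: " + version, e);
  }
}
```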
...rk/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java (resolved)
...rk/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java (outdated, resolved)
...rk/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamGetStartingVersionTest.java (outdated, resolved)
```java
deltaLog.checkpoint();

// Delete log files for versions 1-5 to make them non-recreatable
// Note: Version 0 is kept because it contains the table schema
```
I think this produces a table state that will never be reached. Log cleanup deletes files by timestamp, so it would always delete versions 0-5 together (version 0 is the earliest); for the schema, we make sure there is always a checkpoint at the earliest remaining version.
-- so to set up the test, let's create 5.checkpoint and 10.checkpoint, then remove logs 0-5.
Fixed. Thanks!
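A hedged sketch of the revised setup (checkpoint at versions 5 and 10, then drop commits 0-5); it assumes the standard zero-padded Delta log file naming and is not the test's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// After checkpointing at versions 5 and 10, delete the JSON commit files for
// versions 0-5 so the earliest reconstructable state is the 5.checkpoint,
// mirroring what real log cleanup would leave behind.
static void dropCommitFiles(String tablePath, long fromVersion, long toVersion)
    throws IOException {
  Path logDir = Paths.get(tablePath, "_delta_log");
  for (long v = fromVersion; v <= toVersion; v++) {
    Files.deleteIfExists(logDir.resolve(String.format("%020d.json", v)));
  }
}
```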
```java
// DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
if (options.startingVersion().isDefined()) {
  DeltaStartingVersion startingVersion = options.startingVersion().get();
  if (startingVersion.equals(StartingVersionLatest$.MODULE$)) {
```
Why not use instanceof instead of equals?
Done.
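The resulting shape, sketched against DSv1's Scala starting-version types as seen from Java (StartingVersionLatest is a case object and StartingVersion a case class in DSv1; the snippet is illustrative, not the PR's exact code):

```java
// Sketch: use instanceof to pattern-match the starting-version ADT instead of
// comparing against the case-object singleton with equals.
if (options.startingVersion().isDefined()) {
  DeltaStartingVersion startingVersion = options.startingVersion().get();
  if (startingVersion instanceof StartingVersionLatest$) {
    // "startingVersion=latest": begin after the latest snapshot version.
  } else if (startingVersion instanceof StartingVersion) {
    // Begin at the user-specified version.
    long version = ((StartingVersion) startingVersion).version();
  }
}
```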
```java
String exceptionMessage = e.getMessage();
if (exceptionMessage != null
    && exceptionMessage.contains("Unsupported Delta reader features")) {
  throw new RuntimeException(e);
```
Why do we need to wrap the KernelException in a RuntimeException? Also DSv1 throws "DeltaUnsupportedTableFeatureException". Can we throw the same exception to maintain the same error handling behavior?
- You are right, KernelException is unchecked; I removed the wrapping.
- It's challenging to convert the KernelException to DSv1's DeltaUnsupportedTableFeatureException in this case, as it doesn't expose any error params. I agree we should try to throw the same exceptions as DSv1, but sometimes it's challenging to do so. Is it really that important to keep the exception types consistent if we are failing the stream anyway?
```java
  // Suppress other KernelExceptions
  logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
  return false;
} catch (Exception e) {
```
I don't think we want to suppress all other exceptions here, for example InterruptedException. We should just follow what DSv1 is doing and only suppress NonFatal exceptions.
What we are doing here is equivalent to the Scala NonFatal:
- We won't catch VirtualMachineError, ThreadDeath, or LinkageError (they extend Error).
- We won't catch ControlThrowable (it extends Throwable directly, not Exception).
- We don't need to catch InterruptedException because it won't be thrown by the Kernel.
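For reference, a minimal Java equivalent of Scala's scala.util.control.NonFatal matcher (ControlThrowable is omitted because it is Scala-specific and a Java catch of Exception never sees it anyway):

```java
// Mirrors scala.util.control.NonFatal: these Throwables are fatal and should
// be rethrown rather than suppressed.
static boolean isNonFatal(Throwable t) {
  return !(t instanceof VirtualMachineError // OutOfMemoryError, StackOverflowError, ...
      || t instanceof ThreadDeath
      || t instanceof InterruptedException
      || t instanceof LinkageError);
}
```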
```java
Arguments.of(/* startingVersion= */ "3", /* expectedVersion= */ Optional.of(3L)),
Arguments.of(/* startingVersion= */ "5", /* expectedVersion= */ Optional.of(5L)),
Arguments.of(/* startingVersion= */ "latest", /* expectedVersion= */ Optional.of(6L)),
Arguments.of(/* startingVersion= */ null, /* expectedVersion= */ Optional.empty()));
```
What about these cases:
- startingVersion is not set
- startingVersion set to Optional.empty
(1) is already covered by testGetStartingVersion. I added (2): testGetStartingVersion_NoOptions
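For reference, a hedged sketch of what such a JUnit 5 parameterized parity test can look like; the class name and assertion body are illustrative, not the PR's actual test code:

```java
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class GetStartingVersionParitySketch {

  static Stream<Arguments> startingVersionCases() {
    return Stream.of(
        Arguments.of(/* startingVersion= */ "3", /* expectedVersion= */ Optional.of(3L)),
        Arguments.of(/* startingVersion= */ "latest", /* expectedVersion= */ Optional.of(6L)),
        Arguments.of(/* startingVersion= */ null, /* expectedVersion= */ Optional.empty()));
  }

  @ParameterizedTest
  @MethodSource("startingVersionCases")
  void testGetStartingVersion(String startingVersion, Optional<Long> expectedVersion) {
    // Build DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream) with the same
    // startingVersion option and assert both resolve the same initial offset.
  }
}
```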
```java
 * Validate the protocol at a given version. If the snapshot reconstruction fails for any other
 * reason than unsupported feature exception, we suppress it. This allows fallback to previous
 * behavior where the starting version/timestamp was not mandatory to point to reconstructable
 * snapshot.
```
> This allows fallback to previous behavior where the starting version/timestamp was not mandatory to point to reconstructable snapshot.

Where is this code?
Please see:

```java
if (!validateProtocolAt(spark, tablePath, engine, version)) {
  // When starting from a given version, we don't require that the snapshot of this
  // version can be reconstructed, even though the input table is technically in an
  // inconsistent state. If the snapshot cannot be reconstructed, then the protocol
  // check is skipped, so this is technically not safe, but we keep it this way for
  // historical reasons.
  try {
    streamingHelper.checkVersionExists(
        version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */
```
```java
/* catalogTableOpt= */ Option.empty(),
options,
/* snapshotAtSourceInit= */ snapshot,
/* metadataPath= */ tablePath + "/_checkpoint",
```
Is it the location of the streaming checkpoint?
This is the location of PersistedMetadata for advanced schema evolution.
```java
// Attempt to construct a snapshot at the startingVersion to validate the protocol
// If snapshot reconstruction fails, fall back to old behavior where the only
// requirement was for the commit to exist
TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
```
Actually we should define a method in the streaming helper (maybe called loadSnapshotAt).
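A hypothetical sketch of that helper; loadSnapshotAt is the suggested name, the builder chain is taken from the hunk quoted above, and the class shape and import paths are assumptions:

```java
import io.delta.kernel.Snapshot;
import io.delta.kernel.TableManager;
import io.delta.kernel.engine.Engine;

// Centralizes version-pinned snapshot loading so call sites don't repeat the
// TableManager builder chain quoted above.
final class StreamingHelperSketch {
  private final String tablePath;
  private final Engine engine;

  StreamingHelperSketch(String tablePath, Engine engine) {
    this.tablePath = tablePath;
    this.engine = engine;
  }

  /** Loads a Kernel snapshot pinned at the given table version. */
  Snapshot loadSnapshotAt(long version) {
    return TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
  }
}
```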
Which Delta project/connector is this regarding?
Description
This PR is Part I of implementing SparkMicroBatchStream.initialSnapshot() to support Kernel-based DSv2 Delta streaming (M1 milestone). This PR handles the read option startingVersion.

How was this patch tested?
Parameterized tests verifying parity between DSv1 (DeltaSource) and DSv2 (SparkMicroBatchStream).
Does this PR introduce any user-facing changes?
No