Changes from 2 commits
@@ -19,6 +19,7 @@
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.KernelException;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
@@ -28,26 +29,47 @@
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.DeltaStartingVersion;
import org.apache.spark.sql.delta.StartingVersion;
import org.apache.spark.sql.delta.StartingVersionLatest$;
import org.apache.spark.sql.delta.sources.DeltaSQLConf;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

public class SparkMicroBatchStream implements MicroBatchStream {

private static final Logger logger = LoggerFactory.getLogger(SparkMicroBatchStream.class);

private static final Set<DeltaAction> ACTION_SET =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE)));

private final Engine engine;
private final String tablePath;
private final DeltaOptions options;
private final StreamingHelper streamingHelper;
private final SparkSession spark;

public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) {
this(tablePath, hadoopConf, SparkSession.active(), /* options= */ null);
}

public SparkMicroBatchStream(
String tablePath, Configuration hadoopConf, SparkSession spark, DeltaOptions options) {
this.spark = spark;
this.tablePath = tablePath;
this.engine = DefaultEngine.create(hadoopConf);
this.options = options;
this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);
}

////////////
@@ -97,6 +119,100 @@ public void stop() {
throw new UnsupportedOperationException("stop is not supported");
}

///////////////////////
// getStartingVersion //
///////////////////////

/**
* Extracts whether users provided the option to time travel a relation. If a query restarts from
* a checkpoint and the checkpoint has recorded the offset, this method should never be called.
*
* <p>This is the DSv2 Kernel-based implementation of DeltaSource.getStartingVersion.
*/
Optional<Long> getStartingVersion() {
if (options == null) {
Collaborator: Can we check non null in the constructor? Objects.requireNonNull

Contributor Author: I'm leaning toward continuing to support null options -- it should represent optional configurations, not required (e.g. some tests pass in zero options). wdyt?

Collaborator: What's the difference between null options and new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf())? Was trying to avoid assigning null if possible to reduce risks of NPE.

Contributor: When would "options" be null? Java supports an "Optional" API. Why not use that, so it is clear to the reader this variable is optional?

Contributor Author: Done.
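An illustrative sketch of the Optional-based constructors the thread converges on (hypothetical; in this two-commit view the field is still a nullable DeltaOptions, and java.util.* is already imported):

    // Hypothetical: store Optional<DeltaOptions> instead of a nullable field, and
    // require a non-null SparkSession up front via Objects.requireNonNull.
    private final Optional<DeltaOptions> options;

    public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) {
      this(tablePath, hadoopConf, SparkSession.active(), Optional.empty());
    }

    public SparkMicroBatchStream(
        String tablePath,
        Configuration hadoopConf,
        SparkSession spark,
        Optional<DeltaOptions> options) {
      this.spark = Objects.requireNonNull(spark, "spark");
      this.tablePath = tablePath;
      this.engine = DefaultEngine.create(hadoopConf);
      this.options = Objects.requireNonNull(options, "options");
      this.streamingHelper = new StreamingHelper(tablePath, hadoopConf);
    }

The null check at the top of getStartingVersion() would then read if (!options.isPresent()) { return Optional.empty(); }.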
return Optional.empty();
}

// TODO(#5319): DeltaSource.scala uses `allowOutOfRange` parameter from
// DeltaSQLConf.DELTA_CDF_ALLOW_OUT_OF_RANGE_TIMESTAMP.
if (options.startingVersion().isDefined()) {
DeltaStartingVersion startingVersion = options.startingVersion().get();
if (startingVersion.equals(StartingVersionLatest$.MODULE$)) {
Contributor: Why not use instanceof instead of equals?

Contributor Author: Done.
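A sketch of the instanceof-based check being suggested (hypothetical; this two-commit view still calls equals()):

    // Hypothetical: StartingVersionLatest is a Scala case object, so the test can be an
    // instanceof check against its generated class rather than equals() on MODULE$.
    if (startingVersion instanceof StartingVersionLatest$) {
      Snapshot latestSnapshot = streamingHelper.loadLatestSnapshot();
      // "latest": start reading from the next commit
      return Optional.of(latestSnapshot.getVersion() + 1);
    }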

Snapshot latestSnapshot = streamingHelper.loadLatestSnapshot();
// "latest": start reading from the next commit
return Optional.of(latestSnapshot.getVersion() + 1);
} else if (startingVersion instanceof StartingVersion) {
long version = ((StartingVersion) startingVersion).version();
if (!validateProtocolAt(spark, tablePath, engine, version)) {
// When starting from a given version, we don't require that the snapshot of this
// version can be reconstructed, even though the input table is technically in an
// inconsistent state. If the snapshot cannot be reconstructed, then the protocol
// check is skipped, so this is technically not safe, but we keep it this way for
// historical reasons.
try {
streamingHelper.checkVersionExists(
version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
} catch (Exception e) {
throw new RuntimeException("Failed to validate starting version: " + version, e);
Collaborator: Wondering if it helps to avoid this try-catch block if we make the not-found exception extend AnalysisException.

Contributor Author: Created #5369

Contributor: Can we throw the same exception class as DSv1, which would be AnalysisException?

Contributor Author:
  1. @jerrypeng yes we can throw the DSv1 version of this Exception.
  2. Although we should still wrap this with RuntimeException because (2a) the caller should not have to handle this exception again and (2b) AnalysisException is also checked, unfortunately.
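A sketch of what that could look like if the not-found error surfaced as the DSv1-style AnalysisException (hypothetical; checkVersionExists does not throw AnalysisException in this diff):

    // Hypothetical: catch the checked DSv1-style exception and rewrap it so callers of
    // getStartingVersion() do not have to declare or handle it again (points 2a/2b above).
    try {
      streamingHelper.checkVersionExists(
          version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);
    } catch (org.apache.spark.sql.AnalysisException e) {
      throw new RuntimeException("Failed to validate starting version: " + version, e);
    }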
}
}
return Optional.of(version);
}
}
// TODO(#5319): Implement startingTimestamp support
return Optional.empty();
}

/**
* Validate the protocol at a given version. If the snapshot reconstruction fails for any other
* reason than unsupported feature exception, we suppress it. This allows fallback to previous
* behavior where the starting version/timestamp was not mandatory to point to reconstructable
* snapshot.
Contributor: "This allows fallback to previous behavior where the starting version/timestamp was not mandatory to point to reconstructable snapshot." -- Where is this code?

Contributor Author: Please see

    if (!validateProtocolAt(spark, tablePath, engine, version)) {
      // When starting from a given version, we don't require that the snapshot of this
      // version can be reconstructed, even though the input table is technically in an
      // inconsistent state. If the snapshot cannot be reconstructed, then the protocol
      // check is skipped, so this is technically not safe, but we keep it this way for
      // historical reasons.
      try {
        streamingHelper.checkVersionExists(
            version, /* mustBeRecreatable= */ false, /* allowOutOfRange= */ false);

*
* <p>This is the DSv2 Kernel-based implementation of DeltaSource.validateProtocolAt.
*
* <p>Returns true when the validation was performed and succeeded.
*/
private static boolean validateProtocolAt(
SparkSession spark, String tablePath, Engine engine, long version) {
boolean alwaysValidateProtocol =
(Boolean)
spark
.sessionState()
.conf()
.getConf(DeltaSQLConf.FAST_DROP_FEATURE_STREAMING_ALWAYS_VALIDATE_PROTOCOL());
if (!alwaysValidateProtocol) {
return false;
}

try {
// Attempt to construct a snapshot at the startingVersion to validate the protocol
// If snapshot reconstruction fails, fall back to old behavior where the only
// requirement was for the commit to exist
TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
Collaborator: Actually we should define a method in streaming helper (maybe called LoadSnapshotAt).
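A minimal sketch of that helper (hypothetical; StreamingHelper has no such method in this diff, and the sketch assumes it keeps the table path and an Engine internally, as loadLatestSnapshot implies):

    // Hypothetical StreamingHelper addition: build a snapshot pinned to a version so
    // callers never reach for TableManager directly.
    public Snapshot loadSnapshotAt(long version) {
      return TableManager.loadSnapshot(tablePath).atVersion(version).build(engine);
    }

The TableManager call in validateProtocolAt would then become a single call to this helper.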

return true;
} catch (KernelException e) {
// Check if it's an unsupported table feature exception
// Kernel throws plain KernelException (not a subclass) for unsupported features,
Collaborator: I think there is an improvement task for Kernel to have finer-grained error throwing. cc @raveeram-db

Contributor Author: That would be great. Created an issue: #5369

// so we must check the message. See DeltaErrors.unsupportedTableFeature,
// DeltaErrors.unsupportedReaderFeatures, and DeltaErrors.unsupportedWriterFeatures.
// TODO(#5369): Use specific exception types instead of message parsing
String exceptionMessage = e.getMessage();
if (exceptionMessage != null
Collaborator: Maybe only keep exceptionMessage.contains("Unsupported Delta reader features")?

Contributor Author: Good call. What about "Unsupported Delta table feature"?

Collaborator: "Unsupported Delta table feature" -- let's add that, good catch.

Contributor Author: Done.

&& exceptionMessage.contains("Unsupported Delta reader features")) {
throw new RuntimeException(e);
Contributor: Why do we need to wrap the KernelException in a RuntimeException? Also, DSv1 throws "DeltaUnsupportedTableFeatureException". Can we throw the same exception to maintain the same error handling behavior?

Contributor Author:
  1. You are right, KernelException is unchecked; I removed the wrapping.
  2. It's challenging to convert the KernelException to DSv1's DeltaUnsupportedTableFeatureException in this case as it doesn't expose any error params. I agree we should try to throw the same exceptions as DSv1, but sometimes it's challenging to do so. Is it really that important to keep the exception types consistent if we are failing the stream anyway?
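Putting the two threads above together, the catch block might end up looking roughly like this (a hypothetical sketch; this two-commit view still checks only the reader-features message and wraps the rethrow in RuntimeException):

    } catch (KernelException e) {
      // Kernel throws a plain KernelException for unsupported features, so match both
      // message prefixes from DeltaErrors.unsupportedTableFeature and
      // DeltaErrors.unsupportedReaderFeatures until typed exceptions exist (#5369).
      String exceptionMessage = e.getMessage();
      if (exceptionMessage != null
          && (exceptionMessage.contains("Unsupported Delta reader features")
              || exceptionMessage.contains("Unsupported Delta table feature"))) {
        throw e; // KernelException is unchecked, so no RuntimeException wrapping is needed
      }
      // Suppress other KernelExceptions and fall back to the commit-exists check
      logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
      return false;
    }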

}
// Suppress other KernelExceptions
logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
return false;
} catch (Exception e) {
Contributor: I don't think we want to suppress all other exceptions here, for example InterruptedException. We should just follow what DSv1 is doing and only suppress NonFatal exceptions.

Contributor Author: What we are doing here is equivalent to the Scala NonFatal:

  • We won't catch VirtualMachineError, ThreadDeath, LinkageError (extend Error)
  • We won't catch ControlThrowable (extends Throwable)
  • We don't need to catch InterruptedException because it won't be thrown by the Kernel.
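For reference, a Java rendering of the scala.util.control.NonFatal criteria the reply describes, should stricter parity with DSv1 ever be wanted (a hypothetical sketch, not part of this diff):

    // Hypothetical guard mirroring Scala's NonFatal: the listed Throwables are fatal and
    // would be rethrown; anything else may be suppressed like catch (Exception) above.
    private static boolean isNonFatal(Throwable t) {
      return !(t instanceof VirtualMachineError
          || t instanceof ThreadDeath
          || t instanceof InterruptedException
          || t instanceof LinkageError
          || t instanceof scala.util.control.ControlThrowable);
    }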

// Suppress other exceptions
logger.warn("Protocol validation failed at version {} with: {}", version, e.getMessage());
return false;
}
}

////////////////////
// getFileChanges //
////////////////////
@@ -39,6 +39,7 @@
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.*;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.execution.datasources.*;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.sources.Filter;
@@ -126,7 +127,8 @@ public Batch toBatch() {

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
return new SparkMicroBatchStream(tablePath, hadoopConf);
DeltaOptions deltaOptions = new DeltaOptions(scalaOptions, sqlConf);
return new SparkMicroBatchStream(tablePath, hadoopConf, SparkSession.active(), deltaOptions);
}

@Override