Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion server/src/main/scala/io/delta/sharing/server/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ case class RemoveFile(

case class EndStreamAction(
refreshToken: String,
errorMessage: String = null) extends Action {
errorMessage: String = null,
httpStatusErrorCode: java.lang.Integer = null)
extends Action {
override def wrap: SingleAction = SingleAction(endStreamAction = this)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,10 @@ private[spark] object DeltaSharingRestClient extends Logging {
s"Successfully verified endStreamAction in the response" + queryIdForLogging
)
if(lastEndStreamAction.errorMessage != null) {
throw new DeltaSharingServerException("Request failed during streaming response " +
s"with error message ${lastEndStreamAction.errorMessage}")
val errorCodeOpt = Option(lastEndStreamAction.httpStatusErrorCode).map(_.intValue)
throw new DeltaSharingServerException(
s"Server Exception[${errorCodeOpt.getOrElse("")}]: " +
s"${lastEndStreamAction.errorMessage}", errorCodeOpt)
}
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@ import org.apache.spark.sql.types.StructType

class MissingEndStreamActionException(message: String) extends IllegalStateException(message)

class DeltaSharingServerException(message: String) extends RuntimeException(message)
// Common base exception class that also encapsulates the associated errorCode
abstract class DeltaSharingExceptionWithErrorCode(message: String, val statusCodeOpt: Option[Int])
extends IllegalStateException(message)

object DeltaSharingExceptionWithErrorCode {
def unapply(e: DeltaSharingExceptionWithErrorCode): Option[Option[Int]] = Some(e.statusCodeOpt)
}

class DeltaSharingServerException(message: String, statusCodeOpt: Option[Int])
extends DeltaSharingExceptionWithErrorCode(message, statusCodeOpt)


object DeltaSharingErrors {
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/scala/io/delta/sharing/spark/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action {

private[sharing] case class EndStreamAction(
refreshToken: String,
errorMessage: String = null)
errorMessage: String = null,
httpStatusErrorCode: java.lang.Integer = null)
extends Action {
override def wrap: SingleAction = SingleAction(endStreamAction = this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

import io.delta.sharing.spark.MissingEndStreamActionException
import io.delta.sharing.spark.{DeltaSharingExceptionWithErrorCode, MissingEndStreamActionException}

private[sharing] object RetryUtils extends Logging {

Expand Down Expand Up @@ -64,10 +64,10 @@ private[sharing] object RetryUtils extends Logging {

def shouldRetry(t: Throwable): Boolean = {
t match {
case e: UnexpectedHttpStatus =>
if (e.statusCode == 429) { // Too Many Requests
case DeltaSharingExceptionWithErrorCode(Some(statusCode)) =>
if (statusCode == 429) { // Too Many Requests
true
} else if (e.statusCode >= 500 && e.statusCode < 600) { // Internal Error
} else if (statusCode >= 500 && statusCode < 600) { // Internal Error
true
} else {
false
Expand All @@ -86,4 +86,4 @@ private[sharing] object RetryUtils extends Logging {
}

private[sharing] class UnexpectedHttpStatus(message: String, val statusCode: Int)
extends IllegalStateException(message)
extends DeltaSharingExceptionWithErrorCode(message, Some(statusCode))
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,11 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
refreshToken = null,
errorMessage = "BAD REQUEST: Error Occurred During Streaming"
).wrap)
val fakeEndStreamActionStrWithErrorMsgAndCode = JsonUtils.toJson(EndStreamAction(
refreshToken = null,
errorMessage = "BAD REQUEST: Error Occurred During Streaming",
httpStatusErrorCode = 400
).wrap)

test("checkEndStreamAction succeeded") {
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
Expand Down Expand Up @@ -981,9 +986,12 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
test("checkEndStreamAction with error message throws streaming error") {
def checkErrorMessage(
e: DeltaSharingServerException,
additionalErrorMsg: String): Unit = {
val commonErrorMsg = "Request failed during streaming response with error message"
additionalErrorMsg: String,
errorCodeOpt: Option[Int] = None): Unit = {
val commonErrorMsg = s"Server Exception[${errorCodeOpt.getOrElse("")}]"
assert(e.getMessage.contains(commonErrorMsg))
// null/non-existent httpStatusErrorCode in endStreamAction json string defaults to e.statusCodeOpt=None
assert(e.statusCodeOpt == errorCodeOpt)
assert(e.getMessage.contains(additionalErrorMsg))
}

Expand All @@ -998,6 +1006,17 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
}
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")

// checkEndStreamAction throws error if the last line is EndStreamAction with error message
e = intercept[DeltaSharingServerException] {
checkEndStreamAction(
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
Seq(fakeAddFileStr, fakeEndStreamActionStrWithErrorMsgAndCode),
"random-query-id"
)
}
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming", Some(400))

// checkEndStreamAction throws error if the only line is EndStreamAction with error message
e = intercept[DeltaSharingServerException] {
checkEndStreamAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkFunSuite

import io.delta.sharing.spark.MissingEndStreamActionException
import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException}

class RetryUtilsSuite extends SparkFunSuite {
import RetryUtils._
Expand All @@ -37,6 +37,10 @@ class RetryUtilsSuite extends SparkFunSuite {
assert(shouldRetry(new java.net.SocketTimeoutException))
assert(!shouldRetry(new RuntimeException))
assert(shouldRetry(new MissingEndStreamActionException("missing")))
assert(!shouldRetry(new DeltaSharingServerException("error", Some(403))))
assert(shouldRetry(new DeltaSharingServerException("error", Some(429))))
assert(!shouldRetry(new DeltaSharingServerException("error", None)))
assert(shouldRetry(new DeltaSharingServerException("error", Some(503))))
}

test("runWithExponentialBackoff") {
Expand Down
Loading