Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1364,8 +1364,10 @@ object DeltaSharingRestClient extends Logging {
s"Successfully verified endStreamAction in the response" + queryIdForLogging
)
if(lastEndStreamAction.errorMessage != null) {
throw new DeltaSharingServerException("Request failed during streaming response " +
s"with error message ${lastEndStreamAction.errorMessage}")
val errorCodeOpt = Option(lastEndStreamAction.httpStatusErrorCode).map(_.intValue)
throw new DeltaSharingServerException(
s"Server Exception[${errorCodeOpt.getOrElse("")}]: " +
s"${lastEndStreamAction.errorMessage}", errorCodeOpt)
}
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
Expand Down
3 changes: 2 additions & 1 deletion client/src/main/scala/io/delta/sharing/client/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ private[sharing] case class EndStreamAction(
refreshToken: String,
nextPageToken: String,
minUrlExpirationTimestamp: java.lang.Long,
errorMessage: String = null)
errorMessage: String = null,
httpStatusErrorCode: java.lang.Integer = null)
extends Action {
override def wrap: SingleAction = SingleAction(endStreamAction = this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.io.{InterruptedIOException, IOException}

import org.apache.spark.internal.Logging

import io.delta.sharing.spark.MissingEndStreamActionException
import io.delta.sharing.spark.{DeltaSharingExceptionWithErrorCode, MissingEndStreamActionException}


private[sharing] object RetryUtils extends Logging {
Expand Down Expand Up @@ -64,10 +64,10 @@ private[sharing] object RetryUtils extends Logging {

def shouldRetry(t: Throwable): Boolean = {
t match {
case e: UnexpectedHttpStatus =>
if (e.statusCode == 429) { // Too Many Requests
case DeltaSharingExceptionWithErrorCode(Some(statusCode)) =>
if (statusCode == 429) { // Too Many Requests
true
} else if (e.statusCode >= 500 && e.statusCode < 600) { // Internal Error
} else if (statusCode >= 500 && statusCode < 600) { // Internal Error
true
} else {
false
Expand All @@ -86,4 +86,4 @@ private[sharing] object RetryUtils extends Logging {
}

private[sharing] class UnexpectedHttpStatus(message: String, val statusCode: Int)
extends IllegalStateException(message)
extends DeltaSharingExceptionWithErrorCode(message, Some(statusCode))
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,20 @@ package io.delta.sharing.spark

import org.apache.spark.sql.types.StructType

// Common base exception class that also encapsulates the associated errorCode
abstract class DeltaSharingExceptionWithErrorCode(
message: String,
val statusCodeOpt: Option[Int]
) extends IllegalStateException(message)

object DeltaSharingExceptionWithErrorCode {
def unapply(e: DeltaSharingExceptionWithErrorCode): Option[Option[Int]] = Some(e.statusCodeOpt)
}

class MissingEndStreamActionException(message: String) extends IllegalStateException(message)

class DeltaSharingServerException(message: String) extends RuntimeException(message)
class DeltaSharingServerException(message: String, statusCodeOpt: Option[Int])
extends DeltaSharingExceptionWithErrorCode(message, statusCodeOpt)

object DeltaSharingErrors {
def nonExistentDeltaSharingTable(tableId: String): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,13 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
minUrlExpirationTimestamp = null,
errorMessage = "BAD REQUEST: Error Occurred During Streaming"
).wrap)
val fakeEndStreamActionStrWithErrorMsgAndCode = JsonUtils.toJson(EndStreamAction(
refreshToken = null,
nextPageToken = null,
minUrlExpirationTimestamp = null,
errorMessage = "BAD REQUEST: Error Occurred During Streaming",
httpStatusErrorCode = 400
).wrap)

test("checkEndStreamAction succeeded") {
// DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true
Expand Down Expand Up @@ -1339,10 +1346,13 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {

test("checkEndStreamAction with error message throws streaming error") {
def checkErrorMessage(
e: DeltaSharingServerException,
additionalErrorMsg: String): Unit = {
val commonErrorMsg = "Request failed during streaming response with error message"
e: DeltaSharingServerException,
additionalErrorMsg: String,
errorCodeOpt: Option[Int] = None): Unit = {
val commonErrorMsg = s"Server Exception[${errorCodeOpt.getOrElse("")}]"
assert(e.getMessage.contains(commonErrorMsg))
// null/non-existent httpStatusErrorCode in endStreamAction json string defaults to e.statusCodeOpt=None
assert(e.statusCodeOpt == errorCodeOpt)

assert(e.getMessage.contains(additionalErrorMsg))
}
Expand All @@ -1358,7 +1368,18 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
}
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming")

// checkEndStreamAction throws error if the only line is EndStreamAction with error message
// checkEndStreamAction throws error if the last line is EndStreamAction with error message
e = intercept[DeltaSharingServerException] {
checkEndStreamAction(
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"),
Seq(fakeAddFileStr, fakeEndStreamActionStrWithErrorMsgAndCode),
"random-query-id"
)
}
checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming", Some(400))

// checkEndStreamAction throws error if the only line is EndStreamAction with error message & errorCode
e = intercept[DeltaSharingServerException] {
checkEndStreamAction(
Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite

import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus}
import io.delta.sharing.client.util.RetryUtils._
import io.delta.sharing.spark.MissingEndStreamActionException
import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException}

class RetryUtilsSuite extends SparkFunSuite {
test("shouldRetry") {
Expand All @@ -37,6 +37,10 @@ class RetryUtilsSuite extends SparkFunSuite {
assert(shouldRetry(new java.net.SocketTimeoutException))
assert(!shouldRetry(new RuntimeException))
assert(shouldRetry(new MissingEndStreamActionException("missing")))
assert(!shouldRetry(new DeltaSharingServerException("error", Some(403))))
assert(shouldRetry(new DeltaSharingServerException("error", Some(429))))
assert(!shouldRetry(new DeltaSharingServerException("error", None)))
assert(shouldRetry(new DeltaSharingServerException("error", Some(503))))
}

test("runWithExponentialBackoff") {
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/scala/io/delta/sharing/server/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ case class EndStreamAction(
refreshToken: String,
nextPageToken: String,
minUrlExpirationTimestamp: java.lang.Long,
errorMessage: String = null
errorMessage: String = null,
httpStatusErrorCode: java.lang.Integer = null
) extends Action {
override def wrap: SingleAction = SingleAction(endStreamAction = this)
}
Expand Down
Loading