diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 1d365b11d..640e32309 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -1364,8 +1364,10 @@ object DeltaSharingRestClient extends Logging { s"Successfully verified endStreamAction in the response" + queryIdForLogging ) if(lastEndStreamAction.errorMessage != null) { - throw new DeltaSharingServerException("Request failed during streaming response " + - s"with error message ${lastEndStreamAction.errorMessage}") + val errorCodeOpt = Option(lastEndStreamAction.httpStatusErrorCode).map(_.intValue) + throw new DeltaSharingServerException( + s"Server Exception[${errorCodeOpt.getOrElse("")}]: " + + s"${lastEndStreamAction.errorMessage}", errorCodeOpt) } case Some(false) => logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + diff --git a/client/src/main/scala/io/delta/sharing/client/model.scala b/client/src/main/scala/io/delta/sharing/client/model.scala index 8273fa8f9..2f2b3eccf 100644 --- a/client/src/main/scala/io/delta/sharing/client/model.scala +++ b/client/src/main/scala/io/delta/sharing/client/model.scala @@ -129,7 +129,8 @@ private[sharing] case class EndStreamAction( refreshToken: String, nextPageToken: String, minUrlExpirationTimestamp: java.lang.Long, - errorMessage: String = null) + errorMessage: String = null, + httpStatusErrorCode: java.lang.Integer = null) extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) } diff --git a/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala index 41df27944..c416fcebb 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala @@ -18,11 +18,9 @@ package io.delta.sharing.client.util import java.io.{InterruptedIOException, IOException} -import scala.util.control.NonFatal - import org.apache.spark.internal.Logging -import io.delta.sharing.spark.MissingEndStreamActionException +import io.delta.sharing.spark.{DeltaSharingExceptionWithErrorCode, MissingEndStreamActionException} private[sharing] object RetryUtils extends Logging { @@ -65,10 +63,10 @@ private[sharing] object RetryUtils extends Logging { def shouldRetry(t: Throwable): Boolean = { t match { - case e: UnexpectedHttpStatus => - if (e.statusCode == 429) { // Too Many Requests + case DeltaSharingExceptionWithErrorCode(Some(statusCode)) => + if (statusCode == 429) { // Too Many Requests true - } else if (e.statusCode >= 500 && e.statusCode < 600) { // Internal Error + } else if (statusCode >= 500 && statusCode < 600) { // Internal Error true } else { false @@ -87,4 +85,4 @@ private[sharing] object RetryUtils extends Logging { } private[sharing] class UnexpectedHttpStatus(message: String, val statusCode: Int) - extends IllegalStateException(message) + extends DeltaSharingExceptionWithErrorCode(message, Some(statusCode)) diff --git a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala index 9af74034b..c591cb751 100644 --- a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala +++ b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala @@ -18,9 +18,20 @@ package io.delta.sharing.spark import org.apache.spark.sql.types.StructType +// Common base exception class that also encapsulates the associated errorCode +abstract class DeltaSharingExceptionWithErrorCode( + message: String, + val statusCodeOpt: Option[Int] +) extends IllegalStateException(message) + +object DeltaSharingExceptionWithErrorCode { + def unapply(e: DeltaSharingExceptionWithErrorCode): Option[Option[Int]] = Some(e.statusCodeOpt) +} + class MissingEndStreamActionException(message: String) extends IllegalStateException(message) -class DeltaSharingServerException(message: String) extends RuntimeException(message) +class DeltaSharingServerException(message: String, statusCodeOpt: Option[Int]) + extends DeltaSharingExceptionWithErrorCode(message, statusCodeOpt) object DeltaSharingErrors { def nonExistentDeltaSharingTable(tableId: String): Throwable = { diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 9bc8d8044..1903079d8 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -1249,6 +1249,13 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { minUrlExpirationTimestamp = null, errorMessage = "BAD REQUEST: Error Occurred During Streaming" ).wrap) + val fakeEndStreamActionStrWithErrorMsgAndCode = JsonUtils.toJson(EndStreamAction( + refreshToken = null, + nextPageToken = null, + minUrlExpirationTimestamp = null, + errorMessage = "BAD REQUEST: Error Occurred During Streaming", + httpStatusErrorCode = 400 + ).wrap) test("checkEndStreamAction succeeded") { // DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true @@ -1339,10 +1346,13 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { test("checkEndStreamAction with error message throws streaming error") { def checkErrorMessage( - e: DeltaSharingServerException, - additionalErrorMsg: String): Unit = { - val commonErrorMsg = "Request failed during streaming response with error message" + e: DeltaSharingServerException, + additionalErrorMsg: String, + errorCodeOpt: Option[Int] = None): Unit = { + val commonErrorMsg = s"Server Exception[${errorCodeOpt.getOrElse("")}]" assert(e.getMessage.contains(commonErrorMsg)) + // null/non-existent httpStatusErrorCode in endStreamAction json string defaults to e.statusCodeOpt=None + assert(e.statusCodeOpt == errorCodeOpt) assert(e.getMessage.contains(additionalErrorMsg)) } @@ -1358,7 +1368,18 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming") - // checkEndStreamAction throws error if the only line is EndStreamAction with error message + // checkEndStreamAction throws error if the last line is EndStreamAction with error message + e = intercept[DeltaSharingServerException] { + checkEndStreamAction( + Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"), + Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"), + Seq(fakeAddFileStr, fakeEndStreamActionStrWithErrorMsgAndCode), + "random-query-id" + ) + } + checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming", Some(400)) + + // checkEndStreamAction throws error if the only line is EndStreamAction with error message & errorCode e = intercept[DeltaSharingServerException] { checkEndStreamAction( Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"), diff --git a/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala b/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala index 7c5d5a9ad..c23a2cbe8 100644 --- a/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus} import io.delta.sharing.client.util.RetryUtils._ -import io.delta.sharing.spark.MissingEndStreamActionException +import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException} class RetryUtilsSuite extends SparkFunSuite { test("shouldRetry") { @@ -37,6 +37,10 @@ class RetryUtilsSuite extends SparkFunSuite { assert(shouldRetry(new java.net.SocketTimeoutException)) assert(!shouldRetry(new RuntimeException)) assert(shouldRetry(new MissingEndStreamActionException("missing"))) + assert(!shouldRetry(new DeltaSharingServerException("error", Some(403)))) + assert(shouldRetry(new DeltaSharingServerException("error", Some(429)))) + assert(!shouldRetry(new DeltaSharingServerException("error", None))) + assert(shouldRetry(new DeltaSharingServerException("error", Some(503)))) } test("runWithExponentialBackoff") { diff --git a/server/src/main/scala/io/delta/sharing/server/model.scala b/server/src/main/scala/io/delta/sharing/server/model.scala index b943404be..b4594a399 100644 --- a/server/src/main/scala/io/delta/sharing/server/model.scala +++ b/server/src/main/scala/io/delta/sharing/server/model.scala @@ -158,7 +158,8 @@ case class EndStreamAction( refreshToken: String, nextPageToken: String, minUrlExpirationTimestamp: java.lang.Long, - errorMessage: String = null + errorMessage: String = null, + httpStatusErrorCode: java.lang.Integer = null ) extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) }