From a1bf467470d48ea7570a8744368cfbc80f317271 Mon Sep 17 00:00:00 2001 From: Vrinda Jindal Date: Wed, 13 Aug 2025 17:23:56 -0700 Subject: [PATCH] Add retry based on the httpStatusErrorCode in endStreamAction --- .../scala/io/delta/sharing/server/model.scala | 4 +++- .../sharing/spark/DeltaSharingClient.scala | 6 +++-- .../sharing/spark/DeltaSharingErrors.scala | 11 ++++++++- .../scala/io/delta/sharing/spark/model.scala | 3 ++- .../delta/sharing/spark/util/RetryUtils.scala | 10 ++++---- .../spark/DeltaSharingRestClientSuite.scala | 23 +++++++++++++++++-- .../sharing/spark/util/RetryUtilsSuite.scala | 6 ++++- 7 files changed, 50 insertions(+), 13 deletions(-) diff --git a/server/src/main/scala/io/delta/sharing/server/model.scala b/server/src/main/scala/io/delta/sharing/server/model.scala index 76ae9aab6..a28e793c3 100644 --- a/server/src/main/scala/io/delta/sharing/server/model.scala +++ b/server/src/main/scala/io/delta/sharing/server/model.scala @@ -133,7 +133,9 @@ case class RemoveFile( case class EndStreamAction( refreshToken: String, - errorMessage: String = null) extends Action { + errorMessage: String = null, + httpStatusErrorCode: java.lang.Integer = null) + extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) } diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala index 5a02064d6..cb554fca2 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingClient.scala @@ -730,8 +730,10 @@ private[spark] object DeltaSharingRestClient extends Logging { s"Successfully verified endStreamAction in the response" + queryIdForLogging ) if(lastEndStreamAction.errorMessage != null) { - throw new DeltaSharingServerException("Request failed during streaming response " + - s"with error message ${lastEndStreamAction.errorMessage}") + val errorCodeOpt = Option(lastEndStreamAction.httpStatusErrorCode).map(_.intValue) + throw new DeltaSharingServerException( + s"Server Exception[${errorCodeOpt.getOrElse("")}]: " + + s"${lastEndStreamAction.errorMessage}", errorCodeOpt) } case Some(false) => logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala index 87184b579..798696e0c 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala @@ -20,7 +20,16 @@ import org.apache.spark.sql.types.StructType class MissingEndStreamActionException(message: String) extends IllegalStateException(message) -class DeltaSharingServerException(message: String) extends RuntimeException(message) +// Common base exception class that also encapsulates the associated errorCode +abstract class DeltaSharingExceptionWithErrorCode(message: String, val statusCodeOpt: Option[Int]) + extends IllegalStateException(message) + +object DeltaSharingExceptionWithErrorCode { + def unapply(e: DeltaSharingExceptionWithErrorCode): Option[Option[Int]] = Some(e.statusCodeOpt) +} + +class DeltaSharingServerException(message: String, statusCodeOpt: Option[Int]) + extends DeltaSharingExceptionWithErrorCode(message, statusCodeOpt) object DeltaSharingErrors { diff --git a/spark/src/main/scala/io/delta/sharing/spark/model.scala b/spark/src/main/scala/io/delta/sharing/spark/model.scala index 7d8e3a715..66ee4b413 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/model.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/model.scala @@ -116,7 +116,8 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action { private[sharing] case class EndStreamAction( refreshToken: String, - errorMessage: String = null) + errorMessage: String = null, + httpStatusErrorCode: java.lang.Integer = null) extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) } diff --git a/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala b/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala index bba453dc9..7adf6a5cb 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/util/RetryUtils.scala @@ -22,7 +22,7 @@ import scala.util.control.NonFatal import org.apache.spark.internal.Logging -import io.delta.sharing.spark.MissingEndStreamActionException +import io.delta.sharing.spark.{DeltaSharingExceptionWithErrorCode, MissingEndStreamActionException} private[sharing] object RetryUtils extends Logging { @@ -64,10 +64,10 @@ private[sharing] object RetryUtils extends Logging { def shouldRetry(t: Throwable): Boolean = { t match { - case e: UnexpectedHttpStatus => - if (e.statusCode == 429) { // Too Many Requests + case DeltaSharingExceptionWithErrorCode(Some(statusCode)) => + if (statusCode == 429) { // Too Many Requests true - } else if (e.statusCode >= 500 && e.statusCode < 600) { // Internal Error + } else if (statusCode >= 500 && statusCode < 600) { // Internal Error true } else { false @@ -86,4 +86,4 @@ private[sharing] object RetryUtils extends Logging { } private[sharing] class UnexpectedHttpStatus(message: String, val statusCode: Int) - extends IllegalStateException(message) + extends DeltaSharingExceptionWithErrorCode(message, Some(statusCode)) diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala index 8e31d9a65..dd088d46c 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingRestClientSuite.scala @@ -890,6 +890,11 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { refreshToken = null, errorMessage = "BAD REQUEST: Error Occurred During Streaming" ).wrap) + val fakeEndStreamActionStrWithErrorMsgAndCode = JsonUtils.toJson(EndStreamAction( + refreshToken = null, + errorMessage = "BAD REQUEST: Error Occurred During Streaming", + httpStatusErrorCode = 400 + ).wrap) test("checkEndStreamAction succeeded") { // DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true @@ -981,9 +986,12 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { test("checkEndStreamAction with error message throws streaming error") { def checkErrorMessage( e: DeltaSharingServerException, - additionalErrorMsg: String): Unit = { - val commonErrorMsg = "Request failed during streaming response with error message" + additionalErrorMsg: String, + errorCodeOpt: Option[Int] = None): Unit = { + val commonErrorMsg = s"Server Exception[${errorCodeOpt.getOrElse("")}]" assert(e.getMessage.contains(commonErrorMsg)) + // null/non-existent httpStatusErrorCode in endStreamAction json string defaults to e.statusCodeOpt=None + assert(e.statusCodeOpt == errorCodeOpt) assert(e.getMessage.contains(additionalErrorMsg)) } @@ -998,6 +1006,17 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming") + // checkEndStreamAction throws error if the last line is EndStreamAction with error message + e = intercept[DeltaSharingServerException] { + checkEndStreamAction( + Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"), + Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"), + Seq(fakeAddFileStr, fakeEndStreamActionStrWithErrorMsgAndCode), + "random-query-id" + ) + } + checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming", Some(400)) + // checkEndStreamAction throws error if the only line is EndStreamAction with error message e = intercept[DeltaSharingServerException] { checkEndStreamAction( diff --git a/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala index 8a37db1df..b8db4215e 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/util/RetryUtilsSuite.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkFunSuite -import io.delta.sharing.spark.MissingEndStreamActionException +import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException} class RetryUtilsSuite extends SparkFunSuite { import RetryUtils._ @@ -37,6 +37,10 @@ class RetryUtilsSuite extends SparkFunSuite { assert(shouldRetry(new java.net.SocketTimeoutException)) assert(!shouldRetry(new RuntimeException)) assert(shouldRetry(new MissingEndStreamActionException("missing"))) + assert(!shouldRetry(new DeltaSharingServerException("error", Some(403)))) + assert(shouldRetry(new DeltaSharingServerException("error", Some(429)))) + assert(!shouldRetry(new DeltaSharingServerException("error", None))) + assert(shouldRetry(new DeltaSharingServerException("error", Some(503)))) } test("runWithExponentialBackoff") {