From 17870e73f75718622cbd5b9d5e5c6e08f316df0f Mon Sep 17 00:00:00 2001 From: Vrinda Jindal <162244905+vrinda-db@users.noreply.github.com> Date: Wed, 13 Aug 2025 16:03:47 -0700 Subject: [PATCH] . (#771)[client] Add retry based on the httpStatusErrorCode in endStreamAction --- .../sharing/client/DeltaSharingClient.scala | 6 ++-- .../scala/io/delta/sharing/client/model.scala | 3 +- .../sharing/client/util/RetryUtils.scala | 12 ++++---- .../sharing/spark/DeltaSharingErrors.scala | 13 ++++++++- .../client/DeltaSharingRestClientSuite.scala | 29 ++++++++++++++++--- .../sharing/client/util/RetryUtilsSuite.scala | 6 +++- .../scala/io/delta/sharing/server/model.scala | 3 +- 7 files changed, 55 insertions(+), 17 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 1a2da6e94..192840c21 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -1307,8 +1307,10 @@ object DeltaSharingRestClient extends Logging { s"Successfully verified endStreamAction in the response" + queryIdForLogging ) if(lastEndStreamAction.errorMessage != null) { - throw new DeltaSharingServerException("Request failed during streaming response " + - s"with error message ${lastEndStreamAction.errorMessage}") + val errorCodeOpt = Option(lastEndStreamAction.httpStatusErrorCode).map(_.intValue) + throw new DeltaSharingServerException( + s"Server Exception[${errorCodeOpt.getOrElse("")}]: " + + s"${lastEndStreamAction.errorMessage}", errorCodeOpt) } case Some(false) => logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + diff --git a/client/src/main/scala/io/delta/sharing/client/model.scala b/client/src/main/scala/io/delta/sharing/client/model.scala index 8273fa8f9..2f2b3eccf 100644 --- a/client/src/main/scala/io/delta/sharing/client/model.scala +++ 
b/client/src/main/scala/io/delta/sharing/client/model.scala @@ -129,7 +129,8 @@ private[sharing] case class EndStreamAction( refreshToken: String, nextPageToken: String, minUrlExpirationTimestamp: java.lang.Long, - errorMessage: String = null) + errorMessage: String = null, + httpStatusErrorCode: java.lang.Integer = null) extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) } diff --git a/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala index 6ce6e17a4..9af369061 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala @@ -18,11 +18,9 @@ package io.delta.sharing.client.util import java.io.{InterruptedIOException, IOException} -import scala.util.control.NonFatal - import org.apache.spark.internal.Logging -import io.delta.sharing.spark.MissingEndStreamActionException +import io.delta.sharing.spark.{DeltaSharingExceptionWithErrorCode, MissingEndStreamActionException} private[sharing] object RetryUtils extends Logging { @@ -64,10 +62,10 @@ private[sharing] object RetryUtils extends Logging { def shouldRetry(t: Throwable): Boolean = { t match { - case e: UnexpectedHttpStatus => - if (e.statusCode == 429) { // Too Many Requests + case DeltaSharingExceptionWithErrorCode(Some(statusCode)) => + if (statusCode == 429) { // Too Many Requests true - } else if (e.statusCode >= 500 && e.statusCode < 600) { // Internal Error + } else if (statusCode >= 500 && statusCode < 600) { // Internal Error true } else { false @@ -86,4 +84,4 @@ private[sharing] object RetryUtils extends Logging { } private[sharing] class UnexpectedHttpStatus(message: String, val statusCode: Int) - extends IllegalStateException(message) + extends DeltaSharingExceptionWithErrorCode(message, Some(statusCode)) diff --git a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala 
b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala index c27195f98..26eae4bc0 100644 --- a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala +++ b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingErrors.scala @@ -18,9 +18,20 @@ package io.delta.sharing.spark import org.apache.spark.sql.types.StructType +// Common base exception class that also encapsulates the associated errorCode +abstract class DeltaSharingExceptionWithErrorCode( + message: String, + val statusCodeOpt: Option[Int] +) extends IllegalStateException(message) + +object DeltaSharingExceptionWithErrorCode { + def unapply(e: DeltaSharingExceptionWithErrorCode): Option[Option[Int]] = Some(e.statusCodeOpt) +} + class MissingEndStreamActionException(message: String) extends IllegalStateException(message) -class DeltaSharingServerException(message: String) extends RuntimeException(message) +class DeltaSharingServerException(message: String, statusCodeOpt: Option[Int]) + extends DeltaSharingExceptionWithErrorCode(message, statusCodeOpt) object DeltaSharingErrors { def nonExistentDeltaSharingTable(tableId: String): Throwable = { diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 34f6bf3a4..997aea058 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -1243,6 +1243,13 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { minUrlExpirationTimestamp = null, errorMessage = "BAD REQUEST: Error Occurred During Streaming" ).wrap) + val fakeEndStreamActionStrWithErrorMsgAndCode = JsonUtils.toJson(EndStreamAction( + refreshToken = null, + nextPageToken = null, + minUrlExpirationTimestamp = null, + errorMessage = "BAD REQUEST: Error Occurred During Streaming", + httpStatusErrorCode = 400 + 
).wrap) test("checkEndStreamAction succeeded") { // DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true @@ -1333,10 +1340,13 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { test("checkEndStreamAction with error message throws streaming error") { def checkErrorMessage( - e: DeltaSharingServerException, - additionalErrorMsg: String): Unit = { - val commonErrorMsg = "Request failed during streaming response with error message" + e: DeltaSharingServerException, + additionalErrorMsg: String, + errorCodeOpt: Option[Int] = None): Unit = { + val commonErrorMsg = s"Server Exception[${errorCodeOpt.getOrElse("")}]" assert(e.getMessage.contains(commonErrorMsg)) + // null/non-existent httpStatusErrorCode in endStreamAction json string defaults to e.statusCodeOpt=None + assert(e.statusCodeOpt == errorCodeOpt) assert(e.getMessage.contains(additionalErrorMsg)) } @@ -1352,7 +1362,18 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming") - // checkEndStreamAction throws error if the only line is EndStreamAction with error message + // checkEndStreamAction throws error if the last line is EndStreamAction with error message & errorCode + e = intercept[DeltaSharingServerException] { + checkEndStreamAction( + Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"), + Map(DELTA_SHARING_INCLUDE_END_STREAM_ACTION -> "true"), + Seq(fakeAddFileStr, fakeEndStreamActionStrWithErrorMsgAndCode), + "random-query-id" + ) + } + checkErrorMessage(e, "BAD REQUEST: Error Occurred During Streaming", Some(400)) + + // checkEndStreamAction throws error if the only line is EndStreamAction with error message e = intercept[DeltaSharingServerException] { checkEndStreamAction( Some(s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"), diff --git a/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala b/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala index 
934147233..e050ff96a 100644 --- a/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkFunSuite import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus} import io.delta.sharing.client.util.RetryUtils._ -import io.delta.sharing.spark.MissingEndStreamActionException +import io.delta.sharing.spark.{DeltaSharingServerException, MissingEndStreamActionException} class RetryUtilsSuite extends SparkFunSuite { test("shouldRetry") { @@ -37,6 +37,10 @@ class RetryUtilsSuite extends SparkFunSuite { assert(shouldRetry(new java.net.SocketTimeoutException)) assert(!shouldRetry(new RuntimeException)) assert(shouldRetry(new MissingEndStreamActionException("missing"))) + assert(!shouldRetry(new DeltaSharingServerException("error", Some(403)))) + assert(shouldRetry(new DeltaSharingServerException("error", Some(429)))) + assert(!shouldRetry(new DeltaSharingServerException("error", None))) + assert(shouldRetry(new DeltaSharingServerException("error", Some(503)))) } test("runWithExponentialBackoff") { diff --git a/server/src/main/scala/io/delta/sharing/server/model.scala b/server/src/main/scala/io/delta/sharing/server/model.scala index 00d91fab1..abe6bd32d 100644 --- a/server/src/main/scala/io/delta/sharing/server/model.scala +++ b/server/src/main/scala/io/delta/sharing/server/model.scala @@ -155,7 +155,8 @@ case class EndStreamAction( refreshToken: String, nextPageToken: String, minUrlExpirationTimestamp: java.lang.Long, - errorMessage: String = null + errorMessage: String = null, + httpStatusErrorCode: java.lang.Integer = null ) extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) }