From 692b523b8e92f40d4664e81393a934b26956773c Mon Sep 17 00:00:00 2001 From: Abhijit Chakankar Date: Wed, 22 Oct 2025 12:02:23 -0700 Subject: [PATCH] Validation URL; do not checkin. --- .../sharing/client/DeltaSharingClient.scala | 105 +++++++++++++++++- .../scala/io/delta/sharing/client/model.scala | 8 +- .../delta/sharing/client/util/ConfUtils.scala | 11 ++ project/build.properties | 2 +- .../spark/TestDeltaSharingClient.scala | 1 + 5 files changed, 120 insertions(+), 7 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 640e32309..5cfb99609 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -16,12 +16,13 @@ package io.delta.sharing.client -import java.io.{BufferedReader, InputStreamReader} +import java.io.{BufferedReader, InputStreamReader, IOException} import java.net.{URL, URLEncoder} import java.nio.charset.StandardCharsets.UTF_8 import java.util.UUID import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.util.Using import scala.util.control.NonFatal import org.apache.commons.io.IOUtils @@ -171,6 +172,16 @@ private[sharing] case class GetQueryTableInfoRequest( } } +private[sharing] case class UrlValidationResult( + vUrl: ValidationURL, + validationSuccess: Boolean, + errMsg: Option[String] +) + +private[sharing] case class UrlValidationReportRequest( + results: Seq[UrlValidationResult] +) + private[sharing] case class ListSharesResponse( items: Seq[Share], nextPageToken: Option[String]) extends PaginationResponse @@ -194,6 +205,7 @@ class DeltaSharingRestClient( maxFilesPerReq: Int = 100000, endStreamActionEnabled: Boolean = false, enableAsyncQuery: Boolean = false, + enablePreviewRead: Boolean = false, asyncQueryPollIntervalMillis: Long = 10000L, asyncQueryMaxDuration: Long = 600000L, tokenExchangeMaxRetries: Int = 5, @@ -201,8 +213,9 @@ class DeltaSharingRestClient( tokenRenewalThresholdInSeconds: Int = 600 ) extends DeltaSharingClient with Logging { + logInfo(s"Hello Abhijit") logInfo(s"DeltaSharingRestClient with endStreamActionEnabled: $endStreamActionEnabled, " + - s"enableAsyncQuery:$enableAsyncQuery") + s"enableAsyncQuery:$enableAsyncQuery, enablePreviewRead:$enablePreviewRead") import DeltaSharingRestClient._ @@ -237,6 +250,24 @@ class DeltaSharingRestClient( client } + @volatile private var createdPreviewReadHttpClient = false + private lazy val previewReadHttpClient = { + val config = RequestConfig.custom() + .setConnectTimeout(timeoutInSeconds * 1000) + .setConnectionRequestTimeout(timeoutInSeconds * 1000) + .setSocketTimeout(timeoutInSeconds * 1000).build() + logDebug(s"Creating previewReadHttpClient with timeoutInSeconds: $timeoutInSeconds.") + val clientBuilder = HttpClientBuilder.create() + val client = clientBuilder + // Disable the default retry behavior because we have our own retry logic. + // See `RetryUtils.runWithExponentialBackoff`. + .disableAutomaticRetries() + .setDefaultRequestConfig(config) + .build() + createdPreviewReadHttpClient = true + client + } + private lazy val authCredentialProvider = AuthCredentialProviderFactory.createCredentialProvider( profileProvider.getProfile, AuthConfig(tokenExchangeMaxRetries, @@ -359,7 +390,7 @@ class DeltaSharingRestClient( } logInfo( - s"Fetched metadata for table ${getFullTableName(table)}, version ${response.version} " + + s"Abh Fetched metadata for table ${getFullTableName(table)}, version ${response.version} " + s"with response format ${response.respondedFormat}" + getDsQueryIdForLogging ) @@ -445,7 +476,7 @@ class DeltaSharingRestClient( ) logInfo( - s"Fetched files for table ${getFullTableName(table)}, predicate $predicates, limit $limit, " + + s"FETCHED files for table ${getFullTableName(table)}, predicate $predicates, limit $limit, " + s"versionAsOf $versionAsOf, timestampAsOf $timestampAsOf, " + s"jsonPredicateHints $jsonPredicateHints, refreshToken $refreshToken, " + s"idempotency_key $idempotency_key\n" + @@ -527,7 +558,7 @@ class DeltaSharingRestClient( val response = getNDJsonPost( target = target, data = request, setIncludeEndStreamAction = endStreamActionEnabled ) - val (filteredLines, _) = maybeExtractEndStreamAction(response.lines) + val (filteredLines, endStreamActionOpt) = maybeExtractEndStreamAction(response.lines) logInfo(s"Took ${System.currentTimeMillis() - start} ms to query ${filteredLines.size} " + s"files for table " + getFullTableName(table) + s" with [$startingVersion, $endingVersion]," + getDsQueryIdForLogging @@ -658,9 +689,62 @@ class DeltaSharingRestClient( } } + handleValidationAction(endStreamAction) + (version, respondedFormat, allLines.toSeq, refreshToken) } + private def handleValidationAction(endStreamAction: Option[EndStreamAction]): Unit = { + endStreamAction.map(a => { + val vUrls = a.validationUrls + if (vUrls != null) { + vUrls.foreach(vUrl => { + try { + logInfo("Validating url: " + vUrl) + validateAndReport(vUrl) + } catch { + case t: Throwable => + logInfo("Error while validating url " + vUrl + ": " + t.toString) + } + }) + } + }) + } + + private def validateAndReport(vUrl: ValidationURL): Unit = { + val request = new HttpGet(vUrl.url) + logInfo("validateAndReport: url=" + vUrl) + val results = ArrayBuffer[UrlValidationResult]() + Using.resource(previewReadHttpClient.execute(request)) { response => + val status = response.getStatusLine() + val statusCode = status.getStatusCode + logInfo("validateAndReport: status=" + statusCode + ", url=" + vUrl) + val result = if (statusCode != HttpStatus.SC_PARTIAL_CONTENT && + statusCode != HttpStatus.SC_OK) { + UrlValidationResult( + vUrl = vUrl, + validationSuccess = false, + errMsg = Some("Validation Error: HTTP Get request failed with status: $status") + ) + } else { + UrlValidationResult( + vUrl = vUrl, + validationSuccess = true, + errMsg = None + ) + } + results.append(result) + } + + val validationReportRequest = UrlValidationReportRequest(results = results.toSeq) + val json = JsonUtils.toJson(validationReportRequest) + val httpPost = new HttpPost(getTargetUrl("/validateURL")) + httpPost.setHeader("Content-type", "application/json") + httpPost.setEntity(new StringEntity(json, UTF_8)) + val response = getResponse(httpPost, setIncludeEndStreamAction = false) + logInfo("validateAndReport: validationReportRequest response=" + response) + } + override def getCDFFiles( table: Table, cdfOptions: Map[String, String], @@ -1239,6 +1323,10 @@ class DeltaSharingRestClient( capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" } + if (enablePreviewRead) { + capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_PREVIEW_READ=true" + } + val cap = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER) cap } @@ -1247,6 +1335,9 @@ class DeltaSharingRestClient( if (created) { try client.close() finally created = false } + if (createdPreviewReadHttpClient) { + try previewReadHttpClient.close() finally createdPreviewReadHttpClient = false + } } override def finalize(): Unit = { @@ -1262,6 +1353,7 @@ object DeltaSharingRestClient extends Logging { val READER_FEATURES = "readerfeatures" val DELTA_SHARING_CAPABILITIES_ASYNC_READ = "asyncquery" val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction" + val DELTA_SHARING_CAPABILITIES_PREVIEW_READ = "previewread" val RESPONSE_FORMAT_DELTA = "delta" val RESPONSE_FORMAT_PARQUET = "parquet" val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" @@ -1408,6 +1500,7 @@ object DeltaSharingRestClient extends Logging { val maxFilesPerReq = ConfUtils.maxFilesPerQueryRequest(sqlConf) val useAsyncQuery = ConfUtils.useAsyncQuery(sqlConf) val endStreamActionEnabled = ConfUtils.includeEndStreamAction(sqlConf) + val enablePreviewRead = ConfUtils.enablePreviewRead(sqlConf) val asyncQueryMaxDurationMillis = ConfUtils.asyncQueryTimeout(sqlConf) val asyncQueryPollDurationMillis = ConfUtils.asyncQueryPollIntervalMillis(sqlConf) @@ -1432,6 +1525,7 @@ object DeltaSharingRestClient extends Logging { classOf[Int], classOf[Boolean], classOf[Boolean], + classOf[Boolean], classOf[Long], classOf[Long], classOf[Int], @@ -1450,6 +1544,7 @@ object DeltaSharingRestClient extends Logging { java.lang.Integer.valueOf(maxFilesPerReq), java.lang.Boolean.valueOf(endStreamActionEnabled), java.lang.Boolean.valueOf(useAsyncQuery), + java.lang.Boolean.valueOf(enablePreviewRead), java.lang.Long.valueOf(asyncQueryPollDurationMillis), java.lang.Long.valueOf(asyncQueryMaxDurationMillis), java.lang.Integer.valueOf(tokenExchangeMaxRetries), diff --git a/client/src/main/scala/io/delta/sharing/client/model.scala b/client/src/main/scala/io/delta/sharing/client/model.scala index 2f2b3eccf..8cf5be58c 100644 --- a/client/src/main/scala/io/delta/sharing/client/model.scala +++ b/client/src/main/scala/io/delta/sharing/client/model.scala @@ -125,12 +125,18 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action { override def wrap: SingleAction = SingleAction(protocol = this) } +private[sharing] case class ValidationURL( + id: String, + url: String +) + private[sharing] case class EndStreamAction( refreshToken: String, nextPageToken: String, minUrlExpirationTimestamp: java.lang.Long, errorMessage: String = null, - httpStatusErrorCode: java.lang.Integer = null) + httpStatusErrorCode: java.lang.Integer = null, + validationUrls: Seq[ValidationURL] = null) extends Action { override def wrap: SingleAction = SingleAction(endStreamAction = this) } diff --git a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala index 58a5b9f14..1908ebfd2 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala @@ -42,6 +42,9 @@ object ConfUtils { val USE_ASYNC_QUERY_CONF = "spark.delta.sharing.network.useAsyncQuery" val USE_ASYNC_QUERY_DEFAULT = "false" + val ENABLE_PREVIEW_READ_CONF = "spark.delta.sharing.network.enablePreviewRead" + val ENABLE_PREVIEW_READ_DEFAULT = "true" + val INCLUDE_END_STREAM_ACTION_CONF = "spark.delta.sharing.query.includeEndStreamAction" val INCLUDE_END_STREAM_ACTION_DEFAULT = "false" @@ -215,6 +218,14 @@ object ConfUtils { conf.getConfString(INCLUDE_END_STREAM_ACTION_CONF, INCLUDE_END_STREAM_ACTION_DEFAULT).toBoolean } + def enablePreviewRead(conf: Configuration): Boolean = { + conf.getBoolean(ENABLE_PREVIEW_READ_CONF, ENABLE_PREVIEW_READ_DEFAULT.toBoolean) + } + + def enablePreviewRead(conf: SQLConf): Boolean = { + conf.getConfString(ENABLE_PREVIEW_READ_CONF, ENABLE_PREVIEW_READ_DEFAULT).toBoolean + } + def timeoutInSeconds(conf: Configuration): Int = { val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT) toTimeInSeconds(timeoutStr, TIMEOUT_CONF) diff --git a/project/build.properties b/project/build.properties index 336cc0be0..d0ed4ca9a 100644 --- a/project/build.properties +++ b/project/build.properties @@ -11,4 +11,4 @@ # limitations under the License. # -sbt.version=1.5.0 +sbt.version=1.10.2 diff --git a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala index adf7cfd04..1df04129e 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/TestDeltaSharingClient.scala @@ -50,6 +50,7 @@ class TestDeltaSharingClient( maxFilesPerReq: Int = 10000, endStreamActionEnabled: Boolean = false, enableAsyncQuery: Boolean = false, + enablePreviewRead: Boolean = false, asyncQueryPollIntervalMillis: Long = 1000L, asyncQueryMaxDuration: Long = Long.MaxValue, tokenExchangeMaxRetries: Int = 5,