Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 100 additions & 5 deletions client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package io.delta.sharing.client

import java.io.{BufferedReader, InputStreamReader}
import java.io.{BufferedReader, InputStreamReader, IOException}
import java.net.{URL, URLEncoder}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Using
import scala.util.control.NonFatal

import org.apache.commons.io.IOUtils
Expand Down Expand Up @@ -171,6 +172,16 @@ private[sharing] case class GetQueryTableInfoRequest(
}
}

/**
 * Outcome of checking a single [[ValidationURL]].
 *
 * Instances are built by `validateAndReport` from the HTTP status of a GET against the URL.
 *
 * @param vUrl the validation URL entry that was checked
 * @param validationSuccess whether the check passed
 * @param errMsg description of the failure; `None` when the check passed
 */
private[sharing] case class UrlValidationResult(
vUrl: ValidationURL,
validationSuccess: Boolean,
errMsg: Option[String]
)

/**
 * Payload carrying URL validation results. `validateAndReport` serializes it to JSON and sends
 * it as the body of the POST to the `/validateURL` endpoint.
 *
 * @param results the validation outcomes being reported
 */
private[sharing] case class UrlValidationReportRequest(
results: Seq[UrlValidationResult]
)

private[sharing] case class ListSharesResponse(
items: Seq[Share],
nextPageToken: Option[String]) extends PaginationResponse
Expand All @@ -194,15 +205,17 @@ class DeltaSharingRestClient(
maxFilesPerReq: Int = 100000,
endStreamActionEnabled: Boolean = false,
enableAsyncQuery: Boolean = false,
enablePreviewRead: Boolean = false,
asyncQueryPollIntervalMillis: Long = 10000L,
asyncQueryMaxDuration: Long = 600000L,
tokenExchangeMaxRetries: Int = 5,
tokenExchangeMaxRetryDurationInSeconds: Int = 60,
tokenRenewalThresholdInSeconds: Int = 600
) extends DeltaSharingClient with Logging {

logInfo(s"Hello Abhijit")
logInfo(s"DeltaSharingRestClient with endStreamActionEnabled: $endStreamActionEnabled, " +
s"enableAsyncQuery:$enableAsyncQuery")
s"enableAsyncQuery:$enableAsyncQuery, enablePreviewRead:$enablePreviewRead")

import DeltaSharingRestClient._

Expand Down Expand Up @@ -237,6 +250,24 @@ class DeltaSharingRestClient(
client
}

// Set to true once `previewReadHttpClient` has been instantiated. `close()` checks this flag
// so that it only closes the client when the lazy val was actually forced.
@volatile private var createdPreviewReadHttpClient = false

/**
 * HTTP client used to issue GET requests against validation URLs. It is created on first use
 * and applies `timeoutInSeconds` to the connect, connection-request and socket timeouts.
 */
private lazy val previewReadHttpClient = {
  val timeoutMillis = timeoutInSeconds * 1000
  val requestConfig = RequestConfig.custom()
    .setConnectTimeout(timeoutMillis)
    .setConnectionRequestTimeout(timeoutMillis)
    .setSocketTimeout(timeoutMillis)
    .build()
  logDebug(s"Creating previewReadHttpClient with timeoutInSeconds: $timeoutInSeconds.")
  // Turn off HttpClient's built-in retries; retrying is left to our own retry logic.
  // See `RetryUtils.runWithExponentialBackoff`.
  val httpClient = HttpClientBuilder.create()
    .disableAutomaticRetries()
    .setDefaultRequestConfig(requestConfig)
    .build()
  createdPreviewReadHttpClient = true
  httpClient
}

private lazy val authCredentialProvider = AuthCredentialProviderFactory.createCredentialProvider(
profileProvider.getProfile,
AuthConfig(tokenExchangeMaxRetries,
Expand Down Expand Up @@ -359,7 +390,7 @@ class DeltaSharingRestClient(
}

logInfo(
s"Fetched metadata for table ${getFullTableName(table)}, version ${response.version} " +
s"Abh Fetched metadata for table ${getFullTableName(table)}, version ${response.version} " +
s"with response format ${response.respondedFormat}" + getDsQueryIdForLogging
)

Expand Down Expand Up @@ -445,7 +476,7 @@ class DeltaSharingRestClient(
)

logInfo(
s"Fetched files for table ${getFullTableName(table)}, predicate $predicates, limit $limit, " +
s"FETCHED files for table ${getFullTableName(table)}, predicate $predicates, limit $limit, " +
s"versionAsOf $versionAsOf, timestampAsOf $timestampAsOf, " +
s"jsonPredicateHints $jsonPredicateHints, refreshToken $refreshToken, " +
s"idempotency_key $idempotency_key\n" +
Expand Down Expand Up @@ -527,7 +558,7 @@ class DeltaSharingRestClient(
val response = getNDJsonPost(
target = target, data = request, setIncludeEndStreamAction = endStreamActionEnabled
)
val (filteredLines, _) = maybeExtractEndStreamAction(response.lines)
val (filteredLines, endStreamActionOpt) = maybeExtractEndStreamAction(response.lines)
logInfo(s"Took ${System.currentTimeMillis() - start} ms to query ${filteredLines.size} " +
s"files for table " + getFullTableName(table) +
s" with [$startingVersion, $endingVersion]," + getDsQueryIdForLogging
Expand Down Expand Up @@ -658,9 +689,62 @@ class DeltaSharingRestClient(
}
}

handleValidationAction(endStreamAction)

(version, respondedFormat, allLines.toSeq, refreshToken)
}

/**
 * Runs URL validation for every entry in the end-stream action's `validationUrls`, if any.
 *
 * Validation is best-effort: a non-fatal failure on one URL is logged and neither stops the
 * remaining URLs from being processed nor propagates to the caller. Fatal errors (for example
 * `OutOfMemoryError`) and `InterruptedException` are deliberately not caught.
 *
 * @param endStreamAction the end-stream action of a response, if one was received
 */
private def handleValidationAction(endStreamAction: Option[EndStreamAction]): Unit = {
  endStreamAction
    // `validationUrls` defaults to null on EndStreamAction, so wrap it before use.
    .flatMap(action => Option(action.validationUrls))
    .foreach { vUrls =>
      vUrls.foreach { vUrl =>
        try {
          logInfo("Validating url: " + vUrl)
          validateAndReport(vUrl)
        } catch {
          // NonFatal instead of Throwable so that VM errors and thread interruption propagate.
          case NonFatal(e) =>
            logInfo("Error while validating url " + vUrl + ": " + e.toString)
        }
      }
    }
}

/**
 * Issues an HTTP GET against the given validation URL and reports the outcome to the server.
 *
 * The URL counts as valid when the GET returns 200 (OK) or 206 (Partial Content). The single
 * result is wrapped in a [[UrlValidationReportRequest]], serialized to JSON and POSTed to the
 * `/validateURL` endpoint.
 *
 * No exception handling is done here; anything thrown while executing the GET or sending the
 * report reaches the caller.
 *
 * @param vUrl the validation URL entry to check
 */
private def validateAndReport(vUrl: ValidationURL): Unit = {
  val request = new HttpGet(vUrl.url)
  // NOTE(review): `vUrl`'s toString includes the full URL. If these URLs are pre-signed,
  // consider logging only `vUrl.id` -- confirm with the protocol owners.
  logInfo("validateAndReport: url=" + vUrl)
  // `Using.resource` closes the response once the status has been inspected.
  val result = Using.resource(previewReadHttpClient.execute(request)) { response =>
    val status = response.getStatusLine()
    val statusCode = status.getStatusCode
    logInfo("validateAndReport: status=" + statusCode + ", url=" + vUrl)
    val succeeded =
      statusCode == HttpStatus.SC_OK || statusCode == HttpStatus.SC_PARTIAL_CONTENT
    UrlValidationResult(
      vUrl = vUrl,
      validationSuccess = succeeded,
      errMsg =
        if (succeeded) {
          None
        } else {
          // The `s` interpolator is required here; without it the literal text "$status"
          // would be reported instead of the actual HTTP status line.
          Some(s"Validation Error: HTTP Get request failed with status: $status")
        }
    )
  }

  val validationReportRequest = UrlValidationReportRequest(results = Seq(result))
  val json = JsonUtils.toJson(validationReportRequest)
  val httpPost = new HttpPost(getTargetUrl("/validateURL"))
  httpPost.setHeader("Content-type", "application/json")
  httpPost.setEntity(new StringEntity(json, UTF_8))
  val response = getResponse(httpPost, setIncludeEndStreamAction = false)
  logInfo("validateAndReport: validationReportRequest response=" + response)
}

override def getCDFFiles(
table: Table,
cdfOptions: Map[String, String],
Expand Down Expand Up @@ -1239,6 +1323,10 @@ class DeltaSharingRestClient(
capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
}

if (enablePreviewRead) {
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_PREVIEW_READ=true"
}

val cap = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER)
cap
}
Expand All @@ -1247,6 +1335,9 @@ class DeltaSharingRestClient(
if (created) {
try client.close() finally created = false
}
if (createdPreviewReadHttpClient) {
try previewReadHttpClient.close() finally createdPreviewReadHttpClient = false
}
}

override def finalize(): Unit = {
Expand All @@ -1262,6 +1353,7 @@ object DeltaSharingRestClient extends Logging {
val READER_FEATURES = "readerfeatures"
val DELTA_SHARING_CAPABILITIES_ASYNC_READ = "asyncquery"
val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
val DELTA_SHARING_CAPABILITIES_PREVIEW_READ = "previewread"
val RESPONSE_FORMAT_DELTA = "delta"
val RESPONSE_FORMAT_PARQUET = "parquet"
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"
Expand Down Expand Up @@ -1408,6 +1500,7 @@ object DeltaSharingRestClient extends Logging {
val maxFilesPerReq = ConfUtils.maxFilesPerQueryRequest(sqlConf)
val useAsyncQuery = ConfUtils.useAsyncQuery(sqlConf)
val endStreamActionEnabled = ConfUtils.includeEndStreamAction(sqlConf)
val enablePreviewRead = ConfUtils.enablePreviewRead(sqlConf)
val asyncQueryMaxDurationMillis = ConfUtils.asyncQueryTimeout(sqlConf)
val asyncQueryPollDurationMillis = ConfUtils.asyncQueryPollIntervalMillis(sqlConf)

Expand All @@ -1432,6 +1525,7 @@ object DeltaSharingRestClient extends Logging {
classOf[Int],
classOf[Boolean],
classOf[Boolean],
classOf[Boolean],
classOf[Long],
classOf[Long],
classOf[Int],
Expand All @@ -1450,6 +1544,7 @@ object DeltaSharingRestClient extends Logging {
java.lang.Integer.valueOf(maxFilesPerReq),
java.lang.Boolean.valueOf(endStreamActionEnabled),
java.lang.Boolean.valueOf(useAsyncQuery),
java.lang.Boolean.valueOf(enablePreviewRead),
java.lang.Long.valueOf(asyncQueryPollDurationMillis),
java.lang.Long.valueOf(asyncQueryMaxDurationMillis),
java.lang.Integer.valueOf(tokenExchangeMaxRetries),
Expand Down
8 changes: 7 additions & 1 deletion client/src/main/scala/io/delta/sharing/client/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,18 @@ private[sharing] case class Protocol(minReaderVersion: Int) extends Action {
override def wrap: SingleAction = SingleAction(protocol = this)
}

/**
 * A URL to be validated by the client, carried in `EndStreamAction.validationUrls`.
 *
 * @param id identifier of this entry -- presumably assigned by the server so results can be
 *           correlated; TODO confirm
 * @param url the address the client fetches with an HTTP GET during validation
 */
private[sharing] case class ValidationURL(
id: String,
url: String
)

private[sharing] case class EndStreamAction(
refreshToken: String,
nextPageToken: String,
minUrlExpirationTimestamp: java.lang.Long,
errorMessage: String = null,
httpStatusErrorCode: java.lang.Integer = null)
httpStatusErrorCode: java.lang.Integer = null,
validationUrls: Seq[ValidationURL] = null)
extends Action {
override def wrap: SingleAction = SingleAction(endStreamAction = this)
}
Expand Down
11 changes: 11 additions & 0 deletions client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ object ConfUtils {
val USE_ASYNC_QUERY_CONF = "spark.delta.sharing.network.useAsyncQuery"
val USE_ASYNC_QUERY_DEFAULT = "false"

val ENABLE_PREVIEW_READ_CONF = "spark.delta.sharing.network.enablePreviewRead"
val ENABLE_PREVIEW_READ_DEFAULT = "true"

val INCLUDE_END_STREAM_ACTION_CONF = "spark.delta.sharing.query.includeEndStreamAction"
val INCLUDE_END_STREAM_ACTION_DEFAULT = "false"

Expand Down Expand Up @@ -215,6 +218,14 @@ object ConfUtils {
conf.getConfString(INCLUDE_END_STREAM_ACTION_CONF, INCLUDE_END_STREAM_ACTION_DEFAULT).toBoolean
}

/**
 * Reads the preview-read flag (`ENABLE_PREVIEW_READ_CONF`) from the given `Configuration`,
 * falling back to `ENABLE_PREVIEW_READ_DEFAULT` when the key is not set.
 */
def enablePreviewRead(conf: Configuration): Boolean = {
  val defaultValue = ENABLE_PREVIEW_READ_DEFAULT.toBoolean
  conf.getBoolean(ENABLE_PREVIEW_READ_CONF, defaultValue)
}

/**
 * Reads the preview-read flag (`ENABLE_PREVIEW_READ_CONF`) from the given `SQLConf`,
 * falling back to `ENABLE_PREVIEW_READ_DEFAULT` when the key is not set.
 */
def enablePreviewRead(conf: SQLConf): Boolean = {
  val rawValue = conf.getConfString(ENABLE_PREVIEW_READ_CONF, ENABLE_PREVIEW_READ_DEFAULT)
  rawValue.toBoolean
}

def timeoutInSeconds(conf: Configuration): Int = {
val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT)
toTimeInSeconds(timeoutStr, TIMEOUT_CONF)
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
# limitations under the License.
#

sbt.version=1.5.0
sbt.version=1.10.2
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TestDeltaSharingClient(
maxFilesPerReq: Int = 10000,
endStreamActionEnabled: Boolean = false,
enableAsyncQuery: Boolean = false,
enablePreviewRead: Boolean = false,
asyncQueryPollIntervalMillis: Long = 1000L,
asyncQueryMaxDuration: Long = Long.MaxValue,
tokenExchangeMaxRetries: Int = 5,
Expand Down
Loading