From cba3a088fa1ad4426c0d06c43d0b371c8c80d88c Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Fri, 27 Sep 2024 15:29:09 -0700 Subject: [PATCH 1/3] tmp --- .../sharing/client/DeltaSharingClient.scala | 43 ++++++++++++++++--- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 9e49394c5..6ecbe86d9 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -960,6 +960,12 @@ class DeltaSharingRestClient( httpRequest } + private def debugoutput(str: String): Unit = { + // scalastyle:off println + Console.println(s"----[linzhou]----$str") + logError(s"----[linzhou]----$str") + } + /** * Send the http request and return the table version in the header if any, and the response * content. @@ -977,12 +983,18 @@ class DeltaSharingRestClient( // Reset queryId before calling RetryUtils, and before prepareHeaders. queryId = Some(UUID.randomUUID().toString().split('-').head) RetryUtils.runWithExponentialBackoff(numRetries, maxRetryDuration) { + val startTime = System.currentTimeMillis() val profile = profileProvider.getProfile val response = client.execute( getHttpHost(profile.endpoint), prepareHeaders(httpRequest), HttpClientContext.create() ) + + def elapsedTime: Long = { + System.currentTimeMillis() - startTime + } + try { val status = response.getStatusLine() val entity = response.getEntity() @@ -999,32 +1011,49 @@ class DeltaSharingRestClient( new InputStreamReader(new BoundedInputStream(input), UTF_8) ) var line: Option[String] = None - while ({ - line = Option(reader.readLine()); line.isDefined + + debugoutput(s"----[linzhou]----before while:" + + s"${(System.currentTimeMillis() - startTime)}ms, ${getEntityDebugStr(entity)}") + while ( { + line = Option(reader.readLine()); + line.isDefined }) { - lineBuffer += line.get + val a = line.get + debugoutput(s"----[linzhou]----in while: ($elapsedTime)ms, newLine:[$a]") + debugoutput(s"----[linzhou]----in while: ($elapsedTime)ms, " + + s"debug:[${getEntityDebugStr(entity)}]") + lineBuffer += a } lineBuffer.toList } } catch { case e: org.apache.http.ConnectionClosedException => - val error = s"Request to delta sharing server failed for queryId[$queryId] " + - s"due to ${e}." + val error = s"Request to delta sharing server failed due to ${e}." logError(error) lineBuffer += error lineBuffer.toList + case otherE: Exception => + val error = s"Request to delta sharing server failed due tooo ${otherE}." + logError(error) + throw otherE } finally { + debugoutput(s"----[linzhou]----in finally: ($elapsedTime)ms, " + + s"status:${response.getStatusLine}") input.close() } } + debugoutput(s"----[linzhou]----after: ($elapsedTime)ms, " + + s"status:${response.getStatusLine}") + debugoutput(s"----[linzhou]----after: ($elapsedTime)ms, " + + s"entity:${getEntityDebugStr(response.getEntity)}") val statusCode = status.getStatusCode if (!(statusCode == HttpStatus.SC_OK || (allowNoContent && statusCode == HttpStatus.SC_NO_CONTENT))) { var additionalErrorInfo = "" - if (statusCode == HttpStatus.SC_UNAUTHORIZED && tokenExpired(profile)) { + if (statusCode == HttpStatus.SC_UNAUTHORIZED && tokenExpired()) { additionalErrorInfo = s"It may be caused by an expired token as it has expired " + - s"at ${profile.expirationTime}" + s"at ${authCredentialProvider.getExpirationTime()}" } // Only show the last 100 lines in the error to keep it contained. val responseToShow = lines.drop(lines.size - 100).mkString("\n") From 40c650ad38177cb3fd27db8842c6e377d5ef3286 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Fri, 27 Sep 2024 15:29:49 -0700 Subject: [PATCH 2/3] add missing function --- .../scala/io/delta/sharing/client/DeltaSharingClient.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 6ecbe86d9..b3d9dd73d 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -960,6 +960,13 @@ class DeltaSharingRestClient( httpRequest } + + import org.apache.http.HttpEntity + private def getEntityDebugStr(entity: HttpEntity): String = { + s"isRepeatable:${entity.isRepeatable},isChunked:${entity.isChunked}," + + s"getContentLength:${entity.getContentLength},isStreaming:${entity.isStreaming}" + } + private def debugoutput(str: String): Unit = { // scalastyle:off println Console.println(s"----[linzhou]----$str") From 5c91c279e3406ccf333f6d31a20a358f670f3ab3 Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Fri, 27 Sep 2024 15:31:07 -0700 Subject: [PATCH 3/3] revert --- .../scala/io/delta/sharing/client/DeltaSharingClient.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index b3d9dd73d..e4fd04c01 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -1058,9 +1058,9 @@ class DeltaSharingRestClient( if (!(statusCode == HttpStatus.SC_OK || (allowNoContent && statusCode == HttpStatus.SC_NO_CONTENT))) { var additionalErrorInfo = "" - if (statusCode == HttpStatus.SC_UNAUTHORIZED && tokenExpired()) { + if (statusCode == HttpStatus.SC_UNAUTHORIZED && tokenExpired(profile)) { additionalErrorInfo = s"It may be caused by an expired token as it has expired " + - s"at ${authCredentialProvider.getExpirationTime()}" + s"at ${profile.expirationTime}" } // Only show the last 100 lines in the error to keep it contained. val responseToShow = lines.drop(lines.size - 100).mkString("\n")