Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,10 @@ class DeltaSharingRestClient(
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table.")
s"responseFormat($responseFormat) for $rpc for table $table, queryId[$queryId].")
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat).")
s" != requestedFormat($responseFormat), queryId[$queryId].")
}
}

Expand Down Expand Up @@ -716,7 +716,7 @@ class DeltaSharingRestClient(
|$expectedProtocol, $expectedMetadata. Actual: version $version,
|$respondedFormat, ${lines(0)}, ${lines(1)}""".stripMargin
logError(s"Error while fetching next page files at url $targetUrl " +
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg)")
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg), queryId[$queryId].")
throw new IllegalStateException(errorMsg)
}

Expand Down Expand Up @@ -1008,7 +1008,8 @@ class DeltaSharingRestClient(
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed due to ${e}."
val error = s"Request to delta sharing server failed for queryId[$queryId] " +
s"due to ${e}."
logError(error)
lineBuffer += error
lineBuffer.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ case class DeltaSharingSource(
val intervalSeconds = ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf)
)
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}.")
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
if (intervalSeconds < ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS) {
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($intervalSeconds) " +
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds.")
Expand All @@ -174,13 +175,14 @@ case class DeltaSharingSource(
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
logInfo(s"Got table version $serverVersion from Delta Sharing Server.")
logInfo(s"Got table version $serverVersion from Delta Sharing Server." +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion.")
s"$latestTableVersion, for table(id:$tableId, name:${deltaLog.table.toString})")
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
Expand Down Expand Up @@ -382,7 +384,7 @@ case class DeltaSharingSource(
)
}
logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " +
s"${sortedFetchedFiles.size}).")
s"${sortedFetchedFiles.size}), for table(id:$tableId, name:${deltaLog.table.toString})")
}
}

Expand Down Expand Up @@ -456,7 +458,7 @@ case class DeltaSharingSource(
val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
" delta sharing server."
s" delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
Expand Down Expand Up @@ -514,7 +516,7 @@ case class DeltaSharingSource(
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
"delta sharing server."
s"delta sharing server, for table(id:$tableId, name:${deltaLog.table.toString})."
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
Expand Down Expand Up @@ -1128,7 +1130,7 @@ case class DeltaSharingSource(
} else if (options.startingTimestamp.isDefined) {
val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)
logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " +
s"from Delta Sharing Server.")
s"from Delta Sharing Server, for table(id:$tableId, name:${deltaLog.table.toString})")
Some(version)
} else {
None
Expand Down
Loading