Skip to content
This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Commit cb4b662

Browse files
committed
update the code to correctly handle SAM types
1 parent ce12d39 commit cb4b662

File tree

2 files changed

+9
-5
lines changed

2 files changed

+9
-5
lines changed

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
3131
import org.apache.spark.sql.sources.Filter
3232
import org.apache.spark.sql.types.StructType
3333
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
34+
import org.apache.spark.util.TaskCompletionListener
3435
import org.apache.spark.{Partition, TaskContext}
3536

3637
import scala.collection.mutable
@@ -117,9 +118,10 @@ class CosmosDBRDD(
117118
var cosmosDBPartition: CosmosDBPartition = partition.asInstanceOf[CosmosDBPartition]
118119
logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
119120

120-
context.addTaskCompletionListener((ctx: TaskContext) => {
121+
val taskCompletionListener:TaskCompletionListener = (ctx: TaskContext) => {
121122
logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
122-
})
123+
}
124+
context.addTaskCompletionListener(taskCompletionListener)
123125

124126
new CosmosDBRDDIterator(
125127
hadoopConfig,

src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap
2727

2828
import com.fasterxml.jackson.databind.ObjectMapper
2929
import com.microsoft.azure.cosmosdb.internal.HttpConstants.StatusCodes
30-
import com.microsoft.azure.cosmosdb.internal.directconnectivity.{ ServiceUnavailableException}
30+
import com.microsoft.azure.cosmosdb.internal.directconnectivity.ServiceUnavailableException
3131
import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
3232
import com.microsoft.azure.cosmosdb.spark.partitioner.CosmosDBPartition
3333
import com.microsoft.azure.cosmosdb.spark.schema._
@@ -38,6 +38,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes
3838
import org.apache.commons.lang3.StringUtils
3939
import org.apache.spark._
4040
import org.apache.spark.sql.sources.Filter
41+
import org.apache.spark.util.TaskCompletionListener
4142

4243
import scala.collection.mutable
4344

@@ -414,10 +415,11 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
414415
})
415416

416417
// Register an on-task-completion callback to close the input stream.
417-
taskContext.addTaskCompletionListener((context: TaskContext) => {
418+
val taskCompletionListener: TaskCompletionListener = (context: TaskContext) => {
418419
connection.reinitializeClient()
419420
closeIfNeeded()
420-
})
421+
}
422+
taskContext.addTaskCompletionListener(taskCompletionListener)
421423

422424
if (!readingChangeFeed) {
423425
queryDocuments

0 commit comments

Comments
 (0)