This repository was archived by the owner on Mar 10, 2025. It is now read-only.

Schema changes for connector #324

Open · wants to merge 13 commits into base: 2.4
7 changes: 6 additions & 1 deletion pom.xml
@@ -21,7 +21,7 @@ limitations under the License.
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-cosmosdb-spark_2.4.0_2.11</artifactId>
<packaging>jar</packaging>
-  <version>1.4.0</version>
+  <version>1.4.4-SNAPSHOT</version>
<name>${project.groupId}:${project.artifactId}</name>
<description>Spark Connector for Microsoft Azure CosmosDB</description>
<url>http://azure.microsoft.com/en-us/services/documentdb/</url>
@@ -126,6 +126,11 @@ limitations under the License.
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.9.8</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
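The new jackson-module-scala dependency backs the JacksonWrapper helper that the connector code below uses to round-trip schema documents. The wrapper itself is not part of this diff; the following is a minimal sketch of what it plausibly looks like, assuming a plain ObjectMapper with the Scala module registered (names and signatures inferred from the call sites JacksonWrapper.serialize(...) and JacksonWrapper.deserialize[ItemSchema](...)):

package com.microsoft.azure.cosmosdb.spark.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JacksonWrapper {
  // One shared mapper; DefaultScalaModule teaches Jackson how to handle case classes
  private val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  // Render any value as JSON
  def serialize(value: Any): String = mapper.writeValueAsString(value)

  // Parse JSON into the requested type; the implicit Manifest recovers the runtime class
  def deserialize[T](json: String)(implicit m: Manifest[T]): T =
    mapper.readValue(json, m.runtimeClass).asInstanceOf[T]
}

With something like this in place, readSchema below can hydrate ItemSchema instances directly from raw document JSON.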
@@ -23,6 +23,6 @@
package com.microsoft.azure.cosmosdb.spark

object Constants {
val currentVersion = "2.4.0_2.11-1.3.5"
val currentVersion = "2.4.0_2.11-1.4.4"
val userAgentSuffix = s" SparkConnector/$currentVersion"
}
@@ -23,7 +23,10 @@
package com.microsoft.azure.cosmosdb.spark

import java.lang.management.ManagementFactory
import java.util.Collection

import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.cosmosdb.spark.util.JacksonWrapper
import com.microsoft.azure.documentdb._
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor
import com.microsoft.azure.documentdb.internal._
@@ -192,6 +195,22 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait {
feedResponse.getQueryIterable.iterator()
}

  def readSchema(schemaType: String): ItemSchema = {
    val partitionKeyDefinition = getCollection.getPartitionKey
    val partitionKeyPath = partitionKeyDefinition.getPaths
    val partitionKeyProperty = partitionKeyPath.iterator.next.replaceFirst("^/", "")

    val feedOptions = new FeedOptions()
    feedOptions.setEnableCrossPartitionQuery(true)
    // The schema document lives in its own logical partition, keyed "__schema__<schemaType>"
    var schemaDocument: ItemSchema = null
    val response = documentClient.queryDocuments(
      collectionLink,
      new SqlQuerySpec("Select * from c where c.schemaType = '" + schemaType + "' and c." + partitionKeyProperty + " = '__schema__" + schemaType + "'"),
      feedOptions)
    val schemaResponse = response.getQueryIterable.fetchNextBlock()
    if (schemaResponse != null && !schemaResponse.isEmpty) {
      schemaDocument = JacksonWrapper.deserialize[ItemSchema](schemaResponse.get(0).toJson())
    }
    schemaDocument
  }
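  // For example, for schemaType "order" in a collection partitioned on "/pk" (both
  // illustrative), the query above becomes:
  //   Select * from c where c.schemaType = 'order' and c.pk = '__schema__order'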

def readDocuments(feedOptions: FeedOptions): Iterator[Document] = {
documentClient.readDocuments(collectionLink, feedOptions).getQueryIterable.iterator()
}
@@ -234,6 +253,14 @@ private[spark] case class CosmosDBConnection(config: Config) extends CosmosDBLoggingTrait {
documentClient.upsertDocument(collectionLink, document, requestOptions, false)
}

  def insertDocument(collectionLink: String,
                     document: Document,
                     requestOptions: RequestOptions): Unit = {
    logTrace(s"Inserting document $document")
    // Unlike upsertDocument above, createDocument fails with a 409 (Conflict) when a
    // document with the same id already exists; the schema writer relies on this to
    // detect a concurrent schema insert.
    documentClient.createDocument(collectionLink, document, requestOptions, false)
  }

def isDocumentCollectionEmpty: Boolean = {
logDebug(s"Reading collection $collectionLink")
var requestOptions = new RequestOptions
148 changes: 146 additions & 2 deletions src/main/scala/com/microsoft/azure/cosmosdb/spark/CosmosDBSpark.scala
@@ -25,9 +25,11 @@ package com.microsoft.azure.cosmosdb.spark
import java.util.UUID
import java.util.concurrent.TimeUnit

import com.fasterxml.jackson.databind.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.config._
import com.microsoft.azure.cosmosdb.spark.rdd.{CosmosDBRDD, _}
import com.microsoft.azure.cosmosdb.spark.schema._
-import com.microsoft.azure.cosmosdb.spark.util.HdfsUtils
+import com.microsoft.azure.cosmosdb.spark.util.{HdfsUtils, JacksonWrapper}
import rx.Observable
import com.microsoft.azure.documentdb._
@@ -38,13 +40,15 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
import org.json4s.jackson.Json

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
import scala.util.Random
import scala.util.parsing.json.JSONObject

/**
  * The CosmosDBSpark object allows fast creation of RDDs, DataFrames or Datasets from Cosmos DB.
@@ -190,6 +194,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
}
else
savePartition(iter, writeConfig, numPartitions, offerThroughput), preservesPartitioning = true)

mapRdd.collect()

// // All tasks have been completed, clean up the file checkpoints
@@ -265,7 +270,9 @@
rootPropertyToSave: Option[String],
partitionKeyDefinition: Option[String],
upsert: Boolean,
-                         maxConcurrencyPerPartitionRange: Integer): Unit = {
+                         maxConcurrencyPerPartitionRange: Integer,
+                         config: Config,
+                         executePreSave: (ItemSchema, String, Option[String], Document) => Unit): Unit = {

// Set retry options high for initialization (default values)
connection.setDefaultClientRetryPolicy
@@ -275,6 +282,16 @@

// Set retry options to 0 to pass control to BulkExecutor
// connection.setZeroClientRetryPolicy
    var schemaDocument: ItemSchema = null
    var schemaWriteRequired = false
    if (config.get[String](CosmosDBConfig.SchemaType).isDefined) {
      schemaDocument = connection.readSchema(config.get[String](CosmosDBConfig.SchemaType).get)
      if (schemaDocument == null) {
        // We are writing data whose schema has not been registered in the collection yet
        schemaWriteRequired = true
      }
    }

val documents = new java.util.ArrayList[String](writingBatchSize)

@@ -293,6 +310,103 @@
if (document.getId == null) {
document.setId(UUID.randomUUID().toString)
}

          if (schemaWriteRequired) {
            // Create the schema document by reading columns from the first document.
            // This needs to be done only once.
            val schemaType = config.get[String](CosmosDBConfig.SchemaType).get
            val schemaCols: ListBuffer[ItemColumn] = new ListBuffer[ItemColumn]()
            val keys = document.getHashMap().keySet().toArray

            val partitionKeyDefinition = connection.getCollection.getPartitionKey
            val partitionKeyPath = partitionKeyDefinition.getPaths
            val partitionKeyProperty = partitionKeyPath.iterator.next.replaceFirst("^/", "")

            // Known default values: empty-ish strings, zero, zero-padded strings of
            // length 1 to 18, plus any pipe-separated custom values from configuration.
            var knownDefaults = List("", " ", 0)
            val fixedDefaults = List("000000000000000000", "00000000000000000", "0000000000000000", "000000000000000", "00000000000000", "0000000000000", "000000000000", "00000000000", "0000000000", "000000000", "00000000", "0000000", "000000", "00000", "0000", "000", "00", "0")
            knownDefaults = knownDefaults ::: fixedDefaults
            if (config.get[String](CosmosDBConfig.KnownDefaultValues).isDefined) {
              val customDefaults = config.get[String](CosmosDBConfig.KnownDefaultValues).get.split('|').toList
              knownDefaults = knownDefaults ::: customDefaults
            }
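            // Illustration: with knownDefaultValues = "N/A|-1" (a hypothetical setting),
            // knownDefaults ends up as List("", " ", 0, "000000000000000000", ..., "0", "N/A", "-1").
            // A column whose value in this first document appears in that list is recorded
            // with that value as its schema-level default.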

            keys.foreach(
              key => {
                // Don't add system properties to the schema
                val documentSchemaProperty = config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)
                var systemProperties = List("_rid", "id", "_self", "_etag", "_attachments", "_ts")
                systemProperties = documentSchemaProperty :: systemProperties

                if (!systemProperties.contains(key)) {
                  var defaultVal: Object = null
                  var columnType = "String" // data type recorded for the column; String when the value is null
                  val value = document.get(key.toString)

                  // Currently adding only known default values
                  if (knownDefaults.contains(value) || value == null) {
                    defaultVal = value
                  }

                  if (value != null) {
                    // e.g. "class java.lang.Integer" -> "Integer"
                    columnType = value.getClass().toString.split('.').last
                  }
                  schemaCols += new ItemColumn(key.toString, columnType, defaultVal)
                }
              }
            )
            schemaDocument = new ItemSchema(schemaCols.toArray, schemaType)
            val schemaDoc = new Document(JacksonWrapper.serialize(schemaDocument))
            // Route the schema document to its own logical partition
            schemaDoc.set(partitionKeyProperty, "__schema__" + schemaType)
            try {
              logInfo("Writing schema")
              connection.insertDocument(connection.collectionLink, schemaDoc, null)
              logInfo("Successfully wrote schema " + schemaDoc)
            }
            catch {
              // If the schema document already exists (409 Conflict), another writer won
              // the race; read the existing schema document instead.
              case ex: DocumentClientException => if (ex.getStatusCode == 409) {
                schemaDocument = null

                val maxSchemaReadTime = 5000
                val startTime = System.currentTimeMillis()
                var elapsed: Long = 0

                while (schemaDocument == null && elapsed < maxSchemaReadTime) {
                  logInfo("Schema already present. Retrieving from collection.")
                  schemaDocument = connection.readSchema(config.get[String](CosmosDBConfig.SchemaType).get)
                  elapsed = System.currentTimeMillis() - startTime
                }

                if (schemaDocument == null) {
                  throw new Exception("Unable to fetch schemaDocument after multiple attempts")
                }

                logInfo("Successfully retrieved schema from collection " + new Document(JacksonWrapper.serialize(schemaDocument)))
              }
              else {
                throw new Exception("Unable to insert the schemaDocument", ex)
              }

              case ex: Throwable => throw ex
            }

            schemaWriteRequired = false
          }

          if (config.get[String](CosmosDBConfig.SchemaType).isDefined) {
            executePreSave(schemaDocument, config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn), config.get[String](CosmosDBConfig.IgnoreSchemaDefaults), document)
          }

documents.add(document.toJson())
if (documents.size() >= writingBatchSize) {
bulkImportResponse = importer.importAll(documents, upsert, false, maxConcurrencyPerPartitionRange)
@@ -400,6 +514,7 @@ object CosmosDBSpark extends CosmosDBLoggingTrait {
iterator
}


private def savePartition[D: ClassTag](iter: Iterator[D],
config: Config,
partitionCount: Int,
@@ -408,6 +523,32 @@
savePartition(connection, iter, config, partitionCount, offerThroughput)
}


  private def executePreSave(schemaDocument: ItemSchema,
                             documentSchemaProperty: String,
                             ignoreDefaults: Option[String],
                             item: Document): Unit = {
    // Tag the document with its schema type
    item.set(documentSchemaProperty, schemaDocument.schemaType)
    var skipDefaults = false

    if (ignoreDefaults.isDefined && ignoreDefaults.get.toBoolean) {
      skipDefaults = true
    }

    if (!skipDefaults) {
      val docColumns = item.getHashMap().keySet().toArray()
      val schemaColumns = schemaDocument.columns.map(col => (col.name, col.defaultValue))

      // Remove columns from the document which have the same value as the defaultValue;
      // they can be reconstituted from the schema document at read time.
      schemaColumns.foreach(
        col => if (docColumns.contains(col._1)) {
          if (item.get(col._1) == col._2) {
            item.remove(col._1)
          }
        }
      )
    }
  }
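  // Illustration (schema and values hypothetical): given schemaType "order" and a schema
  // column ("status", "String", "N/A"), executePreSave turns
  //   { "id": "1", "status": "N/A", "total": 42 }
  // into
  //   { "id": "1", "total": 42, "documentSchema": "order" }
  // "status" matches its recorded default, so it is dropped before the bulk import.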

private def savePartition[D: ClassTag](connection: CosmosDBConnection,
iter: Iterator[D],
config: Config,
@@ -445,6 +586,8 @@
toInt
val partitionKeyDefinition = config
.get[String](CosmosDBConfig.PartitionKeyDefinition)
val documentSchemaProperty = config
.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)

val maxConcurrencyPerPartitionRangeStr = config.get[String](CosmosDBConfig.BulkImportMaxConcurrencyPerPartitionRange)
val maxConcurrencyPerPartitionRange = if (maxConcurrencyPerPartitionRangeStr.nonEmpty)
@@ -465,7 +608,8 @@
} else if (isBulkImporting) {
logDebug(s"Writing partition with bulk import")
bulkImport(iter, connection, offerThroughput, writingBatchSize, rootPropertyToSave,
-        partitionKeyDefinition, upsert, maxConcurrencyPerPartitionRange)
+        partitionKeyDefinition, upsert, maxConcurrencyPerPartitionRange, config, executePreSave)

} else {
logDebug(s"Writing partition with rxjava")
asyncConnection.importWithRxJava(iter, asyncConnection, writingBatchSize, writingBatchDelayMs, rootPropertyToSave, upsert)
@@ -0,0 +1,9 @@
package com.microsoft.azure.cosmosdb.spark

/**
 * Classes encapsulating the schema for a document type.
 */
case class ItemSchema(columns: Array[ItemColumn], schemaType: String)

case class ItemColumn(name: String, dataType: String, defaultValue: Object)
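Serialized through JacksonWrapper and written with the partition key set by CosmosDBSpark, a schema document for a hypothetical "order" type in a collection partitioned on "/pk" would look roughly like:

{
  "id": "<generated id>",
  "schemaType": "order",
  "columns": [
    { "name": "status", "dataType": "String", "defaultValue": "N/A" },
    { "name": "total", "dataType": "Integer", "defaultValue": null }
  ],
  "pk": "__schema__order"
}

Field names follow the case classes above; the column set, defaults, and partition key path are illustrative.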
@@ -118,6 +118,11 @@ object CosmosDBConfig {

val ApplicationName = "application_name"

val SchemaType = "schemaType"
val KnownDefaultValues = "knownDefaultValues"
val SchemaPropertyColumn = "schemapropertycolumn"
val IgnoreSchemaDefaults = "ignoreschemadefaults"

// When the streaming source is slow, there will be times when getting data from a specific continuation token
// returns no results and therefore no information on the next continuation token set is available.
  // In those cases, the connector waits for a delay and then triggers the next batch.
@@ -169,6 +174,8 @@

val DefaultMaxConnectionPoolSize = 500

val DefaultSchemaPropertyColumn = "documentSchema"

def parseParameters(parameters: Map[String, String]): Map[String, Any] = {
return parameters.map { case (x, v) => x -> v }
}
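In use, the new options ride along in the ordinary read/write configuration map. A sketch of a write that enables the schema feature (account, database, collection, and option values are placeholders; the option keys are the constants defined above):

import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.apache.spark.sql.SaveMode

val writeConfig = Config(Map(
  "Endpoint" -> "https://myaccount.documents.azure.com:443/",
  "Masterkey" -> "<account key>",
  "Database" -> "myDatabase",
  "Collection" -> "myCollection",
  "schemaType" -> "order",            // CosmosDBConfig.SchemaType
  "knownDefaultValues" -> "N/A|-1",   // CosmosDBConfig.KnownDefaultValues
  "ignoreschemadefaults" -> "false"   // CosmosDBConfig.IgnoreSchemaDefaults
))

// df is an existing DataFrame of "order" rows
df.write.mode(SaveMode.Append).cosmosDB(writeConfig)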
@@ -87,7 +87,10 @@ class CosmosDBPartitioner() extends Partitioner[Partition] with CosmosDBLoggingTrait {
partitions.toArray
} else {
// CosmosDB source
-      var query: String = FilterConverter.createQueryString(requiredColumns, filters)
+      val schemaTypeName = config.get[String](CosmosDBConfig.SchemaType)
+      val documentSchemaProperty = config.getOrElse[String](CosmosDBConfig.SchemaPropertyColumn, CosmosDBConfig.DefaultSchemaPropertyColumn)
+
+      var query: String = FilterConverter.createQueryString(requiredColumns, filters, schemaTypeName, documentSchemaProperty)
var partitionKeyRanges = connection.getAllPartitions(query)
logDebug(s"CosmosDBPartitioner: This CosmosDB has ${partitionKeyRanges.length} partitions")
Array.tabulate(partitionKeyRanges.length) {
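FilterConverter.createQueryString is not part of this diff; presumably the two new arguments fold a schema predicate into the generated query, so that a read of a typed collection produces something like (schema type and property name illustrative):

SELECT * FROM c WHERE c.documentSchema = 'order'

with any pushed-down column filters ANDed on. That confines each partition scan to documents of the requested schema type and keeps the __schema__ documents themselves out of query results.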