Skip to content

Commit 90ca3f6

Browse files
xinlian12Copilot
andcommitted
Add change feed micro-batch streaming scenarios to Databricks live test notebooks
Add structured streaming scenarios using cosmos.oltp.changeFeed to both basicScenario.scala and basicScenarioAadManagedIdentity.scala notebooks. These scenarios exercise the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+). Each scenario: - Creates a sink container - Reads change feed from source via readStream with micro-batch - Writes to sink container via writeStream - Validates records were copied - Cleans up both containers Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 43ad5d5 commit 90ca3f6

2 files changed

Lines changed: 140 additions & 0 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,69 @@ df.filter(col("isAlive") === true)
111111

112112
// COMMAND ----------
113113

114+
// Change Feed - micro-batch structured streaming
115+
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
116+
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
117+
118+
import org.apache.spark.sql.streaming.Trigger
119+
120+
val sinkContainerName = cosmosContainerName + "Sink"
121+
spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.${cosmosDatabaseName}.${sinkContainerName} using cosmos.oltp " +
122+
s"TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '400')")
123+
124+
val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
125+
"spark.cosmos.accountKey" -> cosmosMasterKey,
126+
"spark.cosmos.database" -> cosmosDatabaseName,
127+
"spark.cosmos.container" -> cosmosContainerName,
128+
"spark.cosmos.read.inferSchema.enabled" -> "false",
129+
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
130+
"spark.cosmos.changeFeed.mode" -> "Incremental",
131+
"spark.cosmos.enforceNativeTransport" -> "true"
132+
)
133+
134+
val writeCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
135+
"spark.cosmos.accountKey" -> cosmosMasterKey,
136+
"spark.cosmos.database" -> cosmosDatabaseName,
137+
"spark.cosmos.container" -> sinkContainerName,
138+
"spark.cosmos.write.strategy" -> "ItemOverwrite",
139+
"spark.cosmos.write.bulk.enabled" -> "true",
140+
"spark.cosmos.enforceNativeTransport" -> "true"
141+
)
142+
143+
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
144+
145+
val changeFeedDF = spark
146+
.readStream
147+
.format("cosmos.oltp.changeFeed")
148+
.options(changeFeedCfg)
149+
.load()
150+
151+
val microBatchQuery = changeFeedDF
152+
.writeStream
153+
.format("cosmos.oltp")
154+
.queryName(testId)
155+
.options(writeCfg)
156+
.option("checkpointLocation", s"/tmp/$testId/")
157+
.outputMode("append")
158+
.start()
159+
160+
microBatchQuery.processAllAvailable()
161+
162+
val sinkCount = spark.read.format("cosmos.oltp").options(Map(
163+
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
164+
"spark.cosmos.accountKey" -> cosmosMasterKey,
165+
"spark.cosmos.database" -> cosmosDatabaseName,
166+
"spark.cosmos.container" -> sinkContainerName,
167+
"spark.cosmos.enforceNativeTransport" -> "true"
168+
)).load().count()
169+
170+
println(s"Change Feed micro-batch streaming: $sinkCount records copied to sink container")
171+
assert(sinkCount >= 2, s"Expected at least 2 records in sink container but found $sinkCount")
172+
173+
microBatchQuery.stop()
174+
175+
// COMMAND ----------
176+
114177
// cleanup
178+
spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${sinkContainerName};")
115179
spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName};")

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenarioAadManagedIdentity.scala

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,5 +96,81 @@ df.filter(col("isAlive") === true)
9696

9797
// COMMAND ----------
9898

99+
// Change Feed - micro-batch structured streaming
100+
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
101+
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
102+
103+
import org.apache.spark.sql.streaming.Trigger
104+
105+
val sinkContainerName = cosmosContainerName + "Sink"
106+
spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalogMI.${cosmosDatabaseName}.${sinkContainerName} using cosmos.oltp " +
107+
s"TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '400')")
108+
109+
val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
110+
"spark.cosmos.auth.type" -> authType,
111+
"spark.cosmos.account.subscriptionId" -> subscriptionId,
112+
"spark.cosmos.account.tenantId" -> tenantId,
113+
"spark.cosmos.account.resourceGroupName" -> resourceGroupName,
114+
"spark.cosmos.database" -> cosmosDatabaseName,
115+
"spark.cosmos.container" -> cosmosContainerName,
116+
"spark.cosmos.read.inferSchema.enabled" -> "false",
117+
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
118+
"spark.cosmos.changeFeed.mode" -> "Incremental",
119+
"spark.cosmos.enforceNativeTransport" -> "true",
120+
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
121+
)
122+
123+
val writeCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
124+
"spark.cosmos.auth.type" -> authType,
125+
"spark.cosmos.account.subscriptionId" -> subscriptionId,
126+
"spark.cosmos.account.tenantId" -> tenantId,
127+
"spark.cosmos.account.resourceGroupName" -> resourceGroupName,
128+
"spark.cosmos.database" -> cosmosDatabaseName,
129+
"spark.cosmos.container" -> sinkContainerName,
130+
"spark.cosmos.write.strategy" -> "ItemOverwrite",
131+
"spark.cosmos.write.bulk.enabled" -> "true",
132+
"spark.cosmos.enforceNativeTransport" -> "true",
133+
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
134+
)
135+
136+
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
137+
138+
val changeFeedDF = spark
139+
.readStream
140+
.format("cosmos.oltp.changeFeed")
141+
.options(changeFeedCfg)
142+
.load()
143+
144+
val microBatchQuery = changeFeedDF
145+
.writeStream
146+
.format("cosmos.oltp")
147+
.queryName(testId)
148+
.options(writeCfg)
149+
.option("checkpointLocation", s"/tmp/$testId/")
150+
.outputMode("append")
151+
.start()
152+
153+
microBatchQuery.processAllAvailable()
154+
155+
val sinkCount = spark.read.format("cosmos.oltp").options(Map(
156+
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
157+
"spark.cosmos.auth.type" -> authType,
158+
"spark.cosmos.account.subscriptionId" -> subscriptionId,
159+
"spark.cosmos.account.tenantId" -> tenantId,
160+
"spark.cosmos.account.resourceGroupName" -> resourceGroupName,
161+
"spark.cosmos.database" -> cosmosDatabaseName,
162+
"spark.cosmos.container" -> sinkContainerName,
163+
"spark.cosmos.enforceNativeTransport" -> "true",
164+
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
165+
)).load().count()
166+
167+
println(s"Change Feed micro-batch streaming: $sinkCount records copied to sink container")
168+
assert(sinkCount >= 2, s"Expected at least 2 records in sink container but found $sinkCount")
169+
170+
microBatchQuery.stop()
171+
172+
// COMMAND ----------
173+
99174
// cleanup
175+
spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${sinkContainerName};")
100176
spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${cosmosContainerName};")

0 commit comments

Comments
 (0)