Skip to content

Commit 880994d

Browse files
xinlian12Copilot
andcommitted
Simplify change feed streaming test to use memory sink
Replace cosmos.oltp sink with in-memory sink to eliminate the need for a separate sink container. This avoids 404 errors from sink container creation/resolution and removes checkpoint path concerns. The test still exercises the full ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths (readStream with cosmos.oltp.changeFeed), which is the goal for validating the MetadataVersionUtil fix. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 50e51e8 commit 880994d

2 files changed

Lines changed: 8 additions & 62 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,6 @@ df.filter(col("isAlive") === true)
115115
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
116116
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
117117

118-
val sinkContainerName = cosmosContainerName + "Sink"
119-
spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalog.${cosmosDatabaseName}.${sinkContainerName} using cosmos.oltp " +
120-
s"TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '400')")
121-
122118
val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
123119
"spark.cosmos.accountKey" -> cosmosMasterKey,
124120
"spark.cosmos.database" -> cosmosDatabaseName,
@@ -129,15 +125,6 @@ val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
129125
"spark.cosmos.enforceNativeTransport" -> "true"
130126
)
131127

132-
val writeCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
133-
"spark.cosmos.accountKey" -> cosmosMasterKey,
134-
"spark.cosmos.database" -> cosmosDatabaseName,
135-
"spark.cosmos.container" -> sinkContainerName,
136-
"spark.cosmos.write.strategy" -> "ItemOverwrite",
137-
"spark.cosmos.write.bulk.enabled" -> "true",
138-
"spark.cosmos.enforceNativeTransport" -> "true"
139-
)
140-
141128
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
142129

143130
val changeFeedDF = spark
@@ -148,29 +135,19 @@ val changeFeedDF = spark
148135

149136
val microBatchQuery = changeFeedDF
150137
.writeStream
151-
.format("cosmos.oltp")
138+
.format("memory")
152139
.queryName(testId)
153-
.options(writeCfg)
154-
.option("checkpointLocation", s"file:/tmp/$testId/")
155140
.outputMode("append")
156141
.start()
157142

158143
microBatchQuery.processAllAvailable()
159144
microBatchQuery.stop()
160145

161-
val sinkCount = spark.read.format("cosmos.oltp").options(Map(
162-
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
163-
"spark.cosmos.accountKey" -> cosmosMasterKey,
164-
"spark.cosmos.database" -> cosmosDatabaseName,
165-
"spark.cosmos.container" -> sinkContainerName,
166-
"spark.cosmos.enforceNativeTransport" -> "true"
167-
)).load().count()
168-
169-
println(s"Change Feed micro-batch streaming: $sinkCount records copied to sink container")
170-
assert(sinkCount >= 2, s"Expected at least 2 records in sink container but found $sinkCount")
146+
val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
147+
println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
148+
assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
171149

172150
// COMMAND ----------
173151

174152
// cleanup
175-
spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${sinkContainerName};")
176153
spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName};")

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenarioAadManagedIdentity.scala

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,6 @@ df.filter(col("isAlive") === true)
100100
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
101101
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
102102

103-
val sinkContainerName = cosmosContainerName + "Sink"
104-
spark.sql(s"CREATE TABLE IF NOT EXISTS cosmosCatalogMI.${cosmosDatabaseName}.${sinkContainerName} using cosmos.oltp " +
105-
s"TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '400')")
106-
107103
val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
108104
"spark.cosmos.auth.type" -> authType,
109105
"spark.cosmos.account.subscriptionId" -> subscriptionId,
@@ -118,19 +114,6 @@ val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
118114
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
119115
)
120116

121-
val writeCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
122-
"spark.cosmos.auth.type" -> authType,
123-
"spark.cosmos.account.subscriptionId" -> subscriptionId,
124-
"spark.cosmos.account.tenantId" -> tenantId,
125-
"spark.cosmos.account.resourceGroupName" -> resourceGroupName,
126-
"spark.cosmos.database" -> cosmosDatabaseName,
127-
"spark.cosmos.container" -> sinkContainerName,
128-
"spark.cosmos.write.strategy" -> "ItemOverwrite",
129-
"spark.cosmos.write.bulk.enabled" -> "true",
130-
"spark.cosmos.enforceNativeTransport" -> "true",
131-
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
132-
)
133-
134117
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
135118

136119
val changeFeedDF = spark
@@ -141,33 +124,19 @@ val changeFeedDF = spark
141124

142125
val microBatchQuery = changeFeedDF
143126
.writeStream
144-
.format("cosmos.oltp")
127+
.format("memory")
145128
.queryName(testId)
146-
.options(writeCfg)
147-
.option("checkpointLocation", s"file:/tmp/$testId/")
148129
.outputMode("append")
149130
.start()
150131

151132
microBatchQuery.processAllAvailable()
152133
microBatchQuery.stop()
153134

154-
val sinkCount = spark.read.format("cosmos.oltp").options(Map(
155-
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
156-
"spark.cosmos.auth.type" -> authType,
157-
"spark.cosmos.account.subscriptionId" -> subscriptionId,
158-
"spark.cosmos.account.tenantId" -> tenantId,
159-
"spark.cosmos.account.resourceGroupName" -> resourceGroupName,
160-
"spark.cosmos.database" -> cosmosDatabaseName,
161-
"spark.cosmos.container" -> sinkContainerName,
162-
"spark.cosmos.enforceNativeTransport" -> "true",
163-
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
164-
)).load().count()
165-
166-
println(s"Change Feed micro-batch streaming: $sinkCount records copied to sink container")
167-
assert(sinkCount >= 2, s"Expected at least 2 records in sink container but found $sinkCount")
135+
val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
136+
println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
137+
assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
168138

169139
// COMMAND ----------
170140

171141
// cleanup
172-
spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${sinkContainerName};")
173142
spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${cosmosContainerName};")

0 commit comments

Comments
 (0)