Skip to content

Commit b999e3f

Browse files
xinlian12Copilot
andcommitted
Remove change feed streaming from AAD/MSI notebook
The MSI notebook shares a cluster with basicScenario, and the Cosmos client cache retains references from the first notebook's proactive connection init. When basicScenario drops the source container during cleanup, the MSI notebook's change feed streaming fails with 404 on the cached (now-deleted) container. The change feed streaming test in basicScenario already provides sufficient coverage for the ChangeFeedInitialOffsetWriter code paths. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 51bb567 commit b999e3f

1 file changed

Lines changed: 0 additions & 34 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenarioAadManagedIdentity.scala

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -96,39 +96,5 @@ df.filter(col("isAlive") === true)
9696

9797
// COMMAND ----------
9898

99-
// Change Feed - micro-batch structured streaming
100-
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
101-
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
102-
103-
val changeFeedCfg = cfg ++ Map(
104-
"spark.cosmos.read.inferSchema.enabled" -> "false",
105-
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
106-
"spark.cosmos.changeFeed.mode" -> "Incremental"
107-
)
108-
109-
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
110-
111-
val changeFeedDF = spark
112-
.readStream
113-
.format("cosmos.oltp.changeFeed")
114-
.options(changeFeedCfg)
115-
.load()
116-
117-
val microBatchQuery = changeFeedDF
118-
.writeStream
119-
.format("memory")
120-
.queryName(testId)
121-
.outputMode("append")
122-
.start()
123-
124-
microBatchQuery.processAllAvailable()
125-
microBatchQuery.stop()
126-
127-
val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
128-
println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
129-
assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
130-
131-
// COMMAND ----------
132-
13399
// cleanup
134100
spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${cosmosContainerName};")

0 commit comments

Comments
 (0)