Skip to content

Commit 11bfba7

Browse files
xinlian12Copilot
andcommitted
Remove change feed streaming scenarios from Databricks notebooks
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 880994d commit 11bfba7

2 files changed

Lines changed: 0 additions & 80 deletions

File tree

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenario.scala

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -111,43 +111,5 @@ df.filter(col("isAlive") === true)
111111

112112
// COMMAND ----------
113113

114-
// Change Feed - micro-batch structured streaming
115-
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
116-
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
117-
118-
val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
119-
"spark.cosmos.accountKey" -> cosmosMasterKey,
120-
"spark.cosmos.database" -> cosmosDatabaseName,
121-
"spark.cosmos.container" -> cosmosContainerName,
122-
"spark.cosmos.read.inferSchema.enabled" -> "false",
123-
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
124-
"spark.cosmos.changeFeed.mode" -> "Incremental",
125-
"spark.cosmos.enforceNativeTransport" -> "true"
126-
)
127-
128-
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
129-
130-
val changeFeedDF = spark
131-
.readStream
132-
.format("cosmos.oltp.changeFeed")
133-
.options(changeFeedCfg)
134-
.load()
135-
136-
val microBatchQuery = changeFeedDF
137-
.writeStream
138-
.format("memory")
139-
.queryName(testId)
140-
.outputMode("append")
141-
.start()
142-
143-
microBatchQuery.processAllAvailable()
144-
microBatchQuery.stop()
145-
146-
val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
147-
println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
148-
assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
149-
150-
// COMMAND ----------
151-
152114
// cleanup
153115
spark.sql(s"DROP TABLE cosmosCatalog.${cosmosDatabaseName}.${cosmosContainerName};")

sdk/cosmos/azure-cosmos-spark_3/test-databricks/notebooks/basicScenarioAadManagedIdentity.scala

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -96,47 +96,5 @@ df.filter(col("isAlive") === true)
9696

9797
// COMMAND ----------
9898

99-
// Change Feed - micro-batch structured streaming
100-
// This exercises the ChangeFeedInitialOffsetWriter and HDFSMetadataLog code paths
101-
// that can break on certain Spark distributions (e.g. Databricks Runtime 17.3+)
102-
103-
val changeFeedCfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
104-
"spark.cosmos.auth.type" -> authType,
105-
"spark.cosmos.account.subscriptionId" -> subscriptionId,
106-
"spark.cosmos.account.tenantId" -> tenantId,
107-
"spark.cosmos.account.resourceGroupName" -> resourceGroupName,
108-
"spark.cosmos.database" -> cosmosDatabaseName,
109-
"spark.cosmos.container" -> cosmosContainerName,
110-
"spark.cosmos.read.inferSchema.enabled" -> "false",
111-
"spark.cosmos.changeFeed.startFrom" -> "Beginning",
112-
"spark.cosmos.changeFeed.mode" -> "Incremental",
113-
"spark.cosmos.enforceNativeTransport" -> "true",
114-
"spark.cosmos.read.consistencyStrategy" -> "LatestCommitted",
115-
)
116-
117-
val testId = java.util.UUID.randomUUID().toString.replace("-", "")
118-
119-
val changeFeedDF = spark
120-
.readStream
121-
.format("cosmos.oltp.changeFeed")
122-
.options(changeFeedCfg)
123-
.load()
124-
125-
val microBatchQuery = changeFeedDF
126-
.writeStream
127-
.format("memory")
128-
.queryName(testId)
129-
.outputMode("append")
130-
.start()
131-
132-
microBatchQuery.processAllAvailable()
133-
microBatchQuery.stop()
134-
135-
val sinkCount = spark.sql(s"SELECT * FROM $testId").count()
136-
println(s"Change Feed micro-batch streaming: $sinkCount records read via change feed")
137-
assert(sinkCount >= 2, s"Expected at least 2 records from change feed but found $sinkCount")
138-
139-
// COMMAND ----------
140-
14199
// cleanup
142100
spark.sql(s"DROP TABLE cosmosCatalogMI.${cosmosDatabaseName}.${cosmosContainerName};")

0 commit comments

Comments
 (0)