Description
Bug
Which Delta project/connector is this regarding?
- Spark
- Standalone
- Flink
- Kernel
- Other (fill in here)
Describe the problem
I'm trying to stream CDC for a shared table that has the columnMapping property enabled. Before the columnMapping property was set on the table, the stream worked as expected. After setting the property, the stream fails. I'm using delta-sharing-spark_2.12:3.3.0, which should support column mapping.
Steps to reproduce
- Create a small table on Databricks 16.4 LTS and enable Delta Sharing for it.
- Stream the CDC using the open source delta-sharing-spark_2.12 connector, version 3.3.0.
- Ensure streaming works as expected, then stop the streaming query.
- Enable the columnMapping property on the shared table in Databricks and drop a few columns.
- Run streaming query again.
import org.apache.spark.sql.streaming.Trigger

// Read the change data feed of the shared table as a stream.
val df = spark.readStream.format("deltaSharing")
  .option("readChangeFeed", value = true)
  .option("responseFormat", "delta")
  .option("schemaTrackingLocation", CheckPointLocation)
  .load(DeltaShareConfigFile + DeltaShareEnv + "." + DeltaShareTable)

// Write each micro-batch to the SQL database and stop once all available data is processed.
df.writeStream
  .foreachBatch(writeToSQLDB _)
  .outputMode("update")
  .option("checkpointLocation", CheckPointLocation)
  .trigger(Trigger.AvailableNow())
  .start()
  .awaitTermination()
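The column mapping step in the list above (enabling the property and dropping columns) might look roughly like the following on the Databricks side. This is only a sketch under assumptions: the exact statements used are not recorded in this report, and the `ColumnMappingRepro` helper, the table name `main.demo.cdc_source`, and the column names are hypothetical placeholders.

```scala
// Hypothetical helper that builds the SQL for the column mapping step.
// Table and column names are placeholders, not taken from the report.
object ColumnMappingRepro {
  // Enables name-based column mapping and sets the protocol versions it requires.
  def enableColumnMapping(table: String): String =
    s"""ALTER TABLE $table SET TBLPROPERTIES (
       |  'delta.columnMapping.mode' = 'name',
       |  'delta.minReaderVersion' = '2',
       |  'delta.minWriterVersion' = '5'
       |)""".stripMargin

  // Dropping columns is only allowed once column mapping is enabled.
  def dropColumns(table: String, columns: Seq[String]): String =
    s"ALTER TABLE $table DROP COLUMNS (${columns.mkString(", ")})"
}

// In a Databricks notebook, where `spark` is predefined, these would be run as:
//   spark.sql(ColumnMappingRepro.enableColumnMapping("main.demo.cdc_source"))
//   spark.sql(ColumnMappingRepro.dropColumns("main.demo.cdc_source", Seq("col_a", "col_b")))
```

The helper only builds strings, so it can be checked without a cluster; running the statements requires a Delta table the caller is allowed to alter.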
Observed results
When I run the streaming query without the responseFormat option, I get the following error:
User class threw exception: io.delta.sharing.client.util.UnexpectedHttpStatus: HTTP request failed with status: HTTP/1.1 400 Bad Request for query(None). {"error_code":"INVALID_PARAMETER_VALUE","message":"DS_UNSUPPORTED_DELTA_TABLE_FEATURES: Table features delta.columnMapping.mode are found in table version: 4. historyShared:true, startVersion: 0. For ColumnMapping, use DBR with version 14.1(14.2 for CDF and streaming) or higher, or delta-sharing-spark with version 3.1 or higher, and set option ("responseFormat", "delta") to query the table. Or use delta_sharing python connector with version 1.1 or higher.","details":[{"@type":"type.googleapis.com/google.rpc.ErrorInfo","reason":"DS_UNSUPPORTED_DELTA_TABLE_FEATURES","domain":"data-sharing.databricks.com","metadata":{"historySharingStatusStr":" historyShared:true, startVersion: 0.","tableFeatures":"delta.columnMapping.mode","optionStr":"For ColumnMapping, use DBR with version 14.1(14.2 for CDF and streaming) or higher, or delta-sharing-spark with version 3.1 or higher, and set option ("responseFormat", "delta") to query the table. Or use delta_sharing python connector with version 1.1 or higher.","versionStr":" version: 4.","dsError":"DS_UNSUPPORTED_DELTA_TABLE_FEATURES"}}]}
However, when I add .option("responseFormat", "delta") to the streaming query, I get the following error:
User class threw exception: java.lang.UnsupportedOperationException: Delta sharing cdc streaming is not supported when responseforma=delta.
at io.delta.sharing.spark.DeltaSharingDataSource.sourceSchema(DeltaSharingDataSource.scala:94)
Expected results
The CDF streaming query should run successfully and return change records with the columnMapping property enabled.
Further details
Environment information
- Delta Lake version: 3.3.0
- Spark version: 3.5.1
- Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- Yes. I can contribute a fix for this bug independently.
- Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- No. I cannot contribute a bug fix at this time.