[FLINK-38691] [cdc connector mysql] Support for MySQL Transaction Boundary Events in Flink CDC Connector#4170
Conversation
ruanhang1993
left a comment
There was a problem hiding this comment.
@tejanshrana Thanks for the PR.
LGTM totally. @lvyanquan Could you help to review and see if it could be included in 3.6 version?
There was a problem hiding this comment.
Pull request overview
This pull request adds support for MySQL transaction boundary events (BEGIN/END) in the Flink CDC MySQL connector. Previously, when Debezium's provide.transaction.metadata was enabled, transaction metadata events were logged as "unknown" and skipped, causing loss of transaction boundary information.
Changes:
- Added detection logic for transaction metadata events using the Debezium schema name pattern
- Transaction events now update binlog offsets regardless of configuration
- Added optional emission of transaction events controlled by
includeTransactionMetadataEventsflag (default: false)
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| RecordUtils.java | Added constant and detection method for transaction metadata events using keySchema pattern |
| MySqlRecordEmitter.java | Added conditional processing and emission logic for transaction metadata events |
| MySqlSourceConfig.java | Added includeTransactionMetadataEvents configuration field |
| MySqlSourceConfigFactory.java | Added builder method and Debezium property configuration for transaction metadata |
| MySqlSourceBuilder.java | Added public API method to configure transaction metadata event inclusion |
| MySqlSource.java | Updated MySqlRecordEmitter instantiation to pass transaction metadata flag |
| MySqlPipelineRecordEmitter.java | Explicitly disabled transaction metadata events for pipeline connector |
| MySqlRecordEmitterTest.java | Added comprehensive test coverage for transaction metadata event handling |
| MySqlSourceReaderTest.java | Updated test fixtures to pass new constructor parameter |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
lvyanquan
left a comment
There was a problem hiding this comment.
LGTM, left one minor suggestion.
|
Thanks @tejanshrana's contribution, merged. |
…ndary Events in Flink CDC Connector (apache#4170)
Transaction BEGIN/END events (when provide.transaction.metadata is enabled) are currently treated as unknown and skipped, causing loss of transaction boundaries.
Solution
Logs from before the change:
2025-11-04 14:52:28 2025-11-04 14:52:28,827 INFO org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [] - Meet unknown element SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=4541146d-b988-11f0-87f6-0242ac140006:33, ts_sec=1762267948, file=mysql-bin.000003, pos=13381, gtids=4541146d-b988-11f0-87f6-0242ac140006:1-32, server_id=1}} ConnectRecord{topic='mysql_binlog_source.transaction', kafkaPartition=null, key=Struct{id=4541146d-b988-11f0-87f6-0242ac140006:33}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=4541146d-b988-11f0-87f6-0242ac140006:33}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, just skip.After the change, the transaction start and end events look like below:
{"status":"BEGIN","id":"4541146d-b988-11f0-87f6-0242ac140006:47","event_count":null,"data_collections":null} {"status":"END","id":"4541146d-b988-11f0-87f6-0242ac140006:47","event_count":1,"data_collections":[{"data_collection":"sample.sample","event_count":1}]}