[FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal)#4221
Conversation
yuxiqian
left a comment
There was a problem hiding this comment.
Thanks Skyler for the quick fix! Just left some trivial comments.
… ARRAY, ROW) to JSON (Debezium / Canal)
701a1c7 to
8ab67e5
Compare
|
Thanks @yuxiqian for the review! I have made the changes as suggested. Since the suggestions focused on code formatting, I force-pushed the code to make it clearer. PTAL. |
|
Thanks for the quick response! Just pushed another commit to simplify IT case and docs style. Would @lvyanquan like to take another look? |
There was a problem hiding this comment.
Pull request overview
This PR adds support for serializing complex types (MAP, ARRAY, ROW) to JSON format in the Kafka sink connector for both Debezium and Canal JSON formats. Previously, only the Kafka SQL connector supported these complex types, while the YAML-configured Kafka sink connector would fail when encountering them.
Changes:
- Refactored type conversion logic from TableSchemaInfo into a new RecordDataConverter utility class
- Added conversion support for ARRAY, MAP, and ROW types with recursive handling for nested structures
- Added comprehensive test coverage including unit tests and integration tests for various complex type scenarios
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| RecordDataConverter.java | New utility class that handles conversion of CDC RecordData to Flink SQL RowData, including support for complex types (ARRAY, MAP, ROW) with recursive nesting |
| TableSchemaInfo.java | Refactored to delegate field getter creation to RecordDataConverter, removing duplicate conversion logic |
| TableSchemaInfoTest.java | Added test for nested ROW types within ARRAY to verify complex type conversion |
| DebeziumJsonSerializationSchemaTest.java | Added test to verify Debezium JSON serialization of complex types |
| CanalJsonSerializationSchemaTest.java | Added test to verify Canal JSON serialization of complex types |
| KafkaDataSinkITCase.java | Added comprehensive integration tests covering basic complex types, nested arrays, maps with array values, null/empty collections, and deeply nested structures |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
lvyanquan
left a comment
There was a problem hiding this comment.
+1.
It would be even better if you could update the documentation to reflect this change.
yuxiqian
left a comment
There was a problem hiding this comment.
LGTM. Would @linguoxuan like to append docs in this PR, or open an individual PR for it?
|
Merged. Feel free to open a follow-up PR to update the documentation. |
I will do it. |
… ARRAY, ROW) to JSON (Debezium / Canal) (apache#4221) Co-authored-by: guoxuanlin <guoxuanlin@tencent.com> Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
This closes FLINK-38889.
Purpose
This PR fixes the issue where YAML Kafka sink connector does not support serializing complex types (MAP, ARRAY, ROW) to JSON format (Debezium / Canal), while Kafka SQL connector handles them without problem.
Root Cause
The issue was in the TableSchemaInfo class, which is responsible for converting CDC's RecordData format to Flink's RowData format before JSON serialization. The createFieldGetter() method lacked the necessary conversion logic for complex types.
Changes
Testing