Skip to content

Commit 22a638b

Browse files
committed
address the comment
1 parent b9015c6 commit 22a638b

2 files changed

Lines changed: 129 additions & 65 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java

Lines changed: 25 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -411,68 +411,30 @@ void testSerializeWithSchemaComplexTypes() throws Exception {
411411
byte[] serialized = serializationSchema.serialize(insertEvent);
412412
JsonNode actual = mapper.readTree(serialized);
413413

414-
// Verify the schema is present
415-
assertThat(actual.has("schema")).isTrue();
416-
assertThat(actual.has("payload")).isTrue();
417-
418-
JsonNode schemaNode = actual.get("schema");
419-
assertThat(schemaNode.has("fields")).isTrue();
420-
421-
// Get before/after schema fields
422-
JsonNode fields = schemaNode.get("fields");
423-
JsonNode afterSchema = null;
424-
for (JsonNode field : fields) {
425-
if ("after".equals(field.get("field").asText())) {
426-
afterSchema = field;
427-
break;
428-
}
429-
}
430-
assertThat(afterSchema).isNotNull();
431-
432-
// Verify schema contains complex type definitions
433-
JsonNode afterFields = afterSchema.get("fields");
434-
assertThat(afterFields).isNotNull();
435-
436-
// Find and verify array schema
437-
JsonNode arrSchema = null;
438-
JsonNode mapSchema = null;
439-
JsonNode rowSchema = null;
440-
for (JsonNode f : afterFields) {
441-
String fieldName = f.get("field").asText();
442-
if ("arr".equals(fieldName)) {
443-
arrSchema = f;
444-
} else if ("map".equals(fieldName)) {
445-
mapSchema = f;
446-
} else if ("row".equals(fieldName)) {
447-
rowSchema = f;
448-
}
449-
}
450-
451-
// Verify array schema type
452-
assertThat(arrSchema).isNotNull();
453-
assertThat(arrSchema.get("type").asText()).isEqualTo("array");
454-
assertThat(arrSchema.has("items")).isTrue();
455-
assertThat(arrSchema.get("items").get("type").asText()).isEqualTo("string");
456-
457-
// Verify map schema type
458-
assertThat(mapSchema).isNotNull();
459-
assertThat(mapSchema.get("type").asText()).isEqualTo("map");
460-
assertThat(mapSchema.has("keys")).isTrue();
461-
assertThat(mapSchema.has("values")).isTrue();
462-
assertThat(mapSchema.get("keys").get("type").asText()).isEqualTo("string");
463-
assertThat(mapSchema.get("values").get("type").asText()).isEqualTo("int32");
464-
465-
// Verify row schema type (struct)
466-
assertThat(rowSchema).isNotNull();
467-
assertThat(rowSchema.get("type").asText()).isEqualTo("struct");
468-
assertThat(rowSchema.has("fields")).isTrue();
469-
470-
// Verify payload data
471-
JsonNode payload = actual.get("payload");
472-
assertThat(payload.has("after")).isTrue();
473-
JsonNode afterData = payload.get("after");
474-
assertThat(afterData.has("arr")).isTrue();
475-
assertThat(afterData.has("map")).isTrue();
476-
assertThat(afterData.has("row")).isTrue();
414+
JsonNode expected =
415+
mapper.readTree(
416+
"{\"schema\":{\"type\":\"struct\",\"fields\":["
417+
+ "{\"type\":\"struct\",\"fields\":["
418+
+ "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
419+
+ "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
420+
+ "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
421+
+ "{\"type\":\"struct\",\"fields\":["
422+
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
423+
+ "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
424+
+ "],\"optional\":true,\"field\":\"row\"}"
425+
+ "],\"optional\":true,\"field\":\"before\"},"
426+
+ "{\"type\":\"struct\",\"fields\":["
427+
+ "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
428+
+ "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
429+
+ "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
430+
+ "{\"type\":\"struct\",\"fields\":["
431+
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
432+
+ "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
433+
+ "],\"optional\":true,\"field\":\"row\"}"
434+
+ "],\"optional\":true,\"field\":\"after\"}"
435+
+ "],\"optional\":false},"
436+
+ "\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"table1\"}}}");
437+
438+
assertThat(actual).isEqualTo(expected);
477439
}
478440
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import java.util.concurrent.ExecutionException;
8686
import java.util.stream.Collectors;
8787

88+
import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
8889
import static org.apache.flink.util.DockerImageVersions.KAFKA;
8990
import static org.assertj.core.api.Assertions.assertThat;
9091
import static org.assertj.core.api.Assertions.fail;
@@ -710,6 +711,16 @@ void runGenericComplexTypeSerializationTest(
710711
List<Event> eventsToSerialize,
711712
List<String> expectedJson)
712713
throws Exception {
714+
runGenericComplexTypeSerializationTest(
715+
serializationType, eventsToSerialize, expectedJson, false);
716+
}
717+
718+
void runGenericComplexTypeSerializationTest(
719+
JsonSerializationType serializationType,
720+
List<Event> eventsToSerialize,
721+
List<String> expectedJson,
722+
boolean includeSchema)
723+
throws Exception {
713724
try (StreamExecutionEnvironment env = new LocalStreamEnvironment()) {
714725
env.enableCheckpointing(1000L);
715726
env.setRestartStrategy(RestartStrategies.noRestart());
@@ -726,6 +737,9 @@ void runGenericComplexTypeSerializationTest(
726737
KafkaDataSinkOptions.VALUE_FORMAT.key(),
727738
JsonSerializationType.CANAL_JSON.toString());
728739
}
740+
if (includeSchema) {
741+
config.put(DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(), "true");
742+
}
729743
source.sinkTo(
730744
((FlinkSinkProvider)
731745
(new KafkaDataSinkFactory()
@@ -757,8 +771,9 @@ void runGenericComplexTypeSerializationTest(
757771
}
758772
})
759773
.collect(Collectors.toList());
760-
assertThat(deserializeValues(collectedRecords))
761-
.containsExactlyElementsOf(expectedJsonNodes);
774+
List<JsonNode> actualJsonNodes = deserializeValues(collectedRecords);
775+
776+
assertThat(actualJsonNodes).containsExactlyElementsOf(expectedJsonNodes);
762777
checkProducerLeak();
763778
}
764779

@@ -1072,4 +1087,91 @@ void testDeepNestedStructureSerialization(JsonSerializationType type) throws Exc
10721087
}
10731088
runGenericComplexTypeSerializationTest(type, eventsToSerialize, expectedOutput);
10741089
}
1090+
1091+
@Test
1092+
void testDebeziumJsonWithSchemaComplexTypes() throws Exception {
1093+
Schema schema =
1094+
Schema.newBuilder()
1095+
.physicalColumn("id", DataTypes.INT())
1096+
.physicalColumn("arr", DataTypes.ARRAY(DataTypes.STRING()))
1097+
.physicalColumn("map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
1098+
.physicalColumn(
1099+
"row",
1100+
DataTypes.ROW(
1101+
DataTypes.FIELD("f1", DataTypes.STRING()),
1102+
DataTypes.FIELD("f2", DataTypes.INT())))
1103+
.primaryKey("id")
1104+
.build();
1105+
BinaryRecordDataGenerator generator =
1106+
new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
1107+
BinaryRecordDataGenerator nestedRowGenerator =
1108+
new BinaryRecordDataGenerator(
1109+
((RowType) (schema.getColumn("row").get().getType()))
1110+
.getFieldTypes()
1111+
.toArray(new DataType[0]));
1112+
1113+
BinaryRecordData recordData =
1114+
generator.generate(
1115+
new Object[] {
1116+
1,
1117+
new GenericArrayData(
1118+
new Object[] {
1119+
BinaryStringData.fromString("item1"),
1120+
BinaryStringData.fromString("item2")
1121+
}),
1122+
new GenericMapData(
1123+
Map.of(
1124+
BinaryStringData.fromString("key1"), 100,
1125+
BinaryStringData.fromString("key2"), 200)),
1126+
nestedRowGenerator.generate(
1127+
new Object[] {BinaryStringData.fromString("nested"), 42})
1128+
});
1129+
1130+
List<Event> eventsToSerialize =
1131+
List.of(
1132+
new CreateTableEvent(table1, schema),
1133+
DataChangeEvent.insertEvent(table1, recordData),
1134+
DataChangeEvent.updateEvent(table1, recordData, recordData),
1135+
DataChangeEvent.deleteEvent(table1, recordData));
1136+
1137+
String schemaJson =
1138+
"{\"type\":\"struct\",\"fields\":["
1139+
+ "{\"type\":\"struct\",\"fields\":["
1140+
+ "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
1141+
+ "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
1142+
+ "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
1143+
+ "{\"type\":\"struct\",\"fields\":["
1144+
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
1145+
+ "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
1146+
+ "],\"optional\":true,\"field\":\"row\"}"
1147+
+ "],\"optional\":true,\"field\":\"before\"},"
1148+
+ "{\"type\":\"struct\",\"fields\":["
1149+
+ "{\"type\":\"int32\",\"optional\":true,\"field\":\"id\"},"
1150+
+ "{\"type\":\"array\",\"items\":{\"type\":\"string\",\"optional\":false},\"optional\":true,\"field\":\"arr\"},"
1151+
+ "{\"type\":\"map\",\"keys\":{\"type\":\"string\",\"optional\":false},\"values\":{\"type\":\"int32\",\"optional\":false},\"optional\":true,\"field\":\"map\"},"
1152+
+ "{\"type\":\"struct\",\"fields\":["
1153+
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"f1\"},"
1154+
+ "{\"type\":\"int32\",\"optional\":false,\"field\":\"f2\"}"
1155+
+ "],\"optional\":true,\"field\":\"row\"}"
1156+
+ "],\"optional\":true,\"field\":\"after\"}"
1157+
+ "],\"optional\":false}";
1158+
1159+
List<String> expectedJsonWithSchema =
1160+
List.of(
1161+
"{\"schema\":"
1162+
+ schemaJson
1163+
+ ",\"payload\":{\"before\":null,\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"c\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}",
1164+
"{\"schema\":"
1165+
+ schemaJson
1166+
+ ",\"payload\":{\"before\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"after\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"op\":\"u\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}",
1167+
"{\"schema\":"
1168+
+ schemaJson
1169+
+ ",\"payload\":{\"before\":{\"id\":1,\"arr\":[\"item1\",\"item2\"],\"map\":{\"key1\":100,\"key2\":200},\"row\":{\"f1\":\"nested\",\"f2\":42}},\"after\":null,\"op\":\"d\",\"source\":{\"db\":\"default_schema\",\"table\":\"%s\"}}}");
1170+
1171+
runGenericComplexTypeSerializationTest(
1172+
JsonSerializationType.DEBEZIUM_JSON,
1173+
eventsToSerialize,
1174+
expectedJsonWithSchema,
1175+
true);
1176+
}
10751177
}

0 commit comments

Comments
 (0)