Skip to content

Commit 9128a6b

Browse files
authored
[Kafka] Clean up compatibility code for older Flink versions in the flink-cdc-pipeline-connector-kafka module. (apache#4347)
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent 6741c1d commit 9128a6b

File tree

6 files changed

+5
-154
lines changed

6 files changed

+5
-154
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.flink.cdc.common.event.Event;
2323
import org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
2424
import org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
25-
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
2625
import org.apache.flink.configuration.ReadableConfig;
2726
import org.apache.flink.formats.common.TimestampFormat;
2827
import org.apache.flink.formats.json.JsonFormatOptions;
@@ -70,7 +69,7 @@ public static SerializationSchema<Event> createSerializationSchema(
7069
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
7170

7271
final boolean ignoreNullFields =
73-
JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(formatOptions);
72+
formatOptions.get(JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS);
7473

7574
switch (type) {
7675
case DEBEZIUM_JSON:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
2828
import org.apache.flink.cdc.common.utils.SchemaUtils;
2929
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
30-
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
3130
import org.apache.flink.formats.common.TimestampFormat;
3231
import org.apache.flink.formats.json.JsonFormatOptions;
3332
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -119,7 +118,7 @@ public byte[] serialize(Event event) {
119118
LogicalType rowType =
120119
DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
121120
JsonRowDataSerializationSchema jsonSerializer =
122-
JsonRowDataSerializationSchemaUtils.createSerializationSchema(
121+
new JsonRowDataSerializationSchema(
123122
createJsonRowType(fromLogicalToDataType(rowType)),
124123
timestampFormat,
125124
mapNullKeyMode,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.cdc.connectors.kafka.json.debezium;
1919

2020
import org.apache.flink.api.common.serialization.SerializationSchema;
21-
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
2221
import org.apache.flink.formats.common.TimestampFormat;
2322
import org.apache.flink.formats.json.JsonFormatOptions;
2423
import org.apache.flink.formats.json.JsonParserRowDataDeserializationSchema;
@@ -90,7 +89,7 @@ public DebeziumJsonRowDataSerializationSchema(
9089
this.mapNullKeyLiteral = mapNullKeyLiteral;
9190
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
9291
this.runtimeConverter =
93-
JsonRowDataSerializationSchemaUtils.createRowDataToJsonConverters(
92+
new RowDataToJsonConverters(
9493
timestampFormat,
9594
mapNullKeyMode,
9695
mapNullKeyLiteral,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
3434
import org.apache.flink.cdc.common.utils.SchemaUtils;
3535
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
36-
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
3736
import org.apache.flink.formats.common.TimestampFormat;
3837
import org.apache.flink.formats.json.JsonFormatOptions;
3938
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -136,7 +135,7 @@ private JsonRowDataSerializationSchema buildSerializationForPrimaryKey(Schema sc
136135
// the row should never be null
137136
DataType dataType = DataTypes.ROW(fields).notNull();
138137
LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
139-
return JsonRowDataSerializationSchemaUtils.createSerializationSchema(
138+
return new JsonRowDataSerializationSchema(
140139
(RowType) rowType,
141140
timestampFormat,
142141
mapNullKeyMode,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.cdc.common.event.Event;
2222
import org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
2323
import org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
24-
import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
2524
import org.apache.flink.configuration.ReadableConfig;
2625
import org.apache.flink.formats.common.TimestampFormat;
2726
import org.apache.flink.formats.json.JsonFormatOptions;
@@ -56,8 +55,7 @@ public static SerializationSchema<Event> createSerializationSchema(
5655
final boolean encodeDecimalAsPlainNumber =
5756
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
5857
final boolean ignoreNullFields =
59-
JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(
60-
formatOptions);
58+
formatOptions.get(JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS);
6159
return new JsonSerializationSchema(
6260
timestampFormat,
6361
mapNullKeyMode,

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java

Lines changed: 0 additions & 143 deletions
This file was deleted.

0 commit comments

Comments (0)