Skip to content

Commit 0cfe1d3

Browse files
linguoxuanguoxuanlinyuxiqian
authored
[FLINK-38889][pipeline][kafka] Support serializing complex types(MAP, ARRAY, ROW) to JSON (Debezium / Canal) (#4221)
Co-authored-by: guoxuanlin <guoxuanlin@tencent.com> Co-authored-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
1 parent df6893d commit 0cfe1d3

7 files changed

Lines changed: 1485 additions & 98 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/TableSchemaInfo.java

Lines changed: 2 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,15 @@
2121
import org.apache.flink.cdc.common.data.RecordData;
2222
import org.apache.flink.cdc.common.event.TableId;
2323
import org.apache.flink.cdc.common.schema.Schema;
24-
import org.apache.flink.cdc.common.types.DataType;
25-
import org.apache.flink.cdc.common.types.DataTypeChecks;
26-
import org.apache.flink.table.data.DecimalData;
24+
import org.apache.flink.cdc.connectors.kafka.json.utils.RecordDataConverter;
2725
import org.apache.flink.table.data.GenericRowData;
2826
import org.apache.flink.table.data.RowData;
2927
import org.apache.flink.table.data.StringData;
30-
import org.apache.flink.table.data.TimestampData;
31-
import org.apache.flink.table.data.binary.BinaryStringData;
3228

3329
import java.time.ZoneId;
3430
import java.util.ArrayList;
3531
import java.util.List;
3632

37-
import static org.apache.flink.cdc.common.types.DataTypeChecks.getPrecision;
38-
import static org.apache.flink.cdc.common.types.DataTypeChecks.getScale;
39-
4033
/** maintain the {@link SerializationSchema} of a specific {@link TableId}. */
4134
public class TableSchemaInfo {
4235

@@ -96,96 +89,7 @@ public RowData getRowDataFromRecordData(RecordData recordData, boolean primaryKe
9689
}
9790

9891
private static List<RecordData.FieldGetter> createFieldGetters(Schema schema, ZoneId zoneId) {
99-
List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(schema.getColumns().size());
100-
for (int i = 0; i < schema.getColumns().size(); i++) {
101-
fieldGetters.add(createFieldGetter(schema.getColumns().get(i).getType(), i, zoneId));
102-
}
103-
return fieldGetters;
104-
}
105-
106-
private static RecordData.FieldGetter createFieldGetter(
107-
DataType fieldType, int fieldPos, ZoneId zoneId) {
108-
final RecordData.FieldGetter fieldGetter;
109-
// ordered by type root definition
110-
switch (fieldType.getTypeRoot()) {
111-
case CHAR:
112-
case VARCHAR:
113-
fieldGetter =
114-
record ->
115-
BinaryStringData.fromString(record.getString(fieldPos).toString());
116-
break;
117-
case BOOLEAN:
118-
fieldGetter = record -> record.getBoolean(fieldPos);
119-
break;
120-
case BINARY:
121-
case VARBINARY:
122-
fieldGetter = record -> record.getBinary(fieldPos);
123-
break;
124-
case DECIMAL:
125-
final int decimalPrecision = getPrecision(fieldType);
126-
final int decimalScale = getScale(fieldType);
127-
fieldGetter =
128-
record ->
129-
DecimalData.fromBigDecimal(
130-
record.getDecimal(fieldPos, decimalPrecision, decimalScale)
131-
.toBigDecimal(),
132-
decimalPrecision,
133-
decimalScale);
134-
break;
135-
case TINYINT:
136-
fieldGetter = record -> record.getByte(fieldPos);
137-
break;
138-
case SMALLINT:
139-
fieldGetter = record -> record.getShort(fieldPos);
140-
break;
141-
case INTEGER:
142-
fieldGetter = record -> record.getInt(fieldPos);
143-
break;
144-
case DATE:
145-
fieldGetter = record -> (int) record.getDate(fieldPos).toEpochDay();
146-
break;
147-
case TIME_WITHOUT_TIME_ZONE:
148-
fieldGetter = record -> (int) record.getTime(fieldPos).toMillisOfDay();
149-
break;
150-
case BIGINT:
151-
fieldGetter = record -> record.getLong(fieldPos);
152-
break;
153-
case FLOAT:
154-
fieldGetter = record -> record.getFloat(fieldPos);
155-
break;
156-
case DOUBLE:
157-
fieldGetter = record -> record.getDouble(fieldPos);
158-
break;
159-
case TIMESTAMP_WITHOUT_TIME_ZONE:
160-
fieldGetter =
161-
record ->
162-
TimestampData.fromTimestamp(
163-
record.getTimestamp(fieldPos, getPrecision(fieldType))
164-
.toTimestamp());
165-
break;
166-
case TIMESTAMP_WITH_TIME_ZONE:
167-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
168-
fieldGetter =
169-
record ->
170-
TimestampData.fromInstant(
171-
record.getLocalZonedTimestampData(
172-
fieldPos,
173-
DataTypeChecks.getPrecision(fieldType))
174-
.toInstant());
175-
break;
176-
default:
177-
throw new IllegalArgumentException(
178-
"don't support type of " + fieldType.getTypeRoot());
179-
}
180-
if (!fieldType.isNullable()) {
181-
return fieldGetter;
182-
}
183-
return row -> {
184-
if (row.isNullAt(fieldPos)) {
185-
return null;
186-
}
187-
return fieldGetter.getFieldOrNull(row);
188-
};
92+
return RecordDataConverter.createFieldGetters(schema, zoneId);
18993
}
19094

19195
public Schema getSchema() {

0 commit comments

Comments
 (0)