From 37ab3ed8e9b88b23e1a85df7261e5cc0f3179d31 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Apr 2026 17:40:00 +0800 Subject: [PATCH 1/9] [FLINK-39417] Fix GenericRecordData could not be [de]serialized in pipeline --- .../flink/FlinkPipelineComposerITCase.java | 234 ++++++++++ .../cdc/connectors/values/ValuesDatabase.java | 6 +- .../transform/PreTransformOperator.java | 5 +- .../transform/PreTransformProcessor.java | 2 +- .../data/GenericRecordDataSerializer.java | 417 ++++++++++++++++++ .../serializer/data/RecordDataSerializer.java | 64 ++- .../serializer/data/writer/BinaryWriter.java | 19 +- .../data/RecordDataSerializerTest.java | 277 +++++++++++- 8 files changed, 1007 insertions(+), 17 deletions(-) create mode 100644 flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 47e8fae5789..80606a38a80 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -18,8 +18,13 @@ package org.apache.flink.cdc.composer.flink; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.data.DateData; import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.GenericArrayData; +import org.apache.flink.cdc.common.data.GenericMapData; +import org.apache.flink.cdc.common.data.GenericRecordData; import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.TimeData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.ZonedTimestampData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; @@ -72,6 +77,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1733,6 +1739,234 @@ void testRouteModeAllMatch(ValuesDataSink.SinkApi sinkApi) throws Exception { .isGreaterThan(0); } + @ParameterizedTest + @EnumSource + void testGenericRecordDataEndToEnd(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS); + + TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1"); + Schema table1Schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("col_bool", DataTypes.BOOLEAN()) + .physicalColumn("col_tinyint", DataTypes.TINYINT()) + .physicalColumn("col_smallint", DataTypes.SMALLINT()) + .physicalColumn("col_bigint", DataTypes.BIGINT()) + .physicalColumn("col_float", DataTypes.FLOAT()) + .physicalColumn("col_double", DataTypes.DOUBLE()) + .physicalColumn("col_decimal", DataTypes.DECIMAL(10, 2)) + .physicalColumn("col_date", DataTypes.DATE()) + .physicalColumn("col_time", DataTypes.TIME()) + .physicalColumn("col_timestamp", DataTypes.TIMESTAMP(3)) + .physicalColumn("col_timestamp_ltz", DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn("col_timestamp_tz", DataTypes.TIMESTAMP_TZ(3)) + .physicalColumn("col_array", DataTypes.ARRAY(DataTypes.STRING())) + .physicalColumn( + "col_map", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())) + .physicalColumn( + "col_row", + DataTypes.ROW( + DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f1", DataTypes.STRING()))) + .primaryKey("id") + .build(); + + GenericArrayData testArray = + new GenericArrayData( + new Object[] { + BinaryStringData.fromString("x"), BinaryStringData.fromString("y") + }); + GenericMapData testMap = new GenericMapData(Map.of(BinaryStringData.fromString("k1"), 100)); + GenericRecordData testRow = GenericRecordData.of(77, BinaryStringData.fromString("inner")); + DecimalData testDecimal = DecimalData.fromBigDecimal(new BigDecimal("123.45"), 10, 2); + TimestampData testTs = TimestampData.fromMillis(1609459200000L); + LocalZonedTimestampData testTsLtz = LocalZonedTimestampData.fromEpochMillis(1609459200000L); + ZonedTimestampData testTsTz = ZonedTimestampData.of(1609459200000L, 0, "UTC"); + + List events = new ArrayList<>(); + events.add(new CreateTableEvent(myTable1, table1Schema)); + events.add( + DataChangeEvent.insertEvent( + myTable1, + GenericRecordData.of( + 1, + BinaryStringData.fromString("Alice"), + 18, + true, + (byte) 1, + (short) 100, + 1000L, + 1.0f, + 1.0, + testDecimal, + DateData.fromEpochDay(18628), + TimeData.fromMillisOfDay(43200000), + testTs, + testTsLtz, + testTsTz, + testArray, + testMap, + testRow))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + GenericRecordData.of( + 2, + BinaryStringData.fromString("Bob"), + 20, + true, + (byte) 42, + (short) 200, + 9876543210L, + 3.14f, + 2.718, + testDecimal, + DateData.fromEpochDay(18628), + TimeData.fromMillisOfDay(43200000), + testTs, + testTsLtz, + testTsTz, + testArray, + testMap, + testRow))); + events.add( + DataChangeEvent.updateEvent( + myTable1, + GenericRecordData.of( + 2, + BinaryStringData.fromString("Bob"), + 20, + true, + (byte) 42, + (short) 200, + 9876543210L, + 3.14f, + 2.718, + testDecimal, + DateData.fromEpochDay(18628), + TimeData.fromMillisOfDay(43200000), + testTs, + testTsLtz, + testTsTz, + testArray, + testMap, + testRow), + GenericRecordData.of( + 2, + BinaryStringData.fromString("Bob"), + 30, + false, + (byte) 43, + (short) 201, + 9876543211L, + 3.15f, + 2.719, + testDecimal, + DateData.fromEpochDay(18629), + TimeData.fromMillisOfDay(43201000), + testTs, + testTsLtz, + testTsTz, + testArray, + testMap, + testRow))); + events.add( + DataChangeEvent.insertEvent( + myTable1, + GenericRecordData.of( + 3, null, null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, null))); + events.add( + DataChangeEvent.deleteEvent( + myTable1, + GenericRecordData.of( + 1, + BinaryStringData.fromString("Alice"), + 18, + true, + (byte) 1, + (short) 100, + 1000L, + 1.0f, + 1.0, + testDecimal, + DateData.fromEpochDay(18628), + TimeData.fromMillisOfDay(43200000), + testTs, + testTsLtz, + testTsTz, + testArray, + testMap, + testRow))); + + ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events)); + + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + TransformDef transformDef = + new TransformDef( + "default_namespace.default_schema.mytable1", + "*, 'test_tag' as tag", + "id <> 1", + null, + null, + null, + "", + null); + + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + Collections.emptyList(), + new ArrayList<>(Collections.singletonList(transformDef)), + Collections.emptyList(), + pipelineConfig); + + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + List results = ValuesDatabase.getResults(myTable1); + assertThat(results).hasSize(2); + assertThat(results) + .anySatisfy( + r -> + assertThat(r) + .startsWith( + "default_namespace.default_schema.mytable1:id=2;name=Bob;age=30;col_bool=false;col_tinyint=43;col_smallint=201;col_bigint=9876543211;col_float=3.15;col_double=2.719;col_decimal=123.45;col_date=2021-01-02;col_time=12:00:01;col_timestamp=2021-01-01T00:00;col_timestamp_ltz=2021-01-01T00:00;col_timestamp_tz=2021-01-01T00:00:00Z;") + .endsWith("tag=test_tag")) + .anySatisfy( + r -> + assertThat(r) + .isEqualTo( + "default_namespace.default_schema.mytable1:id=3;name=;age=;col_bool=;col_tinyint=;col_smallint=;col_bigint=;col_float=;col_double=;col_decimal=;col_date=;col_time=;col_timestamp=;col_timestamp_ltz=;col_timestamp_tz=;col_array=;col_map=;col_row=;tag=test_tag")); + + String[] outputEvents = outCaptor.toString().trim().split("\n"); + assertThat(Arrays.asList(outputEvents)) + .containsExactly( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col_bool` BOOLEAN,`col_tinyint` TINYINT,`col_smallint` SMALLINT,`col_bigint` BIGINT,`col_float` FLOAT,`col_double` DOUBLE,`col_decimal` DECIMAL(10, 2),`col_date` DATE,`col_time` TIME(0),`col_timestamp` TIMESTAMP(3),`col_timestamp_ltz` TIMESTAMP_LTZ(3),`col_timestamp_tz` TIMESTAMP(3) WITH TIME ZONE,`col_array` ARRAY,`col_map` MAP,`col_row` ROW<`f0` INT, `f1` STRING>,`tag` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 12:00, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, true, 42, 200, 9876543210, 3.14, 2.718, 123.45, 2021-01-01, 12:00, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], after=[2, Bob, 30, false, 43, 201, 9876543211, 3.15, 2.719, 123.45, 2021-01-02, 12:00:01, 2021-01-01T00:00, 2021-01-01T00:00, 2021-01-01T00:00:00Z, [x, y], {k1 -> 100}, {f0: INT -> 77, f1: STRING -> inner}, test_tag], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, test_tag], op=INSERT, meta=()}"); + } + BinaryRecordData generate(Schema schema, Object... fields) { return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) .generate( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java index 698203e91cc..7689c637ddd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java @@ -422,7 +422,11 @@ private void updatePrimaryKeyIndexes() { private String buildPrimaryKeyStr(RecordData recordData) { StringBuilder stringBuilder = new StringBuilder(); for (Integer primaryKeyIndex : primaryKeyIndexes) { - stringBuilder.append(recordData.getString(primaryKeyIndex).toString()).append(","); + RecordData.FieldGetter fieldGetter = + RecordData.createFieldGetter( + columns.get(primaryKeyIndex).getType(), primaryKeyIndex); + Object value = fieldGetter.getFieldOrNull(recordData); + stringBuilder.append(value != null ? value.toString() : "null").append(","); } stringBuilder.deleteCharAt(stringBuilder.length() - 1); return stringBuilder.toString(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index cb8748d12a0..7c6db17354e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -19,6 +19,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.event.ChangeEvent; import org.apache.flink.cdc.common.event.CreateTableEvent; @@ -391,8 +392,8 @@ private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) + "This is likely a bug, please consider filing an issue.", tableId); - BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before(); - BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after(); + RecordData before = dataChangeEvent.before(); + RecordData after = dataChangeEvent.after(); if (before != null) { BinaryRecordData projectedBefore = processor.processFillDataField(before); dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java index cd679fcd5b4..00caa1e6c9b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java @@ -59,7 +59,7 @@ public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTabl return new CreateTableEvent(createTableEvent.tableId(), schema); } - public BinaryRecordData processFillDataField(BinaryRecordData data) { + public BinaryRecordData processFillDataField(RecordData data) { List valueList = new ArrayList<>(); List columns = tableChangeInfo.getPreTransformedSchema().getColumns(); Map sourceFieldGettersMap = diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java new file mode 100644 index 00000000000..88d4c75b580 --- /dev/null +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.serializer.data; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DateData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.GenericArrayData; +import org.apache.flink.cdc.common.data.GenericMapData; +import org.apache.flink.cdc.common.data.GenericRecordData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.MapData; +import org.apache.flink.cdc.common.data.StringData; +import org.apache.flink.cdc.common.data.TimeData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; +import org.apache.flink.cdc.common.data.binary.BinaryStringData; +import org.apache.flink.cdc.common.types.variant.BinaryVariant; +import org.apache.flink.cdc.common.types.variant.Variant; +import org.apache.flink.cdc.runtime.serializer.data.binary.BinaryRecordDataSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Serializer for {@link GenericRecordData}. Uses a self-describing format where each field is + * prefixed with a type tag, so no schema information is needed at serialization time. + */ +@Internal +public class GenericRecordDataSerializer { + + // Type tags for self-describing format + static final byte TAG_NULL = 0; + static final byte TAG_BOOLEAN = 1; + static final byte TAG_BYTE = 2; + static final byte TAG_SHORT = 3; + static final byte TAG_INT = 4; + static final byte TAG_LONG = 5; + static final byte TAG_FLOAT = 6; + static final byte TAG_DOUBLE = 7; + static final byte TAG_STRING = 8; + static final byte TAG_BINARY = 9; + static final byte TAG_DECIMAL = 10; + static final byte TAG_TIMESTAMP = 11; + static final byte TAG_ZONED_TIMESTAMP = 12; + static final byte TAG_LOCAL_ZONED_TIMESTAMP = 13; + static final byte TAG_DATE = 14; + static final byte TAG_TIME = 15; + static final byte TAG_GENERIC_RECORD = 16; + static final byte TAG_BINARY_RECORD = 17; + static final byte TAG_ARRAY = 18; + static final byte TAG_MAP = 19; + static final byte TAG_VARIANT = 20; + + private GenericRecordDataSerializer() {} + + /** Serializes a {@link GenericRecordData} to the given output view. */ + public static void serialize(GenericRecordData record, DataOutputView target) + throws IOException { + int arity = record.getArity(); + target.writeInt(arity); + for (int i = 0; i < arity; i++) { + serializeField(record.getField(i), target); + } + } + + /** Deserializes a {@link GenericRecordData} from the given input view. */ + public static GenericRecordData deserialize(DataInputView source) throws IOException { + int arity = source.readInt(); + GenericRecordData record = new GenericRecordData(arity); + for (int i = 0; i < arity; i++) { + record.setField(i, deserializeField(source)); + } + return record; + } + + /** Creates a deep copy of the given {@link GenericRecordData}. */ + public static GenericRecordData copy(GenericRecordData from) { + int arity = from.getArity(); + GenericRecordData copy = new GenericRecordData(arity); + for (int i = 0; i < arity; i++) { + copy.setField(i, copyField(from.getField(i))); + } + return copy; + } + + // ---- Field-level serialization ---- + + static void serializeField(Object field, DataOutputView target) throws IOException { + if (field == null) { + target.writeByte(TAG_NULL); + } else if (field instanceof Boolean) { + target.writeByte(TAG_BOOLEAN); + target.writeBoolean((Boolean) field); + } else if (field instanceof Byte) { + target.writeByte(TAG_BYTE); + target.writeByte((Byte) field); + } else if (field instanceof Short) { + target.writeByte(TAG_SHORT); + target.writeShort((Short) field); + } else if (field instanceof Integer) { + target.writeByte(TAG_INT); + target.writeInt((Integer) field); + } else if (field instanceof Long) { + target.writeByte(TAG_LONG); + target.writeLong((Long) field); + } else if (field instanceof Float) { + target.writeByte(TAG_FLOAT); + target.writeFloat((Float) field); + } else if (field instanceof Double) { + target.writeByte(TAG_DOUBLE); + target.writeDouble((Double) field); + } else if (field instanceof StringData) { + target.writeByte(TAG_STRING); + byte[] bytes = field.toString().getBytes(StandardCharsets.UTF_8); + target.writeInt(bytes.length); + target.write(bytes); + } else if (field instanceof byte[]) { + target.writeByte(TAG_BINARY); + byte[] bytes = (byte[]) field; + target.writeInt(bytes.length); + target.write(bytes); + } else if (field instanceof DecimalData) { + target.writeByte(TAG_DECIMAL); + DecimalData decimal = (DecimalData) field; + BigDecimal bd = decimal.toBigDecimal(); + target.writeInt(bd.precision()); + target.writeInt(bd.scale()); + byte[] unscaled = bd.unscaledValue().toByteArray(); + target.writeInt(unscaled.length); + target.write(unscaled); + } else if (field instanceof TimestampData) { + target.writeByte(TAG_TIMESTAMP); + TimestampData ts = (TimestampData) field; + target.writeLong(ts.getMillisecond()); + target.writeInt(ts.getNanoOfMillisecond()); + } else if (field instanceof ZonedTimestampData) { + target.writeByte(TAG_ZONED_TIMESTAMP); + ZonedTimestampData zts = (ZonedTimestampData) field; + target.writeLong(zts.getMillisecond()); + target.writeInt(zts.getNanoOfMillisecond()); + byte[] zoneBytes = zts.getZoneId().getBytes(StandardCharsets.UTF_8); + target.writeInt(zoneBytes.length); + target.write(zoneBytes); + } else if (field instanceof LocalZonedTimestampData) { + target.writeByte(TAG_LOCAL_ZONED_TIMESTAMP); + LocalZonedTimestampData lzts = (LocalZonedTimestampData) field; + target.writeLong(lzts.getEpochMillisecond()); + target.writeInt(lzts.getEpochNanoOfMillisecond()); + } else if (field instanceof DateData) { + target.writeByte(TAG_DATE); + target.writeInt(((DateData) field).toEpochDay()); + } else if (field instanceof TimeData) { + target.writeByte(TAG_TIME); + target.writeInt(((TimeData) field).toMillisOfDay()); + } else if (field instanceof GenericRecordData) { + target.writeByte(TAG_GENERIC_RECORD); + serialize((GenericRecordData) field, target); + } else if (field instanceof BinaryRecordData) { + target.writeByte(TAG_BINARY_RECORD); + BinaryRecordDataSerializer.INSTANCE.serialize((BinaryRecordData) field, target); + } else if (field instanceof ArrayData) { + target.writeByte(TAG_ARRAY); + serializeArrayData((ArrayData) field, target); + } else if (field instanceof MapData) { + target.writeByte(TAG_MAP); + serializeMapData((MapData) field, target); + } else if (field instanceof Variant) { + target.writeByte(TAG_VARIANT); + serializeVariant((Variant) field, target); + } else { + throw new IOException( + "Unsupported field type in GenericRecordData: " + field.getClass().getName()); + } + } + + static Object deserializeField(DataInputView source) throws IOException { + byte tag = source.readByte(); + switch (tag) { + case TAG_NULL: + return null; + case TAG_BOOLEAN: + return source.readBoolean(); + case TAG_BYTE: + return source.readByte(); + case TAG_SHORT: + return source.readShort(); + case TAG_INT: + return source.readInt(); + case TAG_LONG: + return source.readLong(); + case TAG_FLOAT: + return source.readFloat(); + case TAG_DOUBLE: + return source.readDouble(); + case TAG_STRING: + { + int len = source.readInt(); + byte[] bytes = new byte[len]; + source.readFully(bytes); + return BinaryStringData.fromString(new String(bytes, StandardCharsets.UTF_8)); + } + case TAG_BINARY: + { + int len = source.readInt(); + byte[] bytes = new byte[len]; + source.readFully(bytes); + return bytes; + } + case TAG_DECIMAL: + { + int precision = source.readInt(); + int scale = source.readInt(); + int len = source.readInt(); + byte[] unscaled = new byte[len]; + source.readFully(unscaled); + return DecimalData.fromBigDecimal( + new BigDecimal(new BigInteger(unscaled), scale), precision, scale); + } + case TAG_TIMESTAMP: + return TimestampData.fromMillis(source.readLong(), source.readInt()); + case TAG_ZONED_TIMESTAMP: + { + long millis = source.readLong(); + int nanos = source.readInt(); + int zoneLen = source.readInt(); + byte[] zoneBytes = new byte[zoneLen]; + source.readFully(zoneBytes); + return ZonedTimestampData.of( + millis, nanos, new String(zoneBytes, StandardCharsets.UTF_8)); + } + case TAG_LOCAL_ZONED_TIMESTAMP: + return LocalZonedTimestampData.fromEpochMillis(source.readLong(), source.readInt()); + case TAG_DATE: + return DateData.fromEpochDay(source.readInt()); + case TAG_TIME: + return TimeData.fromMillisOfDay(source.readInt()); + case TAG_GENERIC_RECORD: + return deserialize(source); + case TAG_BINARY_RECORD: + return BinaryRecordDataSerializer.INSTANCE.deserialize(source); + case TAG_ARRAY: + return deserializeArrayData(source); + case TAG_MAP: + return deserializeMapData(source); + case TAG_VARIANT: + return deserializeVariant(source); + default: + throw new IOException("Unknown field type tag: " + tag); + } + } + + // ---- ArrayData serialization ---- + + private static void serializeArrayData(ArrayData arrayData, DataOutputView target) + throws IOException { + if (arrayData instanceof GenericArrayData) { + Object[] elements = ((GenericArrayData) arrayData).toObjectArray(); + target.writeInt(elements.length); + for (Object element : elements) { + serializeField(element, target); + } + } else { + // For BinaryArrayData or other implementations, convert to object array via getters + int size = arrayData.size(); + target.writeInt(size); + // We serialize each element as a generic object; since we don't know element types, + // we attempt to read them generically. For binary array data, the safest approach is + // to serialize the raw binary data. + // However, we don't have direct access to the underlying binary data in a generic way. + // So we fall back to reading elements as objects. + throw new IOException( + "Serialization of non-generic ArrayData is not supported in GenericRecordDataSerializer. " + + "Actual type: " + + arrayData.getClass().getName()); + } + } + + private static ArrayData deserializeArrayData(DataInputView source) throws IOException { + int size = source.readInt(); + Object[] elements = new Object[size]; + for (int i = 0; i < size; i++) { + elements[i] = deserializeField(source); + } + return new GenericArrayData(elements); + } + + // ---- MapData serialization ---- + + private static void serializeMapData(MapData mapData, DataOutputView target) + throws IOException { + if (mapData instanceof GenericMapData) { + ArrayData keyArray = mapData.keyArray(); + ArrayData valueArray = mapData.valueArray(); + int size = mapData.size(); + target.writeInt(size); + if (keyArray instanceof GenericArrayData && valueArray instanceof GenericArrayData) { + Object[] keys = ((GenericArrayData) keyArray).toObjectArray(); + Object[] values = ((GenericArrayData) valueArray).toObjectArray(); + for (int i = 0; i < size; i++) { + serializeField(keys[i], target); + serializeField(values[i], target); + } + } else { + throw new IOException( + "MapData with non-generic key/value arrays is not supported in GenericRecordDataSerializer."); + } + } else { + throw new IOException( + "Serialization of non-generic MapData is not supported in GenericRecordDataSerializer. " + + "Actual type: " + + mapData.getClass().getName()); + } + } + + private static MapData deserializeMapData(DataInputView source) throws IOException { + int size = source.readInt(); + Map map = new LinkedHashMap<>(size); + for (int i = 0; i < size; i++) { + Object key = deserializeField(source); + Object value = deserializeField(source); + map.put(key, value); + } + return new GenericMapData(map); + } + + // ---- Variant serialization ---- + + private static void serializeVariant(Variant variant, DataOutputView target) + throws IOException { + if (variant instanceof BinaryVariant) { + BinaryVariant bv = (BinaryVariant) variant; + byte[] value = bv.getValue(); + byte[] metadata = bv.getMetadata(); + target.writeInt(value.length); + target.write(value); + target.writeInt(metadata.length); + target.write(metadata); + } else { + throw new IOException("Unsupported Variant type: " + variant.getClass().getName()); + } + } + + private static Variant deserializeVariant(DataInputView source) throws IOException { + int valueLen = source.readInt(); + byte[] value = new byte[valueLen]; + source.readFully(value); + int metadataLen = source.readInt(); + byte[] metadata = new byte[metadataLen]; + source.readFully(metadata); + return new BinaryVariant(value, metadata); + } + + // ---- Field copy ---- + + private static Object copyField(Object field) { + if (field == null) { + return null; + } + // Most CDC internal data types are immutable, so shallow copy is safe + if (field instanceof byte[]) { + return ((byte[]) field).clone(); + } else if (field instanceof GenericRecordData) { + return copy((GenericRecordData) field); + } else if (field instanceof BinaryRecordData) { + return ((BinaryRecordData) field).copy(); + } else if (field instanceof GenericArrayData) { + Object[] elements = ((GenericArrayData) field).toObjectArray(); + Object[] copied = new Object[elements.length]; + for (int i = 0; i < elements.length; i++) { + copied[i] = copyField(elements[i]); + } + return new GenericArrayData(copied); + } else if (field instanceof GenericMapData) { + GenericMapData mapData = (GenericMapData) field; + ArrayData keyArray = mapData.keyArray(); + ArrayData valueArray = mapData.valueArray(); + if (keyArray instanceof GenericArrayData && valueArray instanceof GenericArrayData) { + Object[] keys = ((GenericArrayData) keyArray).toObjectArray(); + Object[] values = ((GenericArrayData) valueArray).toObjectArray(); + Map newMap = new LinkedHashMap<>(keys.length); + for (int i = 0; i < keys.length; i++) { + newMap.put(copyField(keys[i]), copyField(values[i])); + } + return new GenericMapData(newMap); + } + return field; + } + // Immutable types: Boolean, Byte, Short, Integer, Long, Float, Double, + // StringData, DecimalData, TimestampData, ZonedTimestampData, + // LocalZonedTimestampData, DateData, TimeData, Variant + return field; + } +} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java index 6348c5c29c5..f2bb92a3293 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.cdc.common.data.GenericRecordData; import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.runtime.serializer.TypeSerializerSingleton; @@ -33,43 +34,86 @@ public class RecordDataSerializer extends TypeSerializerSingleton { private static final long serialVersionUID = 1L; + /** Type tag for BinaryRecordData serialization. */ + private static final byte BINARY_RECORD_TYPE = 0; + + /** Type tag for GenericRecordData serialization. */ + private static final byte GENERIC_RECORD_TYPE = 1; + private final BinaryRecordDataSerializer binarySerializer = BinaryRecordDataSerializer.INSTANCE; public static final RecordDataSerializer INSTANCE = new RecordDataSerializer(); @Override public RecordData createInstance() { - // BinaryRecordData is the only implementation of RecordData return new BinaryRecordData(1); } @Override public void serialize(RecordData recordData, DataOutputView target) throws IOException { - // BinaryRecordData is the only implementation of RecordData - binarySerializer.serialize((BinaryRecordData) recordData, target); + if (recordData instanceof BinaryRecordData) { + target.writeByte(BINARY_RECORD_TYPE); + binarySerializer.serialize((BinaryRecordData) recordData, target); + } else if (recordData instanceof GenericRecordData) { + target.writeByte(GENERIC_RECORD_TYPE); + GenericRecordDataSerializer.serialize((GenericRecordData) recordData, target); + } else { + throw new IOException( + "Unsupported RecordData type: " + recordData.getClass().getName()); + } } @Override public RecordData deserialize(DataInputView source) throws IOException { - // BinaryRecordData is the only implementation of RecordData - return binarySerializer.deserialize(source); + byte type = source.readByte(); + if (type == BINARY_RECORD_TYPE) { + return binarySerializer.deserialize(source); + } else if (type == GENERIC_RECORD_TYPE) { + return GenericRecordDataSerializer.deserialize(source); + } else { + throw new IOException("Unknown RecordData type tag: " + type); + } } @Override public RecordData deserialize(RecordData reuse, DataInputView source) throws IOException { - return binarySerializer.deserialize((BinaryRecordData) reuse, source); + byte type = source.readByte(); + if (type == BINARY_RECORD_TYPE) { + if (reuse instanceof BinaryRecordData) { + return binarySerializer.deserialize((BinaryRecordData) reuse, source); + } + return binarySerializer.deserialize(source); + } else if (type == GENERIC_RECORD_TYPE) { + return GenericRecordDataSerializer.deserialize(source); + } else { + throw new IOException("Unknown RecordData type tag: " + type); + } } @Override public RecordData copy(RecordData from) { - // BinaryRecordData is the only implementation of RecordData - return ((BinaryRecordData) from).copy(); + if (from instanceof BinaryRecordData) { + return ((BinaryRecordData) from).copy(); + } else if (from instanceof GenericRecordData) { + return GenericRecordDataSerializer.copy((GenericRecordData) from); + } else { + throw new RuntimeException("Unsupported RecordData type: " + from.getClass().getName()); + } } @Override public RecordData copy(RecordData from, RecordData reuse) { - // BinaryRecordData is the only implementation of RecordData - return ((BinaryRecordData) from).copy((BinaryRecordData) reuse); + if (from instanceof BinaryRecordData) { + BinaryRecordData reuseRecord = + (reuse instanceof BinaryRecordData) + ? (BinaryRecordData) reuse + : new BinaryRecordData(from.getArity()); + return ((BinaryRecordData) from).copy(reuseRecord); + } else if (from instanceof GenericRecordData) { + return GenericRecordDataSerializer.copy((GenericRecordData) from); + } else { + throw new RuntimeException("Unsupported RecordData type: " + from.getClass().getName()); + } } @Override diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java index f6afad75630..1dccf9bc1e4 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java @@ -29,9 +29,11 @@ import org.apache.flink.cdc.common.data.TimeData; import org.apache.flink.cdc.common.data.TimestampData; import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DecimalType; import org.apache.flink.cdc.common.types.LocalZonedTimestampType; +import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.common.types.TimeType; import org.apache.flink.cdc.common.types.TimestampType; import org.apache.flink.cdc.common.types.ZonedTimestampType; @@ -39,6 +41,9 @@ import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper; import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer; +import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; + +import java.util.List; /** * Writer to write a composite data format, like row, array. 1. Invoke {@link #reset()}. 2. Write @@ -171,7 +176,19 @@ static void write( writer.writeMap(pos, (MapData) o, (MapDataSerializer) serializer); break; case ROW: - writer.writeRecord(pos, (RecordData) o, (TypeSerializer) serializer); + RecordData recordData = (RecordData) o; + if (!(recordData instanceof BinaryRecordData)) { + RowType rowType = (RowType) type; + List childTypes = rowType.getChildren(); + Object[] fields = new Object[recordData.getArity()]; + for (int i = 0; i < fields.length; i++) { + fields[i] = + RecordData.createFieldGetter(childTypes.get(i), i) + .getFieldOrNull(recordData); + } + recordData = new BinaryRecordDataGenerator(rowType).generate(fields); + } + writer.writeRecord(pos, recordData, (TypeSerializer) serializer); break; case BINARY: case VARBINARY: diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java index ed84c46654e..05d4e3ee3ac 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java @@ -17,14 +17,34 @@ package org.apache.flink.cdc.runtime.serializer.data; +import org.apache.flink.cdc.common.data.ArrayData; +import org.apache.flink.cdc.common.data.DateData; +import org.apache.flink.cdc.common.data.DecimalData; +import org.apache.flink.cdc.common.data.GenericArrayData; +import org.apache.flink.cdc.common.data.GenericMapData; +import org.apache.flink.cdc.common.data.GenericRecordData; +import org.apache.flink.cdc.common.data.LocalZonedTimestampData; +import org.apache.flink.cdc.common.data.MapData; import org.apache.flink.cdc.common.data.RecordData; +import org.apache.flink.cdc.common.data.TimeData; +import org.apache.flink.cdc.common.data.TimestampData; +import org.apache.flink.cdc.common.data.ZonedTimestampData; +import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.data.binary.BinaryStringData; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; import org.apache.flink.cdc.runtime.serializer.SerializerTestBase; import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; -/** A test for the {@link StringDataSerializer}. */ class RecordDataSerializerTest extends SerializerTestBase { @Override @@ -49,7 +69,260 @@ protected RecordData[] getTestData() { return new RecordData[] { generator.generate(new Object[] {1L, BinaryStringData.fromString("test1")}), generator.generate(new Object[] {2L, BinaryStringData.fromString("test2")}), - generator.generate(new Object[] {3L, null}) + generator.generate(new Object[] {3L, null}), + GenericRecordData.of(1L, BinaryStringData.fromString("test1")), + GenericRecordData.of(2L, BinaryStringData.fromString("test2")), + GenericRecordData.of(3L, null) }; } + + @Test + void testGenericRecordDataWithVariousTypes() throws Exception { + RecordDataSerializer serializer = RecordDataSerializer.INSTANCE; + + GenericRecordData record = + GenericRecordData.of( + true, + (byte) 42, + (short) 1024, + 123456, + 789L, + 3.14f, + 2.718281828, + BinaryStringData.fromString("hello"), + new byte[] {1, 2, 3}, + DecimalData.fromBigDecimal(new BigDecimal("12345.6789"), 10, 4), + TimestampData.fromMillis(1609459200000L, 123456), + LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321), + null); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(record, out); + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RecordData deserialized = serializer.deserialize(in); + + assertThat(deserialized).isInstanceOf(GenericRecordData.class); + assertThat(deserialized.getArity()).isEqualTo(record.getArity()); + assertThat(deserialized.getBoolean(0)).isTrue(); + assertThat(deserialized.getByte(1)).isEqualTo((byte) 42); + assertThat(deserialized.getShort(2)).isEqualTo((short) 1024); + assertThat(deserialized.getInt(3)).isEqualTo(123456); + assertThat(deserialized.getLong(4)).isEqualTo(789L); + assertThat(deserialized.getFloat(5)).isEqualTo(3.14f); + assertThat(deserialized.getDouble(6)).isEqualTo(2.718281828); + assertThat(deserialized.getString(7).toString()).isEqualTo("hello"); + assertThat(deserialized.getBinary(8)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(deserialized.getDecimal(9, 10, 4).toBigDecimal()) + .isEqualByComparingTo(new BigDecimal("12345.6789")); + assertThat(deserialized.getTimestamp(10, 6).getMillisecond()).isEqualTo(1609459200000L); + assertThat(deserialized.getTimestamp(10, 6).getNanoOfMillisecond()).isEqualTo(123456); + assertThat(deserialized.getLocalZonedTimestampData(11, 6).getEpochMillisecond()) + .isEqualTo(1609459200000L); + assertThat(deserialized.isNullAt(12)).isTrue(); + } + + @Test + void testBinaryRecordDataWithVariousTypes() throws Exception { + RecordDataSerializer serializer = RecordDataSerializer.INSTANCE; + + RowType rowType = + RowType.of( + DataTypes.BOOLEAN(), + DataTypes.TINYINT(), + DataTypes.SMALLINT(), + DataTypes.INT(), + DataTypes.BIGINT(), + DataTypes.FLOAT(), + DataTypes.DOUBLE(), + DataTypes.STRING(), + DataTypes.BYTES(), + DataTypes.DECIMAL(10, 4), + DataTypes.TIMESTAMP(6), + DataTypes.TIMESTAMP_LTZ(6), + DataTypes.TIMESTAMP_TZ(6), + DataTypes.DATE(), + DataTypes.TIME(), + DataTypes.STRING()); + + BinaryRecordData record = + new BinaryRecordDataGenerator(rowType) + .generate( + new Object[] { + true, + (byte) 42, + (short) 1024, + 123456, + 789L, + 3.14f, + 2.718281828, + BinaryStringData.fromString("hello"), + new byte[] {1, 2, 3}, + DecimalData.fromBigDecimal(new BigDecimal("12345.6789"), 10, 4), + TimestampData.fromMillis(1609459200000L, 123456), + LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321), + ZonedTimestampData.of(1609459200000L, 789012, "UTC"), + DateData.fromEpochDay(18628), + TimeData.fromMillisOfDay(43200000), + null + }); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(record, out); + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RecordData deserialized = serializer.deserialize(in); + + assertThat(deserialized).isInstanceOf(BinaryRecordData.class); + assertThat(deserialized.getArity()).isEqualTo(record.getArity()); + assertThat(deserialized.getBoolean(0)).isTrue(); + assertThat(deserialized.getByte(1)).isEqualTo((byte) 42); + assertThat(deserialized.getShort(2)).isEqualTo((short) 1024); + assertThat(deserialized.getInt(3)).isEqualTo(123456); + assertThat(deserialized.getLong(4)).isEqualTo(789L); + assertThat(deserialized.getFloat(5)).isEqualTo(3.14f); + assertThat(deserialized.getDouble(6)).isEqualTo(2.718281828); + assertThat(deserialized.getString(7).toString()).isEqualTo("hello"); + assertThat(deserialized.getBinary(8)).isEqualTo(new byte[] {1, 2, 3}); + assertThat(deserialized.getDecimal(9, 10, 4).toBigDecimal()) + .isEqualByComparingTo(new BigDecimal("12345.6789")); + assertThat(deserialized.getTimestamp(10, 6).getMillisecond()).isEqualTo(1609459200000L); + assertThat(deserialized.getTimestamp(10, 6).getNanoOfMillisecond()).isEqualTo(123456); + assertThat(deserialized.getLocalZonedTimestampData(11, 6).getEpochMillisecond()) + .isEqualTo(1609459200000L); + assertThat(deserialized.getZonedTimestamp(12, 6).getMillisecond()) + .isEqualTo(1609459200000L); + assertThat(deserialized.getDate(13).toEpochDay()).isEqualTo(18628); + assertThat(deserialized.getTime(14).toMillisOfDay()).isEqualTo(43200000); + assertThat(deserialized.isNullAt(15)).isTrue(); + } + + @Test + void testGenericRecordDataWithNestedTypes() throws Exception { + RecordDataSerializer serializer = RecordDataSerializer.INSTANCE; + + GenericRecordData nestedGeneric = + GenericRecordData.of(42, BinaryStringData.fromString("nested")); + + BinaryRecordData nestedBinary = + new BinaryRecordDataGenerator(RowType.of(DataTypes.INT(), DataTypes.STRING())) + .generate(new Object[] {99, BinaryStringData.fromString("binary-nested")}); + + GenericArrayData intArray = new GenericArrayData(new int[] {1, 2, 3, 4, 5}); + GenericArrayData stringArray = + new GenericArrayData( + new Object[] { + BinaryStringData.fromString("a"), BinaryStringData.fromString("b") + }); + GenericMapData map = + new GenericMapData( + Map.of( + BinaryStringData.fromString("k1"), + 100, + BinaryStringData.fromString("k2"), + 200)); + + GenericRecordData record = + GenericRecordData.of(nestedGeneric, nestedBinary, intArray, stringArray, map); + + DataOutputSerializer out = new DataOutputSerializer(512); + serializer.serialize(record, out); + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RecordData deserialized = serializer.deserialize(in); + + assertThat(deserialized).isInstanceOf(GenericRecordData.class); + assertThat(deserialized.getArity()).isEqualTo(5); + + RecordData dNestedGeneric = deserialized.getRow(0, 2); + assertThat(dNestedGeneric.getInt(0)).isEqualTo(42); + assertThat(dNestedGeneric.getString(1).toString()).isEqualTo("nested"); + + RecordData dNestedBinary = deserialized.getRow(1, 2); + assertThat(dNestedBinary.getInt(0)).isEqualTo(99); + assertThat(dNestedBinary.getString(1).toString()).isEqualTo("binary-nested"); + + ArrayData dIntArray = deserialized.getArray(2); + assertThat(dIntArray.size()).isEqualTo(5); + assertThat(dIntArray.getInt(0)).isEqualTo(1); + assertThat(dIntArray.getInt(4)).isEqualTo(5); + + ArrayData dStringArray = deserialized.getArray(3); + assertThat(dStringArray.size()).isEqualTo(2); + assertThat(dStringArray.getString(0).toString()).isEqualTo("a"); + assertThat(dStringArray.getString(1).toString()).isEqualTo("b"); + + MapData dMap = deserialized.getMap(4); + assertThat(dMap.size()).isEqualTo(2); + } + + @Test + void testBinaryRecordDataWithNestedTypes() throws Exception { + RecordDataSerializer serializer = RecordDataSerializer.INSTANCE; + + RowType rowType = + RowType.of( + DataTypes.ARRAY(DataTypes.STRING()), + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()), + DataTypes.ROW( + DataTypes.FIELD("f1", DataTypes.INT()), + DataTypes.FIELD("f2", DataTypes.STRING()))); + + BinaryRecordData record = + new BinaryRecordDataGenerator(rowType) + .generate( + new Object[] { + new GenericArrayData( + new Object[] { + BinaryStringData.fromString("x"), + BinaryStringData.fromString("y"), + BinaryStringData.fromString("z") + }), + new GenericMapData( + Map.of( + BinaryStringData.fromString("p"), + 10, + BinaryStringData.fromString("q"), + 20)), + new BinaryRecordDataGenerator( + RowType.of(DataTypes.INT(), DataTypes.STRING())) + .generate( + new Object[] { + 77, BinaryStringData.fromString("inner") + }) + }); + + DataOutputSerializer out = new DataOutputSerializer(512); + serializer.serialize(record, out); + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RecordData deserialized = serializer.deserialize(in); + + assertThat(deserialized).isInstanceOf(BinaryRecordData.class); + + ArrayData array = deserialized.getArray(0); + assertThat(array.size()).isEqualTo(3); + assertThat(array.getString(0).toString()).isEqualTo("x"); + assertThat(array.getString(2).toString()).isEqualTo("z"); + + MapData map = deserialized.getMap(1); + assertThat(map.size()).isEqualTo(2); + + RecordData nested = deserialized.getRow(2, 2); + assertThat(nested.getInt(0)).isEqualTo(77); + assertThat(nested.getString(1).toString()).isEqualTo("inner"); + } + + @Test + void testGenericRecordDataCopy() { + RecordDataSerializer serializer = RecordDataSerializer.INSTANCE; + + GenericRecordData record = + GenericRecordData.of( + 42, BinaryStringData.fromString("copy-test"), new byte[] {9, 8, 7}); + + RecordData copied = serializer.copy(record); + + assertThat(copied).isInstanceOf(GenericRecordData.class).isNotSameAs(record); + assertThat(copied.getInt(0)).isEqualTo(42); + assertThat(copied.getString(1).toString()).isEqualTo("copy-test"); + assertThat(copied.getBinary(2)).isEqualTo(new byte[] {9, 8, 7}); + assertThat(copied.getBinary(2)).isNotSameAs(record.getBinary(2)); + } } From 8978f6dcb53d69415eec675a9bc7d97cb56111ec Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Apr 2026 17:59:13 +0800 Subject: [PATCH 2/9] fix: preserve DecimalType precision and scale --- .../data/GenericRecordDataSerializer.java | 13 +++---- .../data/RecordDataSerializerTest.java | 35 +++++++++++++++++++ 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java index 88d4c75b580..7dad52b45d9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -39,8 +39,6 @@ import org.apache.flink.core.memory.DataOutputView; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; import java.nio.charset.StandardCharsets; import java.util.LinkedHashMap; import java.util.Map; @@ -146,10 +144,10 @@ static void serializeField(Object field, DataOutputView target) throws IOExcepti } else if (field instanceof DecimalData) { target.writeByte(TAG_DECIMAL); DecimalData decimal = (DecimalData) field; - BigDecimal bd = decimal.toBigDecimal(); - target.writeInt(bd.precision()); - target.writeInt(bd.scale()); - byte[] unscaled = bd.unscaledValue().toByteArray(); + // Use DecimalData's precision/scale (SQL DECIMAL(p,s)) instead of BigDecimal's + target.writeInt(decimal.precision()); + target.writeInt(decimal.scale()); + byte[] unscaled = decimal.toUnscaledBytes(); target.writeInt(unscaled.length); target.write(unscaled); } else if (field instanceof TimestampData) { @@ -237,8 +235,7 @@ static Object deserializeField(DataInputView source) throws IOException { int len = source.readInt(); byte[] unscaled = new byte[len]; source.readFully(unscaled); - return DecimalData.fromBigDecimal( - new BigDecimal(new BigInteger(unscaled), scale), precision, scale); + return DecimalData.fromUnscaledBytes(unscaled, precision, scale); } case TAG_TIMESTAMP: return TimestampData.fromMillis(source.readLong(), source.readInt()); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java index 05d4e3ee3ac..ecaab8f2829 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java @@ -325,4 +325,39 @@ void testGenericRecordDataCopy() { assertThat(copied.getBinary(2)).isEqualTo(new byte[] {9, 8, 7}); assertThat(copied.getBinary(2)).isNotSameAs(record.getBinary(2)); } + + @Test + void testDecimalDataPreservesPrecisionAndScale() throws Exception { + RecordDataSerializer serializer = RecordDataSerializer.INSTANCE; + + DecimalData decimal1 = DecimalData.fromBigDecimal(new BigDecimal("1.23"), 20, 4); + DecimalData decimal2 = DecimalData.fromBigDecimal(new BigDecimal("42"), 15, 0); + DecimalData decimal3 = DecimalData.fromBigDecimal(new BigDecimal("0.0010"), 10, 4); + + GenericRecordData record = GenericRecordData.of(decimal1, decimal2, decimal3); + + DataOutputSerializer out = new DataOutputSerializer(256); + serializer.serialize(record, out); + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); + RecordData deserialized = serializer.deserialize(in); + + assertThat(deserialized).isInstanceOf(GenericRecordData.class); + + DecimalData dDecimal1 = deserialized.getDecimal(0, 20, 4); + assertThat(dDecimal1.precision()).isEqualTo(20); + assertThat(dDecimal1.scale()).isEqualTo(4); + assertThat(dDecimal1.toBigDecimal()).isEqualByComparingTo(new BigDecimal("1.23")); + + DecimalData dDecimal2 = deserialized.getDecimal(1, 15, 0); + assertThat(dDecimal2.precision()).isEqualTo(15); + assertThat(dDecimal2.scale()).isEqualTo(0); + assertThat(dDecimal2.toBigDecimal()).isEqualByComparingTo(new BigDecimal("42")); + + DecimalData dDecimal3 = deserialized.getDecimal(2, 10, 4); + assertThat(dDecimal3.precision()).isEqualTo(10); + assertThat(dDecimal3.scale()).isEqualTo(4); + assertThat(dDecimal3.toBigDecimal()).isEqualByComparingTo(new BigDecimal("0.0010")); + + assertThat(dDecimal2.isCompact()).isTrue(); + } } From e83b87d73afa03dc9d320f63e63dd4f0c51496eb Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Apr 2026 18:06:04 +0800 Subject: [PATCH 3/9] nit: add assertions to avoid corruption --- .../runtime/serializer/data/writer/BinaryWriter.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java index 1dccf9bc1e4..b698c0845c4 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java @@ -38,6 +38,7 @@ import org.apache.flink.cdc.common.types.TimestampType; import org.apache.flink.cdc.common.types.ZonedTimestampType; import org.apache.flink.cdc.common.types.variant.Variant; +import org.apache.flink.cdc.common.utils.Preconditions; import org.apache.flink.cdc.runtime.serializer.NullableSerializerWrapper; import org.apache.flink.cdc.runtime.serializer.data.ArrayDataSerializer; import org.apache.flink.cdc.runtime.serializer.data.MapDataSerializer; @@ -180,8 +181,14 @@ static void write( if (!(recordData instanceof BinaryRecordData)) { RowType rowType = (RowType) type; List childTypes = rowType.getChildren(); - Object[] fields = new Object[recordData.getArity()]; - for (int i = 0; i < fields.length; i++) { + int arity = recordData.getArity(); + Preconditions.checkArgument( + arity == childTypes.size(), + "RecordData arity (%s) does not match row type field count (%s)", + arity, + childTypes.size()); + Object[] fields = new Object[arity]; + for (int i = 0; i < arity; i++) { fields[i] = RecordData.createFieldGetter(childTypes.get(i), i) .getFieldOrNull(recordData); From 85bec6bfa6c043b69fa7fc21831b161b1cfd8516 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Apr 2026 18:07:56 +0800 Subject: [PATCH 4/9] nit: fail fast --- .../data/GenericRecordDataSerializer.java | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java index 7dad52b45d9..92a761e6b82 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -281,14 +281,6 @@ private static void serializeArrayData(ArrayData arrayData, DataOutputView targe serializeField(element, target); } } else { - // For BinaryArrayData or other implementations, convert to object array via getters - int size = arrayData.size(); - target.writeInt(size); - // We serialize each element as a generic object; since we don't know element types, - // we attempt to read them generically. For binary array data, the safest approach is - // to serialize the raw binary data. - // However, we don't have direct access to the underlying binary data in a generic way. - // So we fall back to reading elements as objects. throw new IOException( "Serialization of non-generic ArrayData is not supported in GenericRecordDataSerializer. " + "Actual type: " @@ -312,19 +304,19 @@ private static void serializeMapData(MapData mapData, DataOutputView target) if (mapData instanceof GenericMapData) { ArrayData keyArray = mapData.keyArray(); ArrayData valueArray = mapData.valueArray(); - int size = mapData.size(); - target.writeInt(size); - if (keyArray instanceof GenericArrayData && valueArray instanceof GenericArrayData) { - Object[] keys = ((GenericArrayData) keyArray).toObjectArray(); - Object[] values = ((GenericArrayData) valueArray).toObjectArray(); - for (int i = 0; i < size; i++) { - serializeField(keys[i], target); - serializeField(values[i], target); - } - } else { + if (!(keyArray instanceof GenericArrayData) + || !(valueArray instanceof GenericArrayData)) { throw new IOException( "MapData with non-generic key/value arrays is not supported in GenericRecordDataSerializer."); } + int size = mapData.size(); + target.writeInt(size); + Object[] keys = ((GenericArrayData) keyArray).toObjectArray(); + Object[] values = ((GenericArrayData) valueArray).toObjectArray(); + for (int i = 0; i < size; i++) { + serializeField(keys[i], target); + serializeField(values[i], target); + } } else { throw new IOException( "Serialization of non-generic MapData is not supported in GenericRecordDataSerializer. " From d684a83f2be10a8946d11d2ca08974e476ae13aa Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Fri, 10 Apr 2026 18:09:51 +0800 Subject: [PATCH 5/9] optimize: avoid UTF-8 encode / decode --- .../runtime/serializer/data/GenericRecordDataSerializer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java index 92a761e6b82..52c6178da37 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -133,7 +133,7 @@ static void serializeField(Object field, DataOutputView target) throws IOExcepti target.writeDouble((Double) field); } else if (field instanceof StringData) { target.writeByte(TAG_STRING); - byte[] bytes = field.toString().getBytes(StandardCharsets.UTF_8); + byte[] bytes = ((StringData) field).toBytes(); target.writeInt(bytes.length); target.write(bytes); } else if (field instanceof byte[]) { @@ -219,7 +219,7 @@ static Object deserializeField(DataInputView source) throws IOException { int len = source.readInt(); byte[] bytes = new byte[len]; source.readFully(bytes); - return BinaryStringData.fromString(new String(bytes, StandardCharsets.UTF_8)); + return BinaryStringData.fromBytes(bytes); } case TAG_BINARY: { From 86123412fcf55e9bd55dfba8e7769cf4cde756bb Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 13 Apr 2026 09:22:16 +0800 Subject: [PATCH 6/9] add map data type guard --- .../data/GenericRecordDataSerializer.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java index 52c6178da37..e3c589a1d8c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -387,16 +387,21 @@ private static Object copyField(Object field) { GenericMapData mapData = (GenericMapData) field; ArrayData keyArray = mapData.keyArray(); ArrayData valueArray = mapData.valueArray(); - if (keyArray instanceof GenericArrayData && valueArray instanceof GenericArrayData) { - Object[] keys = ((GenericArrayData) keyArray).toObjectArray(); - Object[] values = ((GenericArrayData) valueArray).toObjectArray(); - Map newMap = new LinkedHashMap<>(keys.length); - for (int i = 0; i < keys.length; i++) { - newMap.put(copyField(keys[i]), copyField(values[i])); - } - return new GenericMapData(newMap); + if (!(keyArray instanceof GenericArrayData) + || !(valueArray instanceof GenericArrayData)) { + throw new IllegalArgumentException( + "Expected GenericArrayData for key and value arrays in GenericMapData, but got: keyArray=" + + keyArray.getClass().getName() + + ", valueArray=" + + valueArray.getClass().getName()); + } + Object[] keys = ((GenericArrayData) keyArray).toObjectArray(); + Object[] values = ((GenericArrayData) valueArray).toObjectArray(); + Map newMap = new LinkedHashMap<>(keys.length); + for (int i = 0; i < keys.length; i++) { + newMap.put(copyField(keys[i]), copyField(values[i])); } - return field; + return new GenericMapData(newMap); } // Immutable types: Boolean, Byte, Short, Integer, Long, Float, Double, // StringData, DecimalData, TimestampData, ZonedTimestampData, From 8a73f92b202ee2df6fa8ab5430753f5ca8e5e2c9 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 13 Apr 2026 09:28:38 +0800 Subject: [PATCH 7/9] make tag fields private --- .../data/GenericRecordDataSerializer.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java index e3c589a1d8c..b6854a8524d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -51,27 +51,27 @@ public class GenericRecordDataSerializer { // Type tags for self-describing format - static final byte TAG_NULL = 0; - static final byte TAG_BOOLEAN = 1; - static final byte TAG_BYTE = 2; - static final byte TAG_SHORT = 3; - static final byte TAG_INT = 4; - static final byte TAG_LONG = 5; - static final byte TAG_FLOAT = 6; - static final byte TAG_DOUBLE = 7; - static final byte TAG_STRING = 8; - static final byte TAG_BINARY = 9; - static final byte TAG_DECIMAL = 10; - static final byte TAG_TIMESTAMP = 11; - static final byte TAG_ZONED_TIMESTAMP = 12; - static final byte TAG_LOCAL_ZONED_TIMESTAMP = 13; - static final byte TAG_DATE = 14; - static final byte TAG_TIME = 15; - static final byte TAG_GENERIC_RECORD = 16; - static final byte TAG_BINARY_RECORD = 17; - static final byte TAG_ARRAY = 18; - static final byte TAG_MAP = 19; - static final byte TAG_VARIANT = 20; + private static final byte TAG_NULL = 0; + private static final byte TAG_BOOLEAN = 1; + private static final byte TAG_BYTE = 2; + private static final byte TAG_SHORT = 3; + private static final byte TAG_INT = 4; + private static final byte TAG_LONG = 5; + private static final byte TAG_FLOAT = 6; + private static final byte TAG_DOUBLE = 7; + private static final byte TAG_STRING = 8; + private static final byte TAG_BINARY = 9; + private static final byte TAG_DECIMAL = 10; + private static final byte TAG_TIMESTAMP = 11; + private static final byte TAG_ZONED_TIMESTAMP = 12; + private static final byte TAG_LOCAL_ZONED_TIMESTAMP = 13; + private static final byte TAG_DATE = 14; + private static final byte TAG_TIME = 15; + private static final byte TAG_GENERIC_RECORD = 16; + private static final byte TAG_BINARY_RECORD = 17; + private static final byte TAG_ARRAY = 18; + private static final byte TAG_MAP = 19; + private static final byte TAG_VARIANT = 20; private GenericRecordDataSerializer() {} From 85e79d64f5d697c4724a5ee47043039b7c6c23b1 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 13 Apr 2026 09:40:24 +0800 Subject: [PATCH 8/9] respect reuse option --- .../serializer/data/GenericRecordDataSerializer.java | 11 ++++++++--- .../runtime/serializer/data/RecordDataSerializer.java | 6 +++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java index b6854a8524d..4f5c406fb30 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/GenericRecordDataSerializer.java @@ -97,12 +97,17 @@ public static GenericRecordData deserialize(DataInputView source) throws IOExcep /** Creates a deep copy of the given {@link GenericRecordData}. */ public static GenericRecordData copy(GenericRecordData from) { + return copy(from, new GenericRecordData(from.getArity())); + } + + public static GenericRecordData copy(GenericRecordData from, GenericRecordData reuse) { int arity = from.getArity(); - GenericRecordData copy = new GenericRecordData(arity); + GenericRecordData target = + (reuse.getArity() == arity) ? reuse : new GenericRecordData(arity); for (int i = 0; i < arity; i++) { - copy.setField(i, copyField(from.getField(i))); + target.setField(i, copyField(from.getField(i))); } - return copy; + return target; } // ---- Field-level serialization ---- diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java index f2bb92a3293..42948e3fa0a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializer.java @@ -110,7 +110,11 @@ public RecordData copy(RecordData from, RecordData reuse) { : new BinaryRecordData(from.getArity()); return ((BinaryRecordData) from).copy(reuseRecord); } else if (from instanceof GenericRecordData) { - return GenericRecordDataSerializer.copy((GenericRecordData) from); + GenericRecordData reuseRecord = + (reuse instanceof GenericRecordData) + ? (GenericRecordData) reuse + : new GenericRecordData(from.getArity()); + return GenericRecordDataSerializer.copy((GenericRecordData) from, reuseRecord); } else { throw new RuntimeException("Unsupported RecordData type: " + from.getClass().getName()); } From 8e9568c9008d07a0ab514c8808ae6107648746be Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 13 Apr 2026 09:44:51 +0800 Subject: [PATCH 9/9] add tests --- .../serializer/data/RecordDataSerializerTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java index ecaab8f2829..15cbde474ec 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/serializer/data/RecordDataSerializerTest.java @@ -94,6 +94,9 @@ void testGenericRecordDataWithVariousTypes() throws Exception { DecimalData.fromBigDecimal(new BigDecimal("12345.6789"), 10, 4), TimestampData.fromMillis(1609459200000L, 123456), LocalZonedTimestampData.fromEpochMillis(1609459200000L, 654321), + ZonedTimestampData.of(1609459200000L, 789012, "UTC"), + DateData.fromEpochDay(18628), + TimeData.fromMillisOfDay(43200000), null); DataOutputSerializer out = new DataOutputSerializer(256); @@ -118,7 +121,13 @@ void testGenericRecordDataWithVariousTypes() throws Exception { assertThat(deserialized.getTimestamp(10, 6).getNanoOfMillisecond()).isEqualTo(123456); assertThat(deserialized.getLocalZonedTimestampData(11, 6).getEpochMillisecond()) .isEqualTo(1609459200000L); - assertThat(deserialized.isNullAt(12)).isTrue(); + assertThat(deserialized.getZonedTimestamp(12, 6).getMillisecond()) + .isEqualTo(1609459200000L); + assertThat(deserialized.getZonedTimestamp(12, 6).getNanoOfMillisecond()).isEqualTo(789012); + assertThat(deserialized.getZonedTimestamp(12, 6).getZoneId()).isEqualTo("UTC"); + assertThat(deserialized.getDate(13).toEpochDay()).isEqualTo(18628); + assertThat(deserialized.getTime(14).toMillisOfDay()).isEqualTo(43200000); + assertThat(deserialized.isNullAt(15)).isTrue(); } @Test