Skip to content

Commit 343778f

Browse files
haruki-830 and 春栖 authored
[FLINK-39756] Fix unable to coerce complex types to STRING (#4414)
Added support for coercing complex data types (Array, Map, Record) to STRING in SchemaMergingUtils, along with corresponding unit tests that verify these conversion behaviors. Also included related scenario tests in the integration tests. Additionally, added the missing imports for the ArrayData and JavaObjectConverter classes, whose absence was causing compilation errors in the complex-type string coercion code.

Co-authored-by: 春栖 <chunxi.mjy@U-4KXDP7CK-0015.local>
1 parent 26e6b73 commit 343778f

3 files changed

Lines changed: 180 additions & 1 deletion

File tree

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,9 +20,13 @@
2020
import org.apache.flink.api.java.tuple.Tuple2;
2121
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2222
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
23+
import org.apache.flink.cdc.common.converter.JavaObjectConverter;
24+
import org.apache.flink.cdc.common.data.ArrayData;
2325
import org.apache.flink.cdc.common.data.DateData;
2426
import org.apache.flink.cdc.common.data.DecimalData;
2527
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
28+
import org.apache.flink.cdc.common.data.MapData;
29+
import org.apache.flink.cdc.common.data.RecordData;
2630
import org.apache.flink.cdc.common.data.StringData;
2731
import org.apache.flink.cdc.common.data.TimeData;
2832
import org.apache.flink.cdc.common.data.TimestampData;
@@ -609,6 +613,12 @@ private static Object coerceToString(Object originalField, DataType originalType
609613
return BinaryStringData.fromString(((Variant) originalField).toJson());
610614
}
611615

616+
if (originalField instanceof MapData
617+
|| originalField instanceof ArrayData
618+
|| originalField instanceof RecordData) {
619+
Object javaObject = JavaObjectConverter.convertToJava(originalField, originalType);
620+
return BinaryStringData.fromString(javaObject.toString());
621+
}
612622
return BinaryStringData.fromString(originalField.toString());
613623
}
614624

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaMergingUtilsTest.java

Lines changed: 48 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -21,10 +21,15 @@
2121
import org.apache.flink.api.java.tuple.Tuple4;
2222
import org.apache.flink.cdc.common.data.DateData;
2323
import org.apache.flink.cdc.common.data.DecimalData;
24+
import org.apache.flink.cdc.common.data.GenericArrayData;
25+
import org.apache.flink.cdc.common.data.GenericMapData;
26+
import org.apache.flink.cdc.common.data.GenericRecordData;
2427
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
2528
import org.apache.flink.cdc.common.data.TimeData;
2629
import org.apache.flink.cdc.common.data.TimestampData;
2730
import org.apache.flink.cdc.common.data.ZonedTimestampData;
31+
import org.apache.flink.cdc.common.data.binary.BinaryArrayData;
32+
import org.apache.flink.cdc.common.data.binary.BinaryMapData;
2833
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
2934
import org.apache.flink.cdc.common.event.AddColumnEvent;
3035
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
@@ -823,7 +828,29 @@ DATE, dateOf(2020, 4, 4), TIMESTAMP_TZ, zTsOf("2020", "04", "04")),
823828
TIMESTAMP_TZ,
824829
zTsOf("2019", "02", "02"),
825830
STRING,
826-
binStrOf("2019-02-02T00:00:00Z")));
831+
binStrOf("2019-02-02T00:00:00Z")),
832+
833+
// From ARRAY
834+
Tuple4.of(
835+
ARRAY,
836+
new GenericArrayData(
837+
new Object[] {binStrOf("hello"), binStrOf("world")}),
838+
STRING,
839+
binStrOf("[hello, world]")),
840+
841+
// From MAP
842+
Tuple4.of(
843+
MAP,
844+
new GenericMapData(Collections.singletonMap(42, binStrOf("value"))),
845+
STRING,
846+
binStrOf("{42=value}")),
847+
848+
// From ROW
849+
Tuple4.of(
850+
ROW,
851+
GenericRecordData.of(42, binStrOf("Alice")),
852+
STRING,
853+
binStrOf("[42, Alice]")));
827854

828855
conversionExpects.forEach(
829856
rule ->
@@ -832,6 +859,26 @@ DATE, dateOf(2020, 4, 4), TIMESTAMP_TZ, zTsOf("2020", "04", "04")),
832859
.isEqualTo(rule.f3));
833860
}
834861

862+
/**
 * Checks coercion of binary-backed ARRAY and MAP values to STRING via {@code coerceObject}.
 *
 * <p>Expected renderings asserted below: an INT array holding 1, 2, 3 becomes "[1, 2, 3]", and a
 * single-entry INT-to-INT map (10 mapped to 100) becomes "{10=100}".
 */
@Test
863+
void testCoerceObjectBinaryTypes() {
864+
DataType intArrayType = DataTypes.ARRAY(DataTypes.INT());
865+
DataType intIntMapType = DataTypes.MAP(DataTypes.INT(), DataTypes.INT());
866+
867+
// BinaryArrayData to STRING
// NOTE(review): "UTC" is presumably the time zone argument of coerceObject; confirm against
// the helper, which is not visible in this hunk.
868+
BinaryArrayData binaryArray = BinaryArrayData.fromPrimitiveArray(new int[] {1, 2, 3});
869+
Assertions.assertThat(coerceObject("UTC", binaryArray, intArrayType, STRING))
870+
.as("Try coercing BinaryArrayData to STRING")
871+
.isEqualTo(binStrOf("[1, 2, 3]"));
872+
873+
// BinaryMapData to STRING
// The map is assembled from two parallel one-element binary arrays: keys {10} and values {100}.
874+
BinaryArrayData keys = BinaryArrayData.fromPrimitiveArray(new int[] {10});
875+
BinaryArrayData values = BinaryArrayData.fromPrimitiveArray(new int[] {100});
876+
BinaryMapData binaryMap = BinaryMapData.valueOf(keys, values);
877+
Assertions.assertThat(coerceObject("UTC", binaryMap, intIntMapType, STRING))
878+
.as("Try coercing BinaryMapData to STRING")
879+
.isEqualTo(binStrOf("{10=100}"));
880+
}
881+
835882
@Test
836883
void testCoerceRow() {
837884
Assertions.assertThat(

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java

Lines changed: 122 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,9 @@
1919

2020
import org.apache.flink.cdc.common.configuration.Configuration;
2121
import org.apache.flink.cdc.common.data.DecimalData;
22+
import org.apache.flink.cdc.common.data.GenericArrayData;
23+
import org.apache.flink.cdc.common.data.GenericMapData;
24+
import org.apache.flink.cdc.common.data.GenericRecordData;
2225
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
2326
import org.apache.flink.cdc.common.data.TimestampData;
2427
import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -886,6 +889,125 @@ void testMergingWithRouteInBatchMode(ValuesDataSink.SinkApi sinkApi) throws Exce
886889
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, student], op=INSERT, meta=()}");
887890
}
888891

892+
/**
 * Batch-mode pipeline test that routes two source tables into one merged table where the
 * complex-typed columns of the first table (ARRAY, MAP, ROW) meet STRING columns of the same
 * names in the second table.
 *
 * <p>The assertions at the end expect the merged table to be created with STRING columns and the
 * complex values of the first row to be emitted as "[10, 20, 30]", "{key=42}" and "[7, hello]".
 * The test runs once for every {@code ValuesDataSink.SinkApi} constant.
 */
@ParameterizedTest
893+
@EnumSource
894+
void testMergingComplexTypesWithRouteInBatchMode(ValuesDataSink.SinkApi sinkApi)
895+
throws Exception {
896+
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
897+
898+
// Setup value source
899+
Configuration sourceConfig = new Configuration();
900+
sourceConfig.set(
901+
ValuesDataSourceOptions.EVENT_SET_ID,
902+
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
903+
sourceConfig.set(ValuesDataSourceOptions.BATCH_MODE_ENABLED, true);
904+
905+
TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
906+
TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
907+
908+
// Table 1 has complex types: ARRAY, MAP, ROW
909+
Schema table1Schema =
910+
Schema.newBuilder()
911+
.physicalColumn("id", DataTypes.INT())
912+
.physicalColumn("arr", DataTypes.ARRAY(DataTypes.INT()))
913+
.physicalColumn("mp", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))
914+
.physicalColumn("rw", DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()))
915+
.primaryKey("id")
916+
.build();
917+
918+
// Table 2 has STRING columns at the same positions, forcing coercion
919+
Schema table2Schema =
920+
Schema.newBuilder()
921+
.physicalColumn("id", DataTypes.INT())
922+
.physicalColumn("arr", DataTypes.STRING())
923+
.physicalColumn("mp", DataTypes.STRING())
924+
.physicalColumn("rw", DataTypes.STRING())
925+
.primaryKey("id")
926+
.build();
927+
928+
// One record generator per table, each built from that table's own column types.
BinaryRecordDataGenerator table1dataGenerator =
929+
new BinaryRecordDataGenerator(
930+
table1Schema.getColumnDataTypes().toArray(new DataType[0]));
931+
BinaryRecordDataGenerator table2dataGenerator =
932+
new BinaryRecordDataGenerator(
933+
table2Schema.getColumnDataTypes().toArray(new DataType[0]));
934+
935+
// Both CreateTableEvents are queued before any data change event.
List<Event> events = new ArrayList<>();
936+
events.add(new CreateTableEvent(myTable1, table1Schema));
937+
events.add(new CreateTableEvent(myTable2, table2Schema));
938+
939+
// Table 1: insert with complex typed data
940+
events.add(
941+
DataChangeEvent.insertEvent(
942+
myTable1,
943+
table1dataGenerator.generate(
944+
new Object[] {
945+
1,
946+
new GenericArrayData(new int[] {10, 20, 30}),
947+
new GenericMapData(
948+
Collections.singletonMap(
949+
BinaryStringData.fromString("key"), 42)),
950+
GenericRecordData.of(7, BinaryStringData.fromString("hello"))
951+
})));
952+
953+
// Table 2: insert with plain strings
954+
events.add(
955+
DataChangeEvent.insertEvent(
956+
myTable2,
957+
table2dataGenerator.generate(
958+
new Object[] {
959+
2,
960+
BinaryStringData.fromString("plain_arr"),
961+
BinaryStringData.fromString("plain_mp"),
962+
BinaryStringData.fromString("plain_rw")
963+
})));
964+
965+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
966+
967+
SourceDef sourceDef =
968+
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
969+
970+
// Setup value sink
971+
Configuration sinkConfig = new Configuration();
972+
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
973+
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
974+
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
975+
976+
// Setup route
// The source pattern "mytable[0-9]" covers both mytable1 and mytable2, so both go to "merged".
977+
TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
978+
List<RouteDef> routeDef =
979+
Collections.singletonList(
980+
new RouteDef(
981+
"default_namespace.default_schema.mytable[0-9]",
982+
mergedTable.toString(),
983+
null,
984+
null));
985+
986+
// Setup pipeline
987+
Configuration pipelineConfig = new Configuration();
988+
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
989+
pipelineConfig.set(
990+
PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE, RuntimeExecutionMode.BATCH);
991+
PipelineDef pipelineDef =
992+
new PipelineDef(
993+
sourceDef,
994+
sinkDef,
995+
routeDef,
996+
Collections.emptyList(),
997+
Collections.emptyList(),
998+
pipelineConfig);
999+
1000+
// Execute the pipeline
1001+
PipelineExecution execution = composer.compose(pipelineDef);
1002+
execution.execute();
1003+
// The captured output is split into one event per line and compared in exact order.
String[] outputEvents = outCaptor.toString().trim().split("\n");
1004+
assertThat(outputEvents)
1005+
.containsExactly(
1006+
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`arr` STRING,`mp` STRING,`rw` STRING}, primaryKeys=id, options=()}",
1007+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, [10, 20, 30], {key=42}, [7, hello]], op=INSERT, meta=()}",
1008+
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, plain_arr, plain_mp, plain_rw], op=INSERT, meta=()}");
1009+
}
1010+
8891011
@ParameterizedTest
8901012
@EnumSource
8911013
void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {

0 commit comments

Comments
 (0)