diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java index 130d2363b..ff2ee81d2 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java @@ -339,26 +339,40 @@ private RowData convertRowData(Map row, LogicalType type) { } private static List convertArrayData(ArrayData array, LogicalType type) { + LogicalType elementType = ((ArrayType) type).getElementType(); + List values; if (array instanceof GenericArrayData) { - return Arrays.asList(((GenericArrayData) array).toObjectArray()); + values = Arrays.asList(((GenericArrayData) array).toObjectArray()); + } else if (array instanceof BinaryArrayData) { + values = Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType)); + } else { + throw new UnsupportedOperationException("Unsupported array data: " + array.getClass()); } - if (array instanceof BinaryArrayData) { - LogicalType elementType = ((ArrayType) type).getElementType(); - List values = - Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType)); - if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) { - return values.stream() - .map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date))) - .collect(Collectors.toList()); - } - if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) { - return values.stream() - .map(arr -> convertArrayData((ArrayData) arr, elementType)) - .collect(Collectors.toList()); - } - return values; + + if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) { + return values.stream() + .map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date))) + .collect(Collectors.toList()); + } + if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) { + return values.stream() + .map(arr -> convertArrayData((ArrayData) arr, elementType)) + .collect(Collectors.toList()); + } + if (LogicalTypeRoot.MAP.equals(elementType.getTypeRoot())) { + return values.stream() + .map(arr -> writeValueAsString(convertMapData((MapData) arr, elementType))) + .collect(Collectors.toList()); + } + if (LogicalTypeRoot.ROW.equals(elementType.getTypeRoot())) { + return values.stream() + .map( + arr -> + writeValueAsString( + convertRowData(GenericRowData.of(arr), 0, elementType))) + .collect(Collectors.toList()); } - throw new UnsupportedOperationException("Unsupported array data: " + array.getClass()); + return values; } private static Object convertMapData(MapData map, LogicalType type) { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java index 46e47d585..100356b3d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java @@ -21,6 +21,7 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; @@ -366,4 +367,60 @@ public static ResolvedSchema getRowMapSchema() { Column.physical( "f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256)))); } + + @Test + public void testArrayExternalConvert() { + ResolvedSchema schema = + ResolvedSchema.of( + // list with string + Column.physical("f1", DataTypes.ARRAY(DataTypes.STRING())), + // list with list + Column.physical("f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))), + // list with row + Column.physical( + "f3", + DataTypes.ARRAY( + DataTypes.ROW( + DataTypes.FIELD("l1", DataTypes.STRING()), + DataTypes.FIELD("l2", DataTypes.INT())))), + // list with map + Column.physical( + "f4", + DataTypes.ARRAY( + DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))), + // list with date + Column.physical("f5", DataTypes.ARRAY(DataTypes.DATE()))); + + DorisRowConverter converter = + new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType()); + + Map mapData1 = createMapAndPut(new HashMap<>(), "hello", 1); + Map mapData2 = createMapAndPut(new HashMap<>(), "world", 2); + GenericRowData rowData = + GenericRowData.of( + new GenericArrayData(new String[] {"1", "2", "3"}), + new GenericArrayData( + new GenericArrayData[] { + new GenericArrayData(new String[] {"1", "2", "3"}), + new GenericArrayData(new String[] {"4", "5", "6"}) + }), + new GenericArrayData( + new GenericRowData[] { + GenericRowData.of(StringData.fromString("on"), 1), + GenericRowData.of(StringData.fromString("off"), 2) + }), + new GenericArrayData( + new GenericMapData[] { + new GenericMapData(mapData1), new GenericMapData(mapData2) + }), + new GenericArrayData(new int[] {1, 2, 3})); + + List row = new ArrayList<>(); + for (int i = 0; i < rowData.getArity(); i++) { + row.add(converter.convertExternal(rowData, i)); + } + String expected = + "[[1, 2, 3], [[1, 2, 3], [4, 5, 6]], [{\"l1\":\"on\",\"l2\":\"1\"}, {\"l1\":\"off\",\"l2\":\"2\"}], [{\"hello\":\"1\"}, {\"world\":\"2\"}], [1970-01-02, 1970-01-03, 1970-01-04]]"; + Assert.assertEquals(expected, row.toString()); + } }