Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -339,26 +339,40 @@ private RowData convertRowData(Map<String, ?> row, LogicalType type) {
}

private static List<Object> convertArrayData(ArrayData array, LogicalType type) {
LogicalType elementType = ((ArrayType) type).getElementType();
List<Object> values;
if (array instanceof GenericArrayData) {
return Arrays.asList(((GenericArrayData) array).toObjectArray());
values = Arrays.asList(((GenericArrayData) array).toObjectArray());
Comment thread
JNSimba marked this conversation as resolved.
} else if (array instanceof BinaryArrayData) {
values = Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
} else {
throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
}
if (array instanceof BinaryArrayData) {
LogicalType elementType = ((ArrayType) type).getElementType();
List<Object> values =
Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
return values.stream()
.map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
.collect(Collectors.toList());
}
if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
return values.stream()
.map(arr -> convertArrayData((ArrayData) arr, elementType))
.collect(Collectors.toList());
}
return values;

if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
return values.stream()
.map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
.collect(Collectors.toList());
}
if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
return values.stream()
.map(arr -> convertArrayData((ArrayData) arr, elementType))
.collect(Collectors.toList());
}
if (LogicalTypeRoot.MAP.equals(elementType.getTypeRoot())) {
return values.stream()
.map(arr -> writeValueAsString(convertMapData((MapData) arr, elementType)))
.collect(Collectors.toList());
}
if (LogicalTypeRoot.ROW.equals(elementType.getTypeRoot())) {
return values.stream()
.map(
arr ->
writeValueAsString(
convertRowData(GenericRowData.of(arr), 0, elementType)))
.collect(Collectors.toList());
}
throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
return values;
}

private static Object convertMapData(MapData map, LogicalType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
Expand Down Expand Up @@ -366,4 +367,60 @@ public static ResolvedSchema getRowMapSchema() {
Column.physical(
"f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256))));
}

@Test
public void testArrayExternalConvert() {
ResolvedSchema schema =
ResolvedSchema.of(
// list with string
Column.physical("f1", DataTypes.ARRAY(DataTypes.STRING())),
// list with list
Column.physical("f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))),
// list with row
Column.physical(
"f3",
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD("l1", DataTypes.STRING()),
DataTypes.FIELD("l2", DataTypes.INT())))),
// list with map
Column.physical(
"f4",
DataTypes.ARRAY(
DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))),
// list with date
Column.physical("f5", DataTypes.ARRAY(DataTypes.DATE())));

DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());

Map<String, Integer> mapData1 = createMapAndPut(new HashMap<>(), "hello", 1);
Map<String, Integer> mapData2 = createMapAndPut(new HashMap<>(), "world", 2);
GenericRowData rowData =
GenericRowData.of(
new GenericArrayData(new String[] {"1", "2", "3"}),
new GenericArrayData(
new GenericArrayData[] {
new GenericArrayData(new String[] {"1", "2", "3"}),
new GenericArrayData(new String[] {"4", "5", "6"})
}),
new GenericArrayData(
new GenericRowData[] {
GenericRowData.of(StringData.fromString("on"), 1),
GenericRowData.of(StringData.fromString("off"), 2)
}),
new GenericArrayData(
new GenericMapData[] {
new GenericMapData(mapData1), new GenericMapData(mapData2)
}),
new GenericArrayData(new int[] {1, 2, 3}));

List<Object> row = new ArrayList<>();
for (int i = 0; i < rowData.getArity(); i++) {
row.add(converter.convertExternal(rowData, i));
}
String expected =
"[[1, 2, 3], [[1, 2, 3], [4, 5, 6]], [{\"l1\":\"on\",\"l2\":\"1\"}, {\"l1\":\"off\",\"l2\":\"2\"}], [{\"hello\":\"1\"}, {\"world\":\"2\"}], [1970-01-02, 1970-01-03, 1970-01-04]]";
Assert.assertEquals(expected, row.toString());
}
}