Skip to content

Commit 6b505ee

Browse files
fix failed cases
1 parent 4cb71d4 commit 6b505ee

File tree

4 files changed

+45
-7
lines changed

4 files changed

+45
-7
lines changed

fluss-common/src/main/java/org/apache/fluss/utils/ArrowUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,14 @@ private static Field toArrowField(String fieldName, DataType logicalType) {
452452
}
453453
} else if (logicalType instanceof MapType) {
454454
MapType mapType = (MapType) logicalType;
455-
Field keyField = toArrowField("key", mapType.getKeyType());
455+
DataType keyType = mapType.getKeyType();
456+
if (keyType.isNullable()) {
457+
throw new IllegalArgumentException(
458+
String.format(
459+
"Map key type must be non-nullable for Arrow conversion, but got: %s",
460+
keyType.asSummaryString()));
461+
}
462+
Field keyField = toArrowField("key", keyType);
456463
Field valueField = toArrowField("value", mapType.getValueType());
457464
FieldType structFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null);
458465
List<Field> structChildren = new ArrayList<>();

fluss-common/src/main/java/org/apache/fluss/utils/json/DataTypeJsonSerde.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ private static DataType deserializeMap(JsonNode dataTypeNode) {
285285
final DataType keyType = DataTypeJsonSerde.INSTANCE.deserialize(keyNode);
286286
final JsonNode valueNode = dataTypeNode.get(FIELD_NAME_VALUE_TYPE);
287287
final DataType valueType = DataTypeJsonSerde.INSTANCE.deserialize(valueNode);
288-
return new MapType(keyType, valueType);
288+
final DataType nonNullableKeyType = keyType.isNullable() ? keyType.copy(false) : keyType;
289+
return new MapType(nonNullableKeyType, valueType);
289290
}
290291

291292
private static DataType deserializeRow(JsonNode dataTypeNode) {

fluss-common/src/test/java/org/apache/fluss/utils/json/DataTypeJsonSerdeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ protected DataType[] createObjects() {
7676
new LocalZonedTimestampType(),
7777
new LocalZonedTimestampType(3),
7878
new ArrayType(new IntType(false)),
79-
new MapType(new BigIntType(), new IntType(false)),
79+
new MapType(new BigIntType(false), new IntType(false)),
8080
RowType.of(new BigIntType(), new IntType(false), new StringType()));
8181

8282
final List<DataType> allTypes = new ArrayList<>();
@@ -137,8 +137,8 @@ protected String[] expectedJsons() {
137137
"{\"type\":\"TIMESTAMP_WITH_LOCAL_TIME_ZONE\",\"nullable\":false,\"precision\":3}",
138138
"{\"type\":\"ARRAY\",\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
139139
"{\"type\":\"ARRAY\",\"nullable\":false,\"element_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
140-
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
141-
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\"},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
140+
"{\"type\":\"MAP\",\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
141+
"{\"type\":\"MAP\",\"nullable\":false,\"key_type\":{\"type\":\"BIGINT\",\"nullable\":false},\"value_type\":{\"type\":\"INTEGER\",\"nullable\":false}}",
142142
"{\"type\":\"ROW\",\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
143143
"{\"type\":\"ROW\",\"nullable\":false,\"fields\":[{\"name\":\"f0\",\"field_type\":{\"type\":\"BIGINT\"}},{\"name\":\"f1\",\"field_type\":{\"type\":\"INTEGER\",\"nullable\":false}},{\"name\":\"f2\",\"field_type\":{\"type\":\"STRING\"}}]}",
144144
};

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkComplexTypeITCase.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ void testMapTypesInLogTable() throws Exception {
323323
+ "MAP['arr1', ARRAY[1, 2, 3]], "
324324
+ "MAP[1, ROW(100, 'row1')]), "
325325
+ "(2, "
326-
+ "MAP[3, CAST(NULL AS STRING)], "
326+
+ "MAP[3, 'three'], "
327327
+ "MAP['k3', MAP[40, 'v4']], "
328328
+ "MAP['arr3', ARRAY[6]], "
329329
+ "MAP[3, ROW(300, 'row3')]), "
@@ -338,7 +338,7 @@ void testMapTypesInLogTable() throws Exception {
338338
List<String> expectedRows =
339339
Arrays.asList(
340340
"+I[1, {1=one}, {k1={10=v1}}, {arr1=[1, 2, 3]}, {1=+I[100, row1]}]",
341-
"+I[2, {3=null}, {k3={40=v4}}, {arr3=[6]}, {3=+I[300, row3]}]",
341+
"+I[2, {3=three}, {k3={40=v4}}, {arr3=[6]}, {3=+I[300, row3]}]",
342342
"+I[3, null, {k4={50=v5}}, {arr4=[7, 8, 9]}, {4=+I[400, row4]}]");
343343
assertResultsIgnoreOrder(rowIter, expectedRows, true);
344344
}
@@ -376,6 +376,36 @@ void testMapTypesInPartitionedLogTable() throws Exception {
376376
assertResultsIgnoreOrder(rowIter, expectedRows, true);
377377
}
378378

379+
@Test
380+
void testMapWithNullValue() throws Exception {
381+
tEnv.executeSql(
382+
"create table map_null_test ("
383+
+ "id int, "
384+
+ "simple_map map<int, string>, "
385+
+ "nested_map map<string, map<int, string>>, "
386+
+ "map_with_array_value map<string, array<int>>"
387+
+ ") with ('bucket.num' = '3')");
388+
389+
tEnv.executeSql(
390+
"INSERT INTO map_null_test VALUES "
391+
+ "(1, "
392+
+ "MAP[1, CAST(NULL AS STRING)], "
393+
+ "MAP['k1', MAP[10, CAST(NULL AS STRING)]], "
394+
+ "MAP['arr1', CAST(NULL AS ARRAY<INT>)]), "
395+
+ "(2, "
396+
+ "MAP[2, 'two', 3, CAST(NULL AS STRING)], "
397+
+ "MAP['k2', CAST(NULL AS MAP<INT, STRING>)], "
398+
+ "MAP['arr2', ARRAY[1, 2], 'arr3', CAST(NULL AS ARRAY<INT>)])")
399+
.await();
400+
401+
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from map_null_test").collect();
402+
List<String> expectedRows =
403+
Arrays.asList(
404+
"+I[1, {1=null}, {k1={10=null}}, {arr1=null}]",
405+
"+I[2, {2=two, 3=null}, {k2=null}, {arr2=[1, 2], arr3=null}]");
406+
assertResultsIgnoreOrder(rowIter, expectedRows, true);
407+
}
408+
379409
@Test
380410
void testExceptionsForArrayTypeUsage() {
381411
assertThatThrownBy(

0 commit comments

Comments
 (0)