Skip to content

Commit e2e2285

Browse files
fix failed cases
1 parent e8be1f7 commit e2e2285

File tree

15 files changed

+129
-138
lines changed

15 files changed

+129
-138
lines changed

fluss-client/pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -113,14 +113,6 @@
113113
<include>*:*</include>
114114
</includes>
115115
</artifactSet>
116-
<filters>
117-
<filter>
118-
<artifact>*</artifact>
119-
<excludes>
120-
<exclude>LICENSE*</exclude>
121-
</excludes>
122-
</filter>
123-
</filters>
124116
</configuration>
125117
</execution>
126118
</executions>

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,21 +227,32 @@ void testComplexTypeFetch() throws Exception {
227227
new Object[] {
228228
1,
229229
new String[] {"a", "b"},
230-
new Object[] {new int[] {1, 2}, new int[] {3, 4}}
230+
new Object[] {new int[] {1, 2}, new int[] {3, 4}},
231+
new Object[] {10, new Object[] {20, "nested"}, "row1"}
231232
},
232233
new Object[] {
233-
2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}}
234+
2,
235+
new String[] {"c", null},
236+
new Object[] {null, new int[] {3, 4}},
237+
new Object[] {30, new Object[] {40, "test"}, "row2"}
234238
},
235239
new Object[] {
236240
3,
237241
new String[] {"e", "f"},
238-
new Object[] {new int[] {5, 6, 7}, new int[] {8}}
242+
new Object[] {new int[] {5, 6, 7}, new int[] {8}},
243+
new Object[] {50, new Object[] {60, "value"}, "row3"}
239244
});
240245
Schema schema =
241246
Schema.newBuilder()
242247
.column("a", DataTypes.INT())
243248
.column("b", DataTypes.ARRAY(DataTypes.STRING()))
244249
.column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
250+
.column(
251+
"d",
252+
DataTypes.ROW(
253+
DataTypes.INT(),
254+
DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
255+
DataTypes.STRING()))
245256
.build();
246257
TableInfo tableInfo =
247258
TableInfo.of(
@@ -299,6 +310,17 @@ void testComplexTypeFetch() throws Exception {
299310
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1]));
300311
assertThat(row.getArray(2).toString())
301312
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2]));
313+
InternalRow nestedRow = row.getRow(3, 3);
314+
assertThat(nestedRow).isNotNull();
315+
assertThat(nestedRow.getInt(0)).isEqualTo(((Object[]) complexData.get(i)[3])[0]);
316+
InternalRow deeplyNestedRow = nestedRow.getRow(1, 2);
317+
assertThat(deeplyNestedRow).isNotNull();
318+
assertThat(deeplyNestedRow.getInt(0))
319+
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[0]);
320+
assertThat(deeplyNestedRow.getString(1).toString())
321+
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
322+
assertThat(nestedRow.getString(2).toString())
323+
.isEqualTo(((Object[]) complexData.get(i)[3])[2]);
302324
}
303325
}
304326

fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,16 @@ public static GenericRow row(Object... objects) {
104104
return row;
105105
}
106106

107+
public static GenericRow row(RowType rowType, Object... objects) {
108+
GenericRow row = new GenericRow(objects.length);
109+
List<DataType> fieldTypes = rowType.getChildren();
110+
for (int i = 0; i < objects.length; i++) {
111+
Object value = toInternalObject(objects[i], fieldTypes.get(i));
112+
row.setField(i, value);
113+
}
114+
return row;
115+
}
116+
107117
private static Object toInternalObject(Object obj) {
108118
if (obj == null) {
109119
return null;
@@ -124,6 +134,40 @@ private static Object toInternalObject(Object obj) {
124134
}
125135
}
126136

137+
private static Object toInternalObject(Object obj, DataType dataType) {
138+
if (obj == null) {
139+
return null;
140+
}
141+
if (obj instanceof String) {
142+
return BinaryString.fromString((String) obj);
143+
} else if (obj instanceof Object[]) {
144+
DataTypeRoot typeRoot = dataType.getTypeRoot();
145+
if (typeRoot == DataTypeRoot.ROW) {
146+
RowType rowType = (RowType) dataType;
147+
return row(rowType, (Object[]) obj);
148+
} else if (typeRoot == DataTypeRoot.ARRAY) {
149+
DataType elementType = dataType.getChildren().get(0);
150+
Object[] array = (Object[]) obj;
151+
Object[] internalArray = new Object[array.length];
152+
for (int j = 0; j < array.length; j++) {
153+
internalArray[j] = toInternalObject(array[j], elementType);
154+
}
155+
return new GenericArray(internalArray);
156+
} else {
157+
Object[] array = (Object[]) obj;
158+
Object[] internalArray = new Object[array.length];
159+
for (int j = 0; j < array.length; j++) {
160+
internalArray[j] = toInternalObject(array[j], dataType);
161+
}
162+
return new GenericArray(internalArray);
163+
}
164+
} else if (obj instanceof int[]) {
165+
return new GenericArray((int[]) obj);
166+
} else {
167+
return obj;
168+
}
169+
}
170+
127171
public static CompactedRow compactedRow(RowType rowType, Object[] objects) {
128172
return genCompacted(rowType, objects);
129173
}
@@ -474,7 +518,7 @@ public static MemoryLogRecords createMemoryLogRecords(
474518
throws Exception {
475519
if (logFormat == LogFormat.ARROW) {
476520
List<InternalRow> rows =
477-
objects.stream().map(DataTestUtils::row).collect(Collectors.toList());
521+
objects.stream().map(objs -> row(rowType, objs)).collect(Collectors.toList());
478522
return createArrowMemoryLogRecords(
479523
rowType,
480524
offsetBase,

fluss-filesystems/fluss-fs-obs/pom.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,6 @@
230230
<exclude>.gitkeep</exclude>
231231
<exclude>mime.types</exclude>
232232
<exclude>mozilla/**</exclude>
233-
<exclude>LICENSE.txt</exclude>
234-
<exclude>license/LICENSE*</exclude>
235233
<exclude>okhttp3/internal/publicsuffix/NOTICE</exclude>
236234
<exclude>NOTICE</exclude>
237235
</excludes>

fluss-filesystems/fluss-fs-oss/pom.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,6 @@
214214
<exclude>.gitkeep</exclude>
215215
<exclude>mime.types</exclude>
216216
<exclude>mozilla/**</exclude>
217-
<exclude>LICENSE.txt</exclude>
218-
<exclude>license/LICENSE*</exclude>
219217
<exclude>okhttp3/internal/publicsuffix/NOTICE</exclude>
220218
<exclude>NOTICE</exclude>
221219
</excludes>

fluss-flink/fluss-flink-1.18/pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,6 @@
219219
<include>org.apache.fluss:fluss-client</include>
220220
</includes>
221221
</artifactSet>
222-
<filters>
223-
<filter>
224-
<artifact>*</artifact>
225-
<excludes>
226-
<exclude>LICENSE*</exclude>
227-
</excludes>
228-
</filter>
229-
</filters>
230222
</configuration>
231223
</execution>
232224
</executions>

fluss-flink/fluss-flink-1.19/pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,6 @@
213213
<include>org.apache.fluss:fluss-client</include>
214214
</includes>
215215
</artifactSet>
216-
<filters>
217-
<filter>
218-
<artifact>*</artifact>
219-
<excludes>
220-
<exclude>LICENSE*</exclude>
221-
</excludes>
222-
</filter>
223-
</filters>
224216
</configuration>
225217
</execution>
226218
</executions>

fluss-flink/fluss-flink-1.20/pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -234,14 +234,6 @@
234234
<include>org.apache.fluss:fluss-client</include>
235235
</includes>
236236
</artifactSet>
237-
<filters>
238-
<filter>
239-
<artifact>*</artifact>
240-
<excludes>
241-
<exclude>LICENSE*</exclude>
242-
</excludes>
243-
</filter>
244-
</filters>
245237
</configuration>
246238
</execution>
247239
</executions>

fluss-flink/fluss-flink-2.1/pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,14 +244,6 @@
244244
<include>org.apache.fluss:fluss-client</include>
245245
</includes>
246246
</artifactSet>
247-
<filters>
248-
<filter>
249-
<artifact>*</artifact>
250-
<excludes>
251-
<exclude>LICENSE*</exclude>
252-
</excludes>
253-
</filter>
254-
</filters>
255247
</configuration>
256248
</execution>
257249
</executions>

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkArrayConverter.java

Lines changed: 29 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,6 @@
3232
import org.apache.flink.table.data.TimestampData;
3333

3434
import static org.apache.fluss.flink.utils.FlussRowToFlinkRowConverter.createInternalConverter;
35-
import static org.apache.fluss.types.DataTypeChecks.getLength;
36-
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
37-
import static org.apache.fluss.types.DataTypeChecks.getScale;
3835

3936
/** Flink Array Converter. */
4037
public class FlinkArrayConverter implements ArrayData {
@@ -48,32 +45,42 @@ public class FlinkArrayConverter implements ArrayData {
4845
private ArrayData copyArray(org.apache.fluss.row.InternalArray from, DataType eleType) {
4946
FlussRowToFlinkRowConverter.FlussDeserializationConverter converter =
5047
createInternalConverter(eleType);
48+
InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(eleType);
49+
5150
if (!eleType.isNullable()) {
52-
switch (eleType.getTypeRoot()) {
53-
case BOOLEAN:
54-
return new GenericArrayData(from.toBooleanArray());
55-
case TINYINT:
56-
return new GenericArrayData(from.toByteArray());
57-
case SMALLINT:
58-
return new GenericArrayData(from.toShortArray());
59-
case INTEGER:
60-
case DATE:
61-
case TIME_WITHOUT_TIME_ZONE:
62-
return new GenericArrayData(from.toIntArray());
63-
case BIGINT:
64-
return new GenericArrayData(from.toLongArray());
65-
case FLOAT:
66-
return new GenericArrayData(from.toFloatArray());
67-
case DOUBLE:
68-
return new GenericArrayData(from.toDoubleArray());
51+
try {
52+
switch (eleType.getTypeRoot()) {
53+
case BOOLEAN:
54+
return new GenericArrayData(from.toBooleanArray());
55+
case TINYINT:
56+
return new GenericArrayData(from.toByteArray());
57+
case SMALLINT:
58+
return new GenericArrayData(from.toShortArray());
59+
case INTEGER:
60+
case DATE:
61+
case TIME_WITHOUT_TIME_ZONE:
62+
return new GenericArrayData(from.toIntArray());
63+
case BIGINT:
64+
return new GenericArrayData(from.toLongArray());
65+
case FLOAT:
66+
return new GenericArrayData(from.toFloatArray());
67+
case DOUBLE:
68+
return new GenericArrayData(from.toDoubleArray());
69+
default:
70+
// Fall through to element-by-element conversion for complex types
71+
break;
72+
}
73+
} catch (Exception e) {
74+
// Fall back to element-by-element conversion if primitive array access fails
6975
}
7076
}
7177

7278
Object[] newArray = new Object[from.size()];
7379

7480
for (int i = 0; i < newArray.length; ++i) {
75-
if (!from.isNullAt(i)) {
76-
newArray[i] = converter.deserialize(getFieldValue(from, i, eleType));
81+
Object element = elementGetter.getElementOrNull(from, i);
82+
if (element != null) {
83+
newArray[i] = converter.deserialize(element);
7784
} else {
7885
newArray[i] = null;
7986
}
@@ -209,54 +216,4 @@ public ArrayData getArrayData() {
209216
public static ArrayData deserialize(DataType flussDataType, Object flussField) {
210217
return new FlinkArrayConverter(flussDataType, flussField).getArrayData();
211218
}
212-
213-
private static Object getFieldValue(InternalArray array, int pos, DataType dataType) {
214-
if (array.isNullAt(pos)) {
215-
return null;
216-
}
217-
218-
switch (dataType.getTypeRoot()) {
219-
case CHAR:
220-
return array.getChar(pos, getLength(dataType));
221-
case STRING:
222-
return array.getString(pos);
223-
case BOOLEAN:
224-
return array.getBoolean(pos);
225-
case BINARY:
226-
return array.getBinary(pos, getLength(dataType));
227-
case BYTES:
228-
return array.getBytes(pos);
229-
case DECIMAL:
230-
return array.getDecimal(pos, getPrecision(dataType), getScale(dataType));
231-
case TINYINT:
232-
return array.getByte(pos);
233-
case SMALLINT:
234-
return array.getShort(pos);
235-
case INTEGER:
236-
case DATE:
237-
case TIME_WITHOUT_TIME_ZONE:
238-
return array.getInt(pos);
239-
case BIGINT:
240-
return array.getLong(pos);
241-
case FLOAT:
242-
return array.getFloat(pos);
243-
case DOUBLE:
244-
return array.getDouble(pos);
245-
case TIMESTAMP_WITHOUT_TIME_ZONE:
246-
return array.getTimestampNtz(pos, getPrecision(dataType));
247-
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
248-
return array.getTimestampLtz(pos, getPrecision(dataType));
249-
case ARRAY:
250-
return array.getArray(pos);
251-
// TODO: Add Map type support in future
252-
case MAP:
253-
throw new UnsupportedOperationException("Map type not supported yet");
254-
// TODO: Requires InternalArray.getRow() method from fluss-common Row type support
255-
case ROW:
256-
// return array.getRow(pos, getFieldCount(dataType));
257-
throw new UnsupportedOperationException("Row type requires fluss-common update");
258-
default:
259-
throw new IllegalArgumentException("Unsupported data type: " + dataType);
260-
}
261-
}
262219
}

0 commit comments

Comments
 (0)