Skip to content

Commit 5973558

Browse files
[lake/iceberg] Support tier array type for iceberg
This commit adds support for array type conversion between Fluss and Iceberg, enabling tiering of tables with array columns to Iceberg lakehouse. Key changes: - Updated FlussDataTypeToIcebergDataType to convert Fluss ARRAY to Iceberg LIST type - Created FlussArrayAsIcebergList adapter to convert Fluss InternalArray to Java List - Updated FlussRowAsIcebergRecord to handle array field conversion - Added array type support in IcebergConversions for bidirectional type mapping - Added comprehensive unit tests for array type conversion with various element types - Added integration tests for array type tiering with both primary key and log tables - Updated documentation to include ARRAY -> LIST type mapping
1 parent f577b4f commit 5973558

File tree

7 files changed

+784
-1
lines changed

7 files changed

+784
-1
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public Type visit(LocalZonedTimestampType localZonedTimestampType) {
129129

130130
@Override
131131
public Type visit(ArrayType arrayType) {
132-
throw new UnsupportedOperationException("Unsupported array type");
132+
return Types.ListType.ofOptional(0, arrayType.getElementType().accept(this));
133133
}
134134

135135
@Override
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.iceberg.source;
19+
20+
import org.apache.fluss.row.InternalArray;
21+
import org.apache.fluss.types.ArrayType;
22+
import org.apache.fluss.types.BigIntType;
23+
import org.apache.fluss.types.BinaryType;
24+
import org.apache.fluss.types.BooleanType;
25+
import org.apache.fluss.types.BytesType;
26+
import org.apache.fluss.types.CharType;
27+
import org.apache.fluss.types.DataType;
28+
import org.apache.fluss.types.DateType;
29+
import org.apache.fluss.types.DecimalType;
30+
import org.apache.fluss.types.DoubleType;
31+
import org.apache.fluss.types.FloatType;
32+
import org.apache.fluss.types.IntType;
33+
import org.apache.fluss.types.LocalZonedTimestampType;
34+
import org.apache.fluss.types.SmallIntType;
35+
import org.apache.fluss.types.StringType;
36+
import org.apache.fluss.types.TimeType;
37+
import org.apache.fluss.types.TimestampType;
38+
import org.apache.fluss.types.TinyIntType;
39+
import org.apache.fluss.utils.DateTimeUtils;
40+
41+
import java.nio.ByteBuffer;
42+
import java.time.Instant;
43+
import java.time.OffsetDateTime;
44+
import java.time.ZoneOffset;
45+
import java.util.AbstractList;
46+
47+
/** Adapter class for converting Fluss InternalArray to a Java List for Iceberg. */
48+
public class FlussArrayAsIcebergList extends AbstractList<Object> {
49+
50+
private final InternalArray flussArray;
51+
private final DataType elementType;
52+
53+
public FlussArrayAsIcebergList(InternalArray flussArray, DataType elementType) {
54+
this.flussArray = flussArray;
55+
this.elementType = elementType;
56+
}
57+
58+
@Override
59+
public Object get(int index) {
60+
if (flussArray.isNullAt(index)) {
61+
return null;
62+
}
63+
64+
if (elementType instanceof BooleanType) {
65+
return flussArray.getBoolean(index);
66+
} else if (elementType instanceof TinyIntType) {
67+
return (int) flussArray.getByte(index);
68+
} else if (elementType instanceof SmallIntType) {
69+
return (int) flussArray.getShort(index);
70+
} else if (elementType instanceof IntType) {
71+
return flussArray.getInt(index);
72+
} else if (elementType instanceof BigIntType) {
73+
return flussArray.getLong(index);
74+
} else if (elementType instanceof FloatType) {
75+
return flussArray.getFloat(index);
76+
} else if (elementType instanceof DoubleType) {
77+
return flussArray.getDouble(index);
78+
} else if (elementType instanceof StringType) {
79+
return flussArray.getString(index).toString();
80+
} else if (elementType instanceof CharType) {
81+
CharType charType = (CharType) elementType;
82+
return flussArray.getChar(index, charType.getLength()).toString();
83+
} else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
84+
return ByteBuffer.wrap(flussArray.getBytes(index));
85+
} else if (elementType instanceof DecimalType) {
86+
DecimalType decimalType = (DecimalType) elementType;
87+
return flussArray
88+
.getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
89+
.toBigDecimal();
90+
} else if (elementType instanceof LocalZonedTimestampType) {
91+
LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
92+
return toIcebergTimestampLtz(
93+
flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
94+
} else if (elementType instanceof TimestampType) {
95+
TimestampType tsType = (TimestampType) elementType;
96+
return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
97+
} else if (elementType instanceof DateType) {
98+
return DateTimeUtils.toLocalDate(flussArray.getInt(index));
99+
} else if (elementType instanceof TimeType) {
100+
return DateTimeUtils.toLocalTime(flussArray.getInt(index));
101+
} else if (elementType instanceof ArrayType) {
102+
InternalArray innerArray = flussArray.getArray(index);
103+
return innerArray == null
104+
? null
105+
: new FlussArrayAsIcebergList(
106+
innerArray, ((ArrayType) elementType).getElementType());
107+
} else {
108+
throw new UnsupportedOperationException(
109+
"Unsupported array element type conversion for Fluss type: "
110+
+ elementType.getClass().getSimpleName());
111+
}
112+
}
113+
114+
@Override
115+
public int size() {
116+
return flussArray.size();
117+
}
118+
119+
private OffsetDateTime toIcebergTimestampLtz(Instant instant) {
120+
return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
121+
}
122+
}

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecord.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.fluss.lake.iceberg.source;
1919

20+
import org.apache.fluss.row.InternalArray;
2021
import org.apache.fluss.row.InternalRow;
22+
import org.apache.fluss.types.ArrayType;
2123
import org.apache.fluss.types.BigIntType;
2224
import org.apache.fluss.types.BinaryType;
2325
import org.apache.fluss.types.BooleanType;
@@ -169,6 +171,14 @@ private FlussRowToIcebergFieldConverter createTypeConverter(DataType flussType,
169171
return row -> DateTimeUtils.toLocalDate(row.getInt(pos));
170172
} else if (flussType instanceof TimeType) {
171173
return row -> DateTimeUtils.toLocalTime(row.getInt(pos));
174+
} else if (flussType instanceof ArrayType) {
175+
ArrayType arrayType = (ArrayType) flussType;
176+
return row -> {
177+
InternalArray array = row.getArray(pos);
178+
return array == null
179+
? null
180+
: new FlussArrayAsIcebergList(array, arrayType.getElementType());
181+
};
172182
} else {
173183
throw new UnsupportedOperationException(
174184
"Unsupported data type conversion for Fluss type: "

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ private static DataType convertIcebergTypeToFlussType(Type icebergType) {
125125
} else if (icebergType instanceof Types.DecimalType) {
126126
Types.DecimalType decimalType = (Types.DecimalType) icebergType;
127127
return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale());
128+
} else if (icebergType instanceof Types.ListType) {
129+
Types.ListType listType = (Types.ListType) icebergType;
130+
return DataTypes.ARRAY(convertIcebergTypeToFlussType(listType.elementType()));
128131
}
129132
throw new UnsupportedOperationException(
130133
"Unsupported data type conversion for Iceberg type: "

0 commit comments

Comments
 (0)