Skip to content

Commit 3be2f3d

Browse files
binary-signal and luoyuxia
authored
[lake/paimon] Support Array types for tiering paimon (#2166)
Co-authored-by: binary-signal <[email protected]> Co-authored-by: luoyuxia <[email protected]>
1 parent a908690 commit 3be2f3d

File tree

6 files changed

+1015
-9
lines changed

6 files changed

+1015
-9
lines changed
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.source;
19+
20+
import org.apache.fluss.row.TimestampLtz;
21+
import org.apache.fluss.row.TimestampNtz;
22+
23+
import org.apache.paimon.data.BinaryString;
24+
import org.apache.paimon.data.Decimal;
25+
import org.apache.paimon.data.InternalArray;
26+
import org.apache.paimon.data.InternalMap;
27+
import org.apache.paimon.data.InternalRow;
28+
import org.apache.paimon.data.Timestamp;
29+
import org.apache.paimon.data.variant.Variant;
30+
import org.apache.paimon.types.ArrayType;
31+
import org.apache.paimon.types.DataType;
32+
33+
/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */
34+
public class FlussArrayAsPaimonArray implements InternalArray {
35+
36+
private final org.apache.fluss.row.InternalArray flussArray;
37+
private final DataType elementType;
38+
39+
public FlussArrayAsPaimonArray(
40+
org.apache.fluss.row.InternalArray flussArray, DataType elementType) {
41+
this.flussArray = flussArray;
42+
this.elementType = elementType;
43+
}
44+
45+
@Override
46+
public int size() {
47+
return flussArray.size();
48+
}
49+
50+
@Override
51+
public boolean isNullAt(int pos) {
52+
return flussArray.isNullAt(pos);
53+
}
54+
55+
@Override
56+
public boolean getBoolean(int pos) {
57+
return flussArray.getBoolean(pos);
58+
}
59+
60+
@Override
61+
public byte getByte(int pos) {
62+
return flussArray.getByte(pos);
63+
}
64+
65+
@Override
66+
public short getShort(int pos) {
67+
return flussArray.getShort(pos);
68+
}
69+
70+
@Override
71+
public int getInt(int pos) {
72+
return flussArray.getInt(pos);
73+
}
74+
75+
@Override
76+
public long getLong(int pos) {
77+
return flussArray.getLong(pos);
78+
}
79+
80+
@Override
81+
public float getFloat(int pos) {
82+
return flussArray.getFloat(pos);
83+
}
84+
85+
@Override
86+
public double getDouble(int pos) {
87+
return flussArray.getDouble(pos);
88+
}
89+
90+
@Override
91+
public BinaryString getString(int pos) {
92+
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
93+
}
94+
95+
@Override
96+
public Decimal getDecimal(int pos, int precision, int scale) {
97+
org.apache.fluss.row.Decimal flussDecimal = flussArray.getDecimal(pos, precision, scale);
98+
if (flussDecimal.isCompact()) {
99+
return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale);
100+
} else {
101+
return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale);
102+
}
103+
}
104+
105+
@Override
106+
public Timestamp getTimestamp(int pos, int precision) {
107+
// Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays
108+
switch (elementType.getTypeRoot()) {
109+
case TIMESTAMP_WITHOUT_TIME_ZONE:
110+
if (TimestampNtz.isCompact(precision)) {
111+
return Timestamp.fromEpochMillis(
112+
flussArray.getTimestampNtz(pos, precision).getMillisecond());
113+
} else {
114+
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
115+
return Timestamp.fromEpochMillis(
116+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
117+
}
118+
119+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
120+
if (TimestampLtz.isCompact(precision)) {
121+
return Timestamp.fromEpochMillis(
122+
flussArray.getTimestampLtz(pos, precision).getEpochMillisecond());
123+
} else {
124+
TimestampLtz timestampLtz = flussArray.getTimestampLtz(pos, precision);
125+
return Timestamp.fromEpochMillis(
126+
timestampLtz.getEpochMillisecond(),
127+
timestampLtz.getNanoOfMillisecond());
128+
}
129+
default:
130+
throw new UnsupportedOperationException(
131+
"Unsupported array element type for getTimestamp. "
132+
+ "Only TIMESTAMP_WITHOUT_TIME_ZONE and "
133+
+ "TIMESTAMP_WITH_LOCAL_TIME_ZONE are supported, but got: "
134+
+ elementType.getTypeRoot()
135+
+ " ("
136+
+ elementType
137+
+ ").");
138+
}
139+
}
140+
141+
@Override
142+
public byte[] getBinary(int pos) {
143+
return flussArray.getBytes(pos);
144+
}
145+
146+
@Override
147+
public Variant getVariant(int pos) {
148+
throw new UnsupportedOperationException(
149+
"getVariant is not supported for Fluss array currently.");
150+
}
151+
152+
@Override
153+
public InternalArray getArray(int pos) {
154+
org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos);
155+
return innerArray == null
156+
? null
157+
: new FlussArrayAsPaimonArray(
158+
innerArray, ((ArrayType) elementType).getElementType());
159+
}
160+
161+
@Override
162+
public InternalMap getMap(int pos) {
163+
throw new UnsupportedOperationException(
164+
"getMap is not supported for Fluss array currently.");
165+
}
166+
167+
@Override
168+
public InternalRow getRow(int pos, int numFields) {
169+
throw new UnsupportedOperationException(
170+
"getRow is not supported for Fluss array currently.");
171+
}
172+
173+
@Override
174+
public boolean[] toBooleanArray() {
175+
return flussArray.toBooleanArray();
176+
}
177+
178+
@Override
179+
public byte[] toByteArray() {
180+
return flussArray.toByteArray();
181+
}
182+
183+
@Override
184+
public short[] toShortArray() {
185+
return flussArray.toShortArray();
186+
}
187+
188+
@Override
189+
public int[] toIntArray() {
190+
return flussArray.toIntArray();
191+
}
192+
193+
@Override
194+
public long[] toLongArray() {
195+
return flussArray.toLongArray();
196+
}
197+
198+
@Override
199+
public float[] toFloatArray() {
200+
return flussArray.toFloatArray();
201+
}
202+
203+
@Override
204+
public double[] toDoubleArray() {
205+
return flussArray.toDoubleArray();
206+
}
207+
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.data.InternalRow;
2828
import org.apache.paimon.data.Timestamp;
2929
import org.apache.paimon.data.variant.Variant;
30+
import org.apache.paimon.types.ArrayType;
3031
import org.apache.paimon.types.DataType;
3132
import org.apache.paimon.types.RowKind;
3233
import org.apache.paimon.types.RowType;
@@ -159,8 +160,12 @@ public Variant getVariant(int i) {
159160

160161
@Override
161162
public InternalArray getArray(int pos) {
162-
throw new UnsupportedOperationException(
163-
"getArray is not support for Fluss record currently.");
163+
org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos);
164+
return flussArray == null
165+
? null
166+
: new FlussArrayAsPaimonArray(
167+
flussArray,
168+
((ArrayType) tableRowType.getField(pos).type()).getElementType());
164169
}
165170

166171
@Override

0 commit comments

Comments
 (0)