Skip to content

Commit 1b2580c

Browse files
binary-signal authored and luoyuxia committed
add adapters for converting fluss arrays to paimon arrays and vice versa
Signed-off-by: binary-signal <[email protected]>
1 parent 57c0de7 commit 1b2580c

File tree

5 files changed

+707
-2
lines changed

5 files changed

+707
-2
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.source;
19+
20+
import org.apache.fluss.row.TimestampNtz;
21+
22+
import org.apache.paimon.data.BinaryString;
23+
import org.apache.paimon.data.Decimal;
24+
import org.apache.paimon.data.InternalArray;
25+
import org.apache.paimon.data.InternalMap;
26+
import org.apache.paimon.data.InternalRow;
27+
import org.apache.paimon.data.Timestamp;
28+
import org.apache.paimon.data.variant.Variant;
29+
30+
/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */
31+
public class FlussArrayAsPaimonArray implements InternalArray {
32+
33+
private final org.apache.fluss.row.InternalArray flussArray;
34+
35+
public FlussArrayAsPaimonArray(org.apache.fluss.row.InternalArray flussArray) {
36+
this.flussArray = flussArray;
37+
}
38+
39+
@Override
40+
public int size() {
41+
return flussArray.size();
42+
}
43+
44+
@Override
45+
public boolean isNullAt(int pos) {
46+
return flussArray.isNullAt(pos);
47+
}
48+
49+
@Override
50+
public boolean getBoolean(int pos) {
51+
return flussArray.getBoolean(pos);
52+
}
53+
54+
@Override
55+
public byte getByte(int pos) {
56+
return flussArray.getByte(pos);
57+
}
58+
59+
@Override
60+
public short getShort(int pos) {
61+
return flussArray.getShort(pos);
62+
}
63+
64+
@Override
65+
public int getInt(int pos) {
66+
return flussArray.getInt(pos);
67+
}
68+
69+
@Override
70+
public long getLong(int pos) {
71+
return flussArray.getLong(pos);
72+
}
73+
74+
@Override
75+
public float getFloat(int pos) {
76+
return flussArray.getFloat(pos);
77+
}
78+
79+
@Override
80+
public double getDouble(int pos) {
81+
return flussArray.getDouble(pos);
82+
}
83+
84+
@Override
85+
public BinaryString getString(int pos) {
86+
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
87+
}
88+
89+
@Override
90+
public Decimal getDecimal(int pos, int precision, int scale) {
91+
org.apache.fluss.row.Decimal flussDecimal = flussArray.getDecimal(pos, precision, scale);
92+
if (flussDecimal.isCompact()) {
93+
return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale);
94+
} else {
95+
return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale);
96+
}
97+
}
98+
99+
@Override
100+
public Timestamp getTimestamp(int pos, int precision) {
101+
// Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays
102+
if (TimestampNtz.isCompact(precision)) {
103+
return Timestamp.fromEpochMillis(
104+
flussArray.getTimestampNtz(pos, precision).getMillisecond());
105+
} else {
106+
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
107+
return Timestamp.fromEpochMillis(
108+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
109+
}
110+
}
111+
112+
@Override
113+
public byte[] getBinary(int pos) {
114+
return flussArray.getBytes(pos);
115+
}
116+
117+
@Override
118+
public Variant getVariant(int pos) {
119+
throw new UnsupportedOperationException(
120+
"getVariant is not supported for Fluss array currently.");
121+
}
122+
123+
@Override
124+
public InternalArray getArray(int pos) {
125+
org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos);
126+
return innerArray == null ? null : new FlussArrayAsPaimonArray(innerArray);
127+
}
128+
129+
@Override
130+
public InternalMap getMap(int pos) {
131+
throw new UnsupportedOperationException(
132+
"getMap is not supported for Fluss array currently.");
133+
}
134+
135+
@Override
136+
public InternalRow getRow(int pos, int numFields) {
137+
throw new UnsupportedOperationException(
138+
"getRow is not supported for Fluss array currently.");
139+
}
140+
141+
@Override
142+
public boolean[] toBooleanArray() {
143+
return flussArray.toBooleanArray();
144+
}
145+
146+
@Override
147+
public byte[] toByteArray() {
148+
return flussArray.toByteArray();
149+
}
150+
151+
@Override
152+
public short[] toShortArray() {
153+
return flussArray.toShortArray();
154+
}
155+
156+
@Override
157+
public int[] toIntArray() {
158+
return flussArray.toIntArray();
159+
}
160+
161+
@Override
162+
public long[] toLongArray() {
163+
return flussArray.toLongArray();
164+
}
165+
166+
@Override
167+
public float[] toFloatArray() {
168+
return flussArray.toFloatArray();
169+
}
170+
171+
@Override
172+
public double[] toDoubleArray() {
173+
return flussArray.toDoubleArray();
174+
}
175+
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ public Variant getVariant(int i) {
159159

160160
@Override
161161
public InternalArray getArray(int pos) {
162-
throw new UnsupportedOperationException(
163-
"getArray is not support for Fluss record currently.");
162+
org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos);
163+
return flussArray == null ? null : new FlussArrayAsPaimonArray(flussArray);
164164
}
165165

166166
@Override

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import org.apache.fluss.record.LogRecord;
2525
import org.apache.fluss.row.BinaryString;
2626
import org.apache.fluss.row.Decimal;
27+
import org.apache.fluss.row.GenericArray;
2728
import org.apache.fluss.row.GenericRow;
2829
import org.apache.fluss.row.TimestampLtz;
2930
import org.apache.fluss.row.TimestampNtz;
3031

32+
import org.apache.paimon.data.InternalArray;
3133
import org.apache.paimon.types.RowKind;
3234
import org.apache.paimon.types.RowType;
3335
import org.junit.jupiter.api.Test;
@@ -140,4 +142,171 @@ void testPrimaryKeyTableRecord() {
140142
assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType).getRowKind())
141143
.isEqualTo(RowKind.INSERT);
142144
}
145+
146+
@Test
147+
void testArrayTypeWithIntElements() {
148+
RowType tableRowType =
149+
RowType.of(
150+
new org.apache.paimon.types.IntType(),
151+
new org.apache.paimon.types.ArrayType(
152+
new org.apache.paimon.types.IntType()));
153+
154+
long logOffset = 0;
155+
long timeStamp = System.currentTimeMillis();
156+
GenericRow genericRow = new GenericRow(2);
157+
genericRow.setField(0, 42);
158+
genericRow.setField(1, new GenericArray(new int[] {1, 2, 3, 4, 5}));
159+
160+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
161+
FlussRowAsPaimonRow flussRowAsPaimonRow =
162+
new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
163+
164+
assertThat(flussRowAsPaimonRow.getInt(0)).isEqualTo(42);
165+
InternalArray array = flussRowAsPaimonRow.getArray(1);
166+
assertThat(array).isNotNull();
167+
assertThat(array.size()).isEqualTo(5);
168+
assertThat(array.getInt(0)).isEqualTo(1);
169+
assertThat(array.getInt(1)).isEqualTo(2);
170+
assertThat(array.getInt(2)).isEqualTo(3);
171+
assertThat(array.getInt(3)).isEqualTo(4);
172+
assertThat(array.getInt(4)).isEqualTo(5);
173+
}
174+
175+
@Test
176+
void testArrayTypeWithStringElements() {
177+
RowType tableRowType =
178+
RowType.of(
179+
new org.apache.paimon.types.VarCharType(),
180+
new org.apache.paimon.types.ArrayType(
181+
new org.apache.paimon.types.VarCharType()));
182+
183+
long logOffset = 0;
184+
long timeStamp = System.currentTimeMillis();
185+
GenericRow genericRow = new GenericRow(2);
186+
genericRow.setField(0, BinaryString.fromString("name"));
187+
genericRow.setField(
188+
1,
189+
new GenericArray(
190+
new Object[] {
191+
BinaryString.fromString("a"),
192+
BinaryString.fromString("b"),
193+
BinaryString.fromString("c")
194+
}));
195+
196+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
197+
FlussRowAsPaimonRow flussRowAsPaimonRow =
198+
new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
199+
200+
assertThat(flussRowAsPaimonRow.getString(0).toString()).isEqualTo("name");
201+
InternalArray array = flussRowAsPaimonRow.getArray(1);
202+
assertThat(array).isNotNull();
203+
assertThat(array.size()).isEqualTo(3);
204+
assertThat(array.getString(0).toString()).isEqualTo("a");
205+
assertThat(array.getString(1).toString()).isEqualTo("b");
206+
assertThat(array.getString(2).toString()).isEqualTo("c");
207+
}
208+
209+
@Test
210+
void testArrayTypeWithNullableElements() {
211+
RowType tableRowType =
212+
RowType.of(
213+
new org.apache.paimon.types.ArrayType(
214+
new org.apache.paimon.types.IntType().nullable()));
215+
216+
long logOffset = 0;
217+
long timeStamp = System.currentTimeMillis();
218+
GenericRow genericRow = new GenericRow(1);
219+
genericRow.setField(0, new GenericArray(new Object[] {1, null, 3}));
220+
221+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
222+
FlussRowAsPaimonRow flussRowAsPaimonRow =
223+
new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
224+
225+
InternalArray array = flussRowAsPaimonRow.getArray(0);
226+
assertThat(array).isNotNull();
227+
assertThat(array.size()).isEqualTo(3);
228+
assertThat(array.getInt(0)).isEqualTo(1);
229+
assertThat(array.isNullAt(1)).isTrue();
230+
assertThat(array.getInt(2)).isEqualTo(3);
231+
}
232+
233+
@Test
234+
void testNullArray() {
235+
RowType tableRowType =
236+
RowType.of(
237+
new org.apache.paimon.types.ArrayType(new org.apache.paimon.types.IntType())
238+
.nullable());
239+
240+
long logOffset = 0;
241+
long timeStamp = System.currentTimeMillis();
242+
GenericRow genericRow = new GenericRow(1);
243+
genericRow.setField(0, null);
244+
245+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
246+
FlussRowAsPaimonRow flussRowAsPaimonRow =
247+
new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
248+
249+
assertThat(flussRowAsPaimonRow.isNullAt(0)).isTrue();
250+
}
251+
252+
@Test
253+
void testNestedArrayType() {
254+
// Test ARRAY<ARRAY<INT>>
255+
RowType tableRowType =
256+
RowType.of(
257+
new org.apache.paimon.types.ArrayType(
258+
new org.apache.paimon.types.ArrayType(
259+
new org.apache.paimon.types.IntType())));
260+
261+
long logOffset = 0;
262+
long timeStamp = System.currentTimeMillis();
263+
GenericRow genericRow = new GenericRow(1);
264+
genericRow.setField(
265+
0,
266+
new GenericArray(
267+
new Object[] {
268+
new GenericArray(new int[] {1, 2}),
269+
new GenericArray(new int[] {3, 4, 5})
270+
}));
271+
272+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
273+
FlussRowAsPaimonRow flussRowAsPaimonRow =
274+
new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
275+
276+
InternalArray outerArray = flussRowAsPaimonRow.getArray(0);
277+
assertThat(outerArray).isNotNull();
278+
assertThat(outerArray.size()).isEqualTo(2);
279+
280+
InternalArray innerArray1 = outerArray.getArray(0);
281+
assertThat(innerArray1.size()).isEqualTo(2);
282+
assertThat(innerArray1.getInt(0)).isEqualTo(1);
283+
assertThat(innerArray1.getInt(1)).isEqualTo(2);
284+
285+
InternalArray innerArray2 = outerArray.getArray(1);
286+
assertThat(innerArray2.size()).isEqualTo(3);
287+
assertThat(innerArray2.getInt(0)).isEqualTo(3);
288+
assertThat(innerArray2.getInt(1)).isEqualTo(4);
289+
assertThat(innerArray2.getInt(2)).isEqualTo(5);
290+
}
291+
292+
@Test
293+
void testEmptyArray() {
294+
RowType tableRowType =
295+
RowType.of(
296+
new org.apache.paimon.types.ArrayType(
297+
new org.apache.paimon.types.IntType()));
298+
299+
long logOffset = 0;
300+
long timeStamp = System.currentTimeMillis();
301+
GenericRow genericRow = new GenericRow(1);
302+
genericRow.setField(0, new GenericArray(new int[] {}));
303+
304+
LogRecord logRecord = new GenericRecord(logOffset, timeStamp, APPEND_ONLY, genericRow);
305+
FlussRowAsPaimonRow flussRowAsPaimonRow =
306+
new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
307+
308+
InternalArray array = flussRowAsPaimonRow.getArray(0);
309+
assertThat(array).isNotNull();
310+
assertThat(array.size()).isEqualTo(0);
311+
}
143312
}

0 commit comments

Comments
 (0)