Skip to content

Commit c71d9fc

Browse files
author
binary-signal
committed
add adapters for converting fluss arrays to paimon arrays and vice versa
Signed-off-by: binary-signal <[email protected]>
1 parent d10252d commit c71d9fc

File tree

7 files changed

+884
-4
lines changed

7 files changed

+884
-4
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.source;
19+
20+
import org.apache.fluss.row.TimestampNtz;
21+
22+
import org.apache.paimon.data.BinaryString;
23+
import org.apache.paimon.data.Decimal;
24+
import org.apache.paimon.data.InternalArray;
25+
import org.apache.paimon.data.InternalMap;
26+
import org.apache.paimon.data.InternalRow;
27+
import org.apache.paimon.data.Timestamp;
28+
import org.apache.paimon.data.variant.Variant;
29+
30+
/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */
31+
public class FlussArrayAsPaimonArray implements InternalArray {
32+
33+
private final org.apache.fluss.row.InternalArray flussArray;
34+
35+
public FlussArrayAsPaimonArray(org.apache.fluss.row.InternalArray flussArray) {
36+
this.flussArray = flussArray;
37+
}
38+
39+
@Override
40+
public int size() {
41+
return flussArray.size();
42+
}
43+
44+
@Override
45+
public boolean isNullAt(int pos) {
46+
return flussArray.isNullAt(pos);
47+
}
48+
49+
@Override
50+
public boolean getBoolean(int pos) {
51+
return flussArray.getBoolean(pos);
52+
}
53+
54+
@Override
55+
public byte getByte(int pos) {
56+
return flussArray.getByte(pos);
57+
}
58+
59+
@Override
60+
public short getShort(int pos) {
61+
return flussArray.getShort(pos);
62+
}
63+
64+
@Override
65+
public int getInt(int pos) {
66+
return flussArray.getInt(pos);
67+
}
68+
69+
@Override
70+
public long getLong(int pos) {
71+
return flussArray.getLong(pos);
72+
}
73+
74+
@Override
75+
public float getFloat(int pos) {
76+
return flussArray.getFloat(pos);
77+
}
78+
79+
@Override
80+
public double getDouble(int pos) {
81+
return flussArray.getDouble(pos);
82+
}
83+
84+
@Override
85+
public BinaryString getString(int pos) {
86+
return BinaryString.fromBytes(flussArray.getString(pos).toBytes());
87+
}
88+
89+
@Override
90+
public Decimal getDecimal(int pos, int precision, int scale) {
91+
org.apache.fluss.row.Decimal flussDecimal = flussArray.getDecimal(pos, precision, scale);
92+
if (flussDecimal.isCompact()) {
93+
return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), precision, scale);
94+
} else {
95+
return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), precision, scale);
96+
}
97+
}
98+
99+
@Override
100+
public Timestamp getTimestamp(int pos, int precision) {
101+
// Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays
102+
if (TimestampNtz.isCompact(precision)) {
103+
return Timestamp.fromEpochMillis(
104+
flussArray.getTimestampNtz(pos, precision).getMillisecond());
105+
} else {
106+
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
107+
return Timestamp.fromEpochMillis(
108+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
109+
}
110+
}
111+
112+
@Override
113+
public byte[] getBinary(int pos) {
114+
return flussArray.getBytes(pos);
115+
}
116+
117+
@Override
118+
public Variant getVariant(int pos) {
119+
throw new UnsupportedOperationException(
120+
"getVariant is not supported for Fluss array currently.");
121+
}
122+
123+
@Override
124+
public InternalArray getArray(int pos) {
125+
org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos);
126+
return innerArray == null ? null : new FlussArrayAsPaimonArray(innerArray);
127+
}
128+
129+
@Override
130+
public InternalMap getMap(int pos) {
131+
throw new UnsupportedOperationException(
132+
"getMap is not supported for Fluss array currently.");
133+
}
134+
135+
@Override
136+
public InternalRow getRow(int pos, int numFields) {
137+
throw new UnsupportedOperationException(
138+
"getRow is not supported for Fluss array currently.");
139+
}
140+
141+
@Override
142+
public boolean[] toBooleanArray() {
143+
return flussArray.toBooleanArray();
144+
}
145+
146+
@Override
147+
public byte[] toByteArray() {
148+
return flussArray.toByteArray();
149+
}
150+
151+
@Override
152+
public short[] toShortArray() {
153+
return flussArray.toShortArray();
154+
}
155+
156+
@Override
157+
public int[] toIntArray() {
158+
return flussArray.toIntArray();
159+
}
160+
161+
@Override
162+
public long[] toLongArray() {
163+
return flussArray.toLongArray();
164+
}
165+
166+
@Override
167+
public float[] toFloatArray() {
168+
return flussArray.toFloatArray();
169+
}
170+
171+
@Override
172+
public double[] toDoubleArray() {
173+
return flussArray.toDoubleArray();
174+
}
175+
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ public Variant getVariant(int i) {
159159

160160
@Override
161161
public InternalArray getArray(int pos) {
162-
throw new UnsupportedOperationException(
163-
"getArray is not support for Fluss record currently.");
162+
org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos);
163+
return flussArray == null ? null : new FlussArrayAsPaimonArray(flussArray);
164164
}
165165

166166
@Override
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.lake.paimon.utils;
19+
20+
import org.apache.fluss.row.BinaryString;
21+
import org.apache.fluss.row.Decimal;
22+
import org.apache.fluss.row.InternalArray;
23+
import org.apache.fluss.row.TimestampLtz;
24+
import org.apache.fluss.row.TimestampNtz;
25+
26+
import org.apache.paimon.data.Timestamp;
27+
28+
/** Adapter class for converting Paimon InternalArray to Fluss InternalArray. */
29+
public class PaimonArrayAsFlussArray implements InternalArray {
30+
31+
private final org.apache.paimon.data.InternalArray paimonArray;
32+
33+
public PaimonArrayAsFlussArray(org.apache.paimon.data.InternalArray paimonArray) {
34+
this.paimonArray = paimonArray;
35+
}
36+
37+
@Override
38+
public int size() {
39+
return paimonArray.size();
40+
}
41+
42+
@Override
43+
public boolean isNullAt(int pos) {
44+
return paimonArray.isNullAt(pos);
45+
}
46+
47+
@Override
48+
public boolean getBoolean(int pos) {
49+
return paimonArray.getBoolean(pos);
50+
}
51+
52+
@Override
53+
public byte getByte(int pos) {
54+
return paimonArray.getByte(pos);
55+
}
56+
57+
@Override
58+
public short getShort(int pos) {
59+
return paimonArray.getShort(pos);
60+
}
61+
62+
@Override
63+
public int getInt(int pos) {
64+
return paimonArray.getInt(pos);
65+
}
66+
67+
@Override
68+
public long getLong(int pos) {
69+
return paimonArray.getLong(pos);
70+
}
71+
72+
@Override
73+
public float getFloat(int pos) {
74+
return paimonArray.getFloat(pos);
75+
}
76+
77+
@Override
78+
public double getDouble(int pos) {
79+
return paimonArray.getDouble(pos);
80+
}
81+
82+
@Override
83+
public BinaryString getChar(int pos, int length) {
84+
return BinaryString.fromBytes(paimonArray.getString(pos).toBytes());
85+
}
86+
87+
@Override
88+
public BinaryString getString(int pos) {
89+
return BinaryString.fromBytes(paimonArray.getString(pos).toBytes());
90+
}
91+
92+
@Override
93+
public Decimal getDecimal(int pos, int precision, int scale) {
94+
org.apache.paimon.data.Decimal paimonDecimal =
95+
paimonArray.getDecimal(pos, precision, scale);
96+
if (paimonDecimal.isCompact()) {
97+
return Decimal.fromUnscaledLong(paimonDecimal.toUnscaledLong(), precision, scale);
98+
} else {
99+
return Decimal.fromBigDecimal(paimonDecimal.toBigDecimal(), precision, scale);
100+
}
101+
}
102+
103+
@Override
104+
public TimestampNtz getTimestampNtz(int pos, int precision) {
105+
Timestamp timestamp = paimonArray.getTimestamp(pos, precision);
106+
if (TimestampNtz.isCompact(precision)) {
107+
return TimestampNtz.fromMillis(timestamp.getMillisecond());
108+
} else {
109+
return TimestampNtz.fromMillis(
110+
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
111+
}
112+
}
113+
114+
@Override
115+
public TimestampLtz getTimestampLtz(int pos, int precision) {
116+
Timestamp timestamp = paimonArray.getTimestamp(pos, precision);
117+
if (TimestampLtz.isCompact(precision)) {
118+
return TimestampLtz.fromEpochMillis(timestamp.getMillisecond());
119+
} else {
120+
return TimestampLtz.fromEpochMillis(
121+
timestamp.getMillisecond(), timestamp.getNanoOfMillisecond());
122+
}
123+
}
124+
125+
@Override
126+
public byte[] getBinary(int pos, int length) {
127+
return paimonArray.getBinary(pos);
128+
}
129+
130+
@Override
131+
public byte[] getBytes(int pos) {
132+
return paimonArray.getBinary(pos);
133+
}
134+
135+
@Override
136+
public InternalArray getArray(int pos) {
137+
org.apache.paimon.data.InternalArray innerArray = paimonArray.getArray(pos);
138+
return innerArray == null ? null : new PaimonArrayAsFlussArray(innerArray);
139+
}
140+
141+
@Override
142+
public boolean[] toBooleanArray() {
143+
return paimonArray.toBooleanArray();
144+
}
145+
146+
@Override
147+
public byte[] toByteArray() {
148+
return paimonArray.toByteArray();
149+
}
150+
151+
@Override
152+
public short[] toShortArray() {
153+
return paimonArray.toShortArray();
154+
}
155+
156+
@Override
157+
public int[] toIntArray() {
158+
return paimonArray.toIntArray();
159+
}
160+
161+
@Override
162+
public long[] toLongArray() {
163+
return paimonArray.toLongArray();
164+
}
165+
166+
@Override
167+
public float[] toFloatArray() {
168+
return paimonArray.toFloatArray();
169+
}
170+
171+
@Override
172+
public double[] toDoubleArray() {
173+
return paimonArray.toDoubleArray();
174+
}
175+
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ public byte[] getBytes(int pos) {
143143

144144
@Override
145145
public InternalArray getArray(int pos) {
146-
// TODO: Support Array type conversion from Paimon to Fluss
147-
throw new UnsupportedOperationException();
146+
org.apache.paimon.data.InternalArray paimonArray = paimonRow.getArray(pos);
147+
return paimonArray == null ? null : new PaimonArrayAsFlussArray(paimonArray);
148148
}
149149

150150
// TODO: Support Map type conversion from Paimon to Fluss

0 commit comments

Comments
 (0)