Skip to content

Commit c22e6b7

Browse files
committed
improve test
1 parent cd7f615 commit c22e6b7

File tree

5 files changed

+325
-610
lines changed

5 files changed

+325
-610
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.lake.paimon.source;
1919

20+
import org.apache.fluss.row.TimestampLtz;
2021
import org.apache.fluss.row.TimestampNtz;
2122

2223
import org.apache.paimon.data.BinaryString;
@@ -26,14 +27,19 @@
2627
import org.apache.paimon.data.InternalRow;
2728
import org.apache.paimon.data.Timestamp;
2829
import org.apache.paimon.data.variant.Variant;
30+
import org.apache.paimon.types.ArrayType;
31+
import org.apache.paimon.types.DataType;
2932

3033
/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */
3134
public class FlussArrayAsPaimonArray implements InternalArray {
3235

3336
private final org.apache.fluss.row.InternalArray flussArray;
37+
private final DataType elementType;
3438

35-
public FlussArrayAsPaimonArray(org.apache.fluss.row.InternalArray flussArray) {
39+
public FlussArrayAsPaimonArray(
40+
org.apache.fluss.row.InternalArray flussArray, DataType elementType) {
3641
this.flussArray = flussArray;
42+
this.elementType = elementType;
3743
}
3844

3945
@Override
@@ -99,13 +105,30 @@ public Decimal getDecimal(int pos, int precision, int scale) {
99105
@Override
100106
public Timestamp getTimestamp(int pos, int precision) {
101107
// Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays
102-
if (TimestampNtz.isCompact(precision)) {
103-
return Timestamp.fromEpochMillis(
104-
flussArray.getTimestampNtz(pos, precision).getMillisecond());
105-
} else {
106-
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
107-
return Timestamp.fromEpochMillis(
108-
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
108+
switch (elementType.getTypeRoot()) {
109+
case TIMESTAMP_WITHOUT_TIME_ZONE:
110+
if (TimestampNtz.isCompact(precision)) {
111+
return Timestamp.fromEpochMillis(
112+
flussArray.getTimestampNtz(pos, precision).getMillisecond());
113+
} else {
114+
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
115+
return Timestamp.fromEpochMillis(
116+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
117+
}
118+
119+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
120+
if (TimestampLtz.isCompact(precision)) {
121+
return Timestamp.fromEpochMillis(
122+
flussArray.getTimestampLtz(pos, precision).getEpochMillisecond());
123+
} else {
124+
TimestampLtz timestampLtz = flussArray.getTimestampLtz(pos, precision);
125+
return Timestamp.fromEpochMillis(
126+
timestampLtz.getEpochMillisecond(),
127+
timestampLtz.getNanoOfMillisecond());
128+
}
129+
default:
130+
throw new UnsupportedOperationException(
131+
"Unsupported data type to get timestamp: " + elementType);
109132
}
110133
}
111134

@@ -123,7 +146,10 @@ public Variant getVariant(int pos) {
123146
@Override
124147
public InternalArray getArray(int pos) {
125148
org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos);
126-
return innerArray == null ? null : new FlussArrayAsPaimonArray(innerArray);
149+
return innerArray == null
150+
? null
151+
: new FlussArrayAsPaimonArray(
152+
innerArray, ((ArrayType) elementType).getElementType());
127153
}
128154

129155
@Override

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.data.InternalRow;
2828
import org.apache.paimon.data.Timestamp;
2929
import org.apache.paimon.data.variant.Variant;
30+
import org.apache.paimon.types.ArrayType;
3031
import org.apache.paimon.types.DataType;
3132
import org.apache.paimon.types.RowKind;
3233
import org.apache.paimon.types.RowType;
@@ -160,7 +161,11 @@ public Variant getVariant(int i) {
160161
@Override
161162
public InternalArray getArray(int pos) {
162163
org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos);
163-
return flussArray == null ? null : new FlussArrayAsPaimonArray(flussArray);
164+
return flussArray == null
165+
? null
166+
: new FlussArrayAsPaimonArray(
167+
flussArray,
168+
((ArrayType) tableRowType.getField(pos).type()).getElementType());
164169
}
165170

166171
@Override

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.metadata.TablePath;
2828
import org.apache.fluss.row.BinaryString;
2929
import org.apache.fluss.row.Decimal;
30+
import org.apache.fluss.row.GenericArray;
3031
import org.apache.fluss.row.GenericRow;
3132
import org.apache.fluss.row.InternalRow;
3233
import org.apache.fluss.row.TimestampLtz;
@@ -132,7 +133,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
132133
String partitionName =
133134
isPartitioned ? waitUntilPartitions(t1).values().iterator().next() : null;
134135
if (partitionName != null) {
135-
queryFilterStr = queryFilterStr + " and c16= '" + partitionName + "'";
136+
queryFilterStr = queryFilterStr + " and c17= '" + partitionName + "'";
136137
}
137138

138139
List<Row> expectedRows = new ArrayList<>();
@@ -155,6 +156,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
155156
TimestampNtz.fromMillis(1698235273183L, 0),
156157
TimestampNtz.fromMillis(1698235273183L, 6000),
157158
new byte[] {1, 2, 3, 4},
159+
new float[] {1.1f, 1.2f, 1.3f},
158160
partition));
159161

160162
expectedRows.add(
@@ -174,6 +176,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
174176
TimestampNtz.fromMillis(1698235273201L),
175177
TimestampNtz.fromMillis(1698235273201L, 6000),
176178
new byte[] {1, 2, 3, 4},
179+
new float[] {1.1f, 1.2f, 1.3f},
177180
partition));
178181
}
179182
} else {
@@ -195,6 +198,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
195198
TimestampNtz.fromMillis(1698235273183L, 0),
196199
TimestampNtz.fromMillis(1698235273183L, 6000),
197200
new byte[] {1, 2, 3, 4},
201+
new float[] {1.1f, 1.2f, 1.3f},
198202
null),
199203
Row.of(
200204
true,
@@ -212,6 +216,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
212216
TimestampNtz.fromMillis(1698235273201L),
213217
TimestampNtz.fromMillis(1698235273201L, 6000),
214218
new byte[] {1, 2, 3, 4},
219+
new float[] {1.1f, 1.2f, 1.3f},
215220
null));
216221
}
217222
tableResult =
@@ -225,7 +230,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
225230
row -> {
226231
boolean isMatch = row.getField(3).equals(30);
227232
if (partitionName != null) {
228-
isMatch = isMatch && row.getField(15).equals(partitionName);
233+
isMatch = isMatch && row.getField(16).equals(partitionName);
229234
}
230235
return isMatch;
231236
})
@@ -300,6 +305,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
300305
TimestampNtz.fromMillis(1698235273183L),
301306
TimestampNtz.fromMillis(1698235273183L, 6000),
302307
new byte[] {1, 2, 3, 4},
308+
new float[] {1.1f, 1.2f, 1.3f},
303309
partition));
304310

305311
expectedRows.add(
@@ -319,6 +325,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
319325
TimestampNtz.fromMillis(1698235273501L),
320326
TimestampNtz.fromMillis(1698235273501L, 8000),
321327
new byte[] {5, 6, 7, 8},
328+
new float[] {2.1f, 2.2f, 2.3f},
322329
partition));
323330
}
324331
} else {
@@ -340,6 +347,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
340347
TimestampNtz.fromMillis(1698235273183L),
341348
TimestampNtz.fromMillis(1698235273183L, 6000),
342349
new byte[] {1, 2, 3, 4},
350+
new float[] {1.1f, 1.2f, 1.3f},
343351
null),
344352
Row.of(
345353
true,
@@ -357,6 +365,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
357365
TimestampNtz.fromMillis(1698235273501L),
358366
TimestampNtz.fromMillis(1698235273501L, 8000),
359367
new byte[] {5, 6, 7, 8},
368+
new float[] {2.1f, 2.2f, 2.3f},
360369
null));
361370
}
362371

@@ -473,6 +482,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
473482
TimestampNtz.fromMillis(1698235273183L),
474483
TimestampNtz.fromMillis(1698235273183L, 6000),
475484
new byte[] {1, 2, 3, 4},
485+
new float[] {1.1f, 1.2f, 1.3f},
476486
partition));
477487
expectedRows.add(
478488
Row.of(
@@ -491,6 +501,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
491501
TimestampNtz.fromMillis(1698235273201L),
492502
TimestampNtz.fromMillis(1698235273201L, 6000),
493503
new byte[] {1, 2, 3, 4},
504+
new float[] {1.1f, 1.2f, 1.3f},
494505
partition));
495506
}
496507
} else {
@@ -512,6 +523,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
512523
TimestampNtz.fromMillis(1698235273183L),
513524
TimestampNtz.fromMillis(1698235273183L, 6000),
514525
new byte[] {1, 2, 3, 4},
526+
new float[] {1.1f, 1.2f, 1.3f},
515527
null),
516528
Row.of(
517529
true,
@@ -529,6 +541,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
529541
TimestampNtz.fromMillis(1698235273201L),
530542
TimestampNtz.fromMillis(1698235273201L, 6000),
531543
new byte[] {1, 2, 3, 4},
544+
new float[] {1.1f, 1.2f, 1.3f},
532545
null));
533546
}
534547

@@ -571,6 +584,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
571584
TimestampNtz.fromMillis(1698235273201L),
572585
TimestampNtz.fromMillis(1698235273201L, 6000),
573586
new byte[] {1, 2, 3, 4},
587+
new float[] {1.1f, 1.2f, 1.3f},
574588
partition));
575589
expectedRows2.add(
576590
Row.ofKind(
@@ -590,6 +604,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
590604
TimestampNtz.fromMillis(1698235273501L),
591605
TimestampNtz.fromMillis(1698235273501L, 8000),
592606
new byte[] {5, 6, 7, 8},
607+
new float[] {2.1f, 2.2f, 2.3f},
593608
partition));
594609
}
595610
} else {
@@ -611,6 +626,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
611626
TimestampNtz.fromMillis(1698235273201L),
612627
TimestampNtz.fromMillis(1698235273201L, 6000),
613628
new byte[] {1, 2, 3, 4},
629+
new float[] {1.1f, 1.2f, 1.3f},
614630
null));
615631
expectedRows2.add(
616632
Row.ofKind(
@@ -630,6 +646,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
630646
TimestampNtz.fromMillis(1698235273501L),
631647
TimestampNtz.fromMillis(1698235273501L, 8000),
632648
new byte[] {5, 6, 7, 8},
649+
new float[] {2.1f, 2.2f, 2.3f},
633650
null));
634651
}
635652

@@ -962,21 +979,21 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor
962979
+ "2023-10-25T12:01:13.182005Z, "
963980
+ "2023-10-25T12:01:13.183, "
964981
+ "2023-10-25T12:01:13.183006, "
965-
+ "[1, 2, 3, 4], %s]");
982+
+ "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]");
966983
records.add(
967984
"+I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, "
968985
+ "2023-10-25T12:01:13.200Z, "
969986
+ "2023-10-25T12:01:13.200005Z, "
970987
+ "2023-10-25T12:01:13.201, "
971988
+ "2023-10-25T12:01:13.201006, "
972-
+ "[1, 2, 3, 4], %s]");
989+
+ "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]");
973990
records.add(
974991
"+I[true, 100, 200, 30, 400, 500.1, 600.0, another_string_2, 9.00, 1000, "
975992
+ "2023-10-25T12:01:13.400Z, "
976993
+ "2023-10-25T12:01:13.400007Z, "
977994
+ "2023-10-25T12:01:13.501, "
978995
+ "2023-10-25T12:01:13.501008, "
979-
+ "[5, 6, 7, 8], %s]");
996+
+ "[5, 6, 7, 8], [2.1, 2.2, 2.3], %s]");
980997

981998
if (isPartitioned) {
982999
return String.format(
@@ -1012,9 +1029,10 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean
10121029
.column("c13", DataTypes.TIMESTAMP(3))
10131030
.column("c14", DataTypes.TIMESTAMP(6))
10141031
.column("c15", DataTypes.BINARY(4))
1015-
.column("c16", DataTypes.STRING());
1032+
.column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
1033+
.column("c17", DataTypes.STRING());
10161034

1017-
return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c16");
1035+
return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c17");
10181036
}
10191037

10201038
protected long createSimplePkTable(
@@ -1079,6 +1097,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce
10791097
TimestampNtz.fromMillis(1698235273501L),
10801098
TimestampNtz.fromMillis(1698235273501L, 8000),
10811099
new byte[] {5, 6, 7, 8},
1100+
new GenericArray(new float[] {2.1f, 2.2f, 2.3f}),
10821101
partition));
10831102
writeRows(tablePath, rows, false);
10841103
}
@@ -1101,6 +1120,7 @@ private static List<InternalRow> generateKvRowsFullType(@Nullable String partiti
11011120
TimestampNtz.fromMillis(1698235273183L),
11021121
TimestampNtz.fromMillis(1698235273183L, 6000),
11031122
new byte[] {1, 2, 3, 4},
1123+
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
11041124
partition),
11051125
row(
11061126
true,
@@ -1118,6 +1138,7 @@ private static List<InternalRow> generateKvRowsFullType(@Nullable String partiti
11181138
TimestampNtz.fromMillis(1698235273201L),
11191139
TimestampNtz.fromMillis(1698235273201L, 6000),
11201140
new byte[] {1, 2, 3, 4},
1141+
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
11211142
partition));
11221143
}
11231144
}

0 commit comments

Comments
 (0)