Skip to content

Commit bb18842

Browse files
committed
improve test
1 parent 1445b13 commit bb18842

File tree

5 files changed

+325
-610
lines changed

5 files changed

+325
-610
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussArrayAsPaimonArray.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.lake.paimon.source;
1919

20+
import org.apache.fluss.row.TimestampLtz;
2021
import org.apache.fluss.row.TimestampNtz;
2122

2223
import org.apache.paimon.data.BinaryString;
@@ -26,14 +27,19 @@
2627
import org.apache.paimon.data.InternalRow;
2728
import org.apache.paimon.data.Timestamp;
2829
import org.apache.paimon.data.variant.Variant;
30+
import org.apache.paimon.types.ArrayType;
31+
import org.apache.paimon.types.DataType;
2932

3033
/** Adapter class for converting Fluss InternalArray to Paimon InternalArray. */
3134
public class FlussArrayAsPaimonArray implements InternalArray {
3235

3336
private final org.apache.fluss.row.InternalArray flussArray;
37+
private final DataType elementType;
3438

35-
public FlussArrayAsPaimonArray(org.apache.fluss.row.InternalArray flussArray) {
39+
public FlussArrayAsPaimonArray(
40+
org.apache.fluss.row.InternalArray flussArray, DataType elementType) {
3641
this.flussArray = flussArray;
42+
this.elementType = elementType;
3743
}
3844

3945
@Override
@@ -99,13 +105,30 @@ public Decimal getDecimal(int pos, int precision, int scale) {
99105
@Override
100106
public Timestamp getTimestamp(int pos, int precision) {
101107
// Default to TIMESTAMP_WITHOUT_TIME_ZONE behavior for arrays
102-
if (TimestampNtz.isCompact(precision)) {
103-
return Timestamp.fromEpochMillis(
104-
flussArray.getTimestampNtz(pos, precision).getMillisecond());
105-
} else {
106-
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
107-
return Timestamp.fromEpochMillis(
108-
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
108+
switch (elementType.getTypeRoot()) {
109+
case TIMESTAMP_WITHOUT_TIME_ZONE:
110+
if (TimestampNtz.isCompact(precision)) {
111+
return Timestamp.fromEpochMillis(
112+
flussArray.getTimestampNtz(pos, precision).getMillisecond());
113+
} else {
114+
TimestampNtz timestampNtz = flussArray.getTimestampNtz(pos, precision);
115+
return Timestamp.fromEpochMillis(
116+
timestampNtz.getMillisecond(), timestampNtz.getNanoOfMillisecond());
117+
}
118+
119+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
120+
if (TimestampLtz.isCompact(precision)) {
121+
return Timestamp.fromEpochMillis(
122+
flussArray.getTimestampLtz(pos, precision).getEpochMillisecond());
123+
} else {
124+
TimestampLtz timestampLtz = flussArray.getTimestampLtz(pos, precision);
125+
return Timestamp.fromEpochMillis(
126+
timestampLtz.getEpochMillisecond(),
127+
timestampLtz.getNanoOfMillisecond());
128+
}
129+
default:
130+
throw new UnsupportedOperationException(
131+
"Unsupported data type to get timestamp: " + elementType);
109132
}
110133
}
111134

@@ -123,7 +146,10 @@ public Variant getVariant(int pos) {
123146
@Override
124147
public InternalArray getArray(int pos) {
125148
org.apache.fluss.row.InternalArray innerArray = flussArray.getArray(pos);
126-
return innerArray == null ? null : new FlussArrayAsPaimonArray(innerArray);
149+
return innerArray == null
150+
? null
151+
: new FlussArrayAsPaimonArray(
152+
innerArray, ((ArrayType) elementType).getElementType());
127153
}
128154

129155
@Override

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/FlussRowAsPaimonRow.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.data.InternalRow;
2828
import org.apache.paimon.data.Timestamp;
2929
import org.apache.paimon.data.variant.Variant;
30+
import org.apache.paimon.types.ArrayType;
3031
import org.apache.paimon.types.DataType;
3132
import org.apache.paimon.types.RowKind;
3233
import org.apache.paimon.types.RowType;
@@ -160,7 +161,11 @@ public Variant getVariant(int i) {
160161
@Override
161162
public InternalArray getArray(int pos) {
162163
org.apache.fluss.row.InternalArray flussArray = internalRow.getArray(pos);
163-
return flussArray == null ? null : new FlussArrayAsPaimonArray(flussArray);
164+
return flussArray == null
165+
? null
166+
: new FlussArrayAsPaimonArray(
167+
flussArray,
168+
((ArrayType) tableRowType.getField(pos).type()).getElementType());
164169
}
165170

166171
@Override

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.fluss.metadata.TableDescriptor;
2525
import org.apache.fluss.metadata.TablePath;
2626
import org.apache.fluss.row.Decimal;
27+
import org.apache.fluss.row.GenericArray;
2728
import org.apache.fluss.row.InternalRow;
2829
import org.apache.fluss.row.TimestampLtz;
2930
import org.apache.fluss.row.TimestampNtz;
@@ -125,7 +126,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
125126
String partitionName =
126127
isPartitioned ? waitUntilPartitions(t1).values().iterator().next() : null;
127128
if (partitionName != null) {
128-
queryFilterStr = queryFilterStr + " and c16= '" + partitionName + "'";
129+
queryFilterStr = queryFilterStr + " and c17= '" + partitionName + "'";
129130
}
130131

131132
List<Row> expectedRows = new ArrayList<>();
@@ -148,6 +149,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
148149
TimestampNtz.fromMillis(1698235273183L, 0),
149150
TimestampNtz.fromMillis(1698235273183L, 6000),
150151
new byte[] {1, 2, 3, 4},
152+
new float[] {1.1f, 1.2f, 1.3f},
151153
partition));
152154

153155
expectedRows.add(
@@ -167,6 +169,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
167169
TimestampNtz.fromMillis(1698235273201L),
168170
TimestampNtz.fromMillis(1698235273201L, 6000),
169171
new byte[] {1, 2, 3, 4},
172+
new float[] {1.1f, 1.2f, 1.3f},
170173
partition));
171174
}
172175
} else {
@@ -188,6 +191,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
188191
TimestampNtz.fromMillis(1698235273183L, 0),
189192
TimestampNtz.fromMillis(1698235273183L, 6000),
190193
new byte[] {1, 2, 3, 4},
194+
new float[] {1.1f, 1.2f, 1.3f},
191195
null),
192196
Row.of(
193197
true,
@@ -205,6 +209,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
205209
TimestampNtz.fromMillis(1698235273201L),
206210
TimestampNtz.fromMillis(1698235273201L, 6000),
207211
new byte[] {1, 2, 3, 4},
212+
new float[] {1.1f, 1.2f, 1.3f},
208213
null));
209214
}
210215
tableResult =
@@ -218,7 +223,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
218223
row -> {
219224
boolean isMatch = row.getField(3).equals(30);
220225
if (partitionName != null) {
221-
isMatch = isMatch && row.getField(15).equals(partitionName);
226+
isMatch = isMatch && row.getField(16).equals(partitionName);
222227
}
223228
return isMatch;
224229
})
@@ -293,6 +298,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
293298
TimestampNtz.fromMillis(1698235273183L),
294299
TimestampNtz.fromMillis(1698235273183L, 6000),
295300
new byte[] {1, 2, 3, 4},
301+
new float[] {1.1f, 1.2f, 1.3f},
296302
partition));
297303

298304
expectedRows.add(
@@ -312,6 +318,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
312318
TimestampNtz.fromMillis(1698235273501L),
313319
TimestampNtz.fromMillis(1698235273501L, 8000),
314320
new byte[] {5, 6, 7, 8},
321+
new float[] {2.1f, 2.2f, 2.3f},
315322
partition));
316323
}
317324
} else {
@@ -333,6 +340,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
333340
TimestampNtz.fromMillis(1698235273183L),
334341
TimestampNtz.fromMillis(1698235273183L, 6000),
335342
new byte[] {1, 2, 3, 4},
343+
new float[] {1.1f, 1.2f, 1.3f},
336344
null),
337345
Row.of(
338346
true,
@@ -350,6 +358,7 @@ void testUnionReadFullType(Boolean isPartitioned) throws Exception {
350358
TimestampNtz.fromMillis(1698235273501L),
351359
TimestampNtz.fromMillis(1698235273501L, 8000),
352360
new byte[] {5, 6, 7, 8},
361+
new float[] {2.1f, 2.2f, 2.3f},
353362
null));
354363
}
355364

@@ -421,6 +430,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
421430
TimestampNtz.fromMillis(1698235273183L),
422431
TimestampNtz.fromMillis(1698235273183L, 6000),
423432
new byte[] {1, 2, 3, 4},
433+
new float[] {1.1f, 1.2f, 1.3f},
424434
partition));
425435
expectedRows.add(
426436
Row.of(
@@ -439,6 +449,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
439449
TimestampNtz.fromMillis(1698235273201L),
440450
TimestampNtz.fromMillis(1698235273201L, 6000),
441451
new byte[] {1, 2, 3, 4},
452+
new float[] {1.1f, 1.2f, 1.3f},
442453
partition));
443454
}
444455
} else {
@@ -460,6 +471,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
460471
TimestampNtz.fromMillis(1698235273183L),
461472
TimestampNtz.fromMillis(1698235273183L, 6000),
462473
new byte[] {1, 2, 3, 4},
474+
new float[] {1.1f, 1.2f, 1.3f},
463475
null),
464476
Row.of(
465477
true,
@@ -477,6 +489,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
477489
TimestampNtz.fromMillis(1698235273201L),
478490
TimestampNtz.fromMillis(1698235273201L, 6000),
479491
new byte[] {1, 2, 3, 4},
492+
new float[] {1.1f, 1.2f, 1.3f},
480493
null));
481494
}
482495

@@ -519,6 +532,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
519532
TimestampNtz.fromMillis(1698235273201L),
520533
TimestampNtz.fromMillis(1698235273201L, 6000),
521534
new byte[] {1, 2, 3, 4},
535+
new float[] {1.1f, 1.2f, 1.3f},
522536
partition));
523537
expectedRows2.add(
524538
Row.ofKind(
@@ -538,6 +552,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
538552
TimestampNtz.fromMillis(1698235273501L),
539553
TimestampNtz.fromMillis(1698235273501L, 8000),
540554
new byte[] {5, 6, 7, 8},
555+
new float[] {2.1f, 2.2f, 2.3f},
541556
partition));
542557
}
543558
} else {
@@ -559,6 +574,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
559574
TimestampNtz.fromMillis(1698235273201L),
560575
TimestampNtz.fromMillis(1698235273201L, 6000),
561576
new byte[] {1, 2, 3, 4},
577+
new float[] {1.1f, 1.2f, 1.3f},
562578
null));
563579
expectedRows2.add(
564580
Row.ofKind(
@@ -578,6 +594,7 @@ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
578594
TimestampNtz.fromMillis(1698235273501L),
579595
TimestampNtz.fromMillis(1698235273501L, 8000),
580596
new byte[] {5, 6, 7, 8},
597+
new float[] {2.1f, 2.2f, 2.3f},
581598
null));
582599
}
583600

@@ -807,21 +824,21 @@ private String buildExpectedResult(boolean isPartitioned, int record1, int recor
807824
+ "2023-10-25T12:01:13.182005Z, "
808825
+ "2023-10-25T12:01:13.183, "
809826
+ "2023-10-25T12:01:13.183006, "
810-
+ "[1, 2, 3, 4], %s]");
827+
+ "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]");
811828
records.add(
812829
"+I[true, 10, 20, 30, 40, 50.1, 60.0, another_string, 0.90, 100, "
813830
+ "2023-10-25T12:01:13.200Z, "
814831
+ "2023-10-25T12:01:13.200005Z, "
815832
+ "2023-10-25T12:01:13.201, "
816833
+ "2023-10-25T12:01:13.201006, "
817-
+ "[1, 2, 3, 4], %s]");
834+
+ "[1, 2, 3, 4], [1.1, 1.2, 1.3], %s]");
818835
records.add(
819836
"+I[true, 100, 200, 30, 400, 500.1, 600.0, another_string_2, 9.00, 1000, "
820837
+ "2023-10-25T12:01:13.400Z, "
821838
+ "2023-10-25T12:01:13.400007Z, "
822839
+ "2023-10-25T12:01:13.501, "
823840
+ "2023-10-25T12:01:13.501008, "
824-
+ "[5, 6, 7, 8], %s]");
841+
+ "[5, 6, 7, 8], [2.1, 2.2, 2.3], %s]");
825842

826843
if (isPartitioned) {
827844
return String.format(
@@ -857,9 +874,10 @@ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean
857874
.column("c13", DataTypes.TIMESTAMP(3))
858875
.column("c14", DataTypes.TIMESTAMP(6))
859876
.column("c15", DataTypes.BINARY(4))
860-
.column("c16", DataTypes.STRING());
877+
.column("c16", DataTypes.ARRAY(DataTypes.FLOAT()))
878+
.column("c17", DataTypes.STRING());
861879

862-
return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c16");
880+
return createPkTable(tablePath, bucketNum, isPartitioned, true, schemaBuilder, "c4", "c17");
863881
}
864882

865883
protected long createSimplePkTable(
@@ -924,6 +942,7 @@ private void writeFullTypeRow(TablePath tablePath, String partition) throws Exce
924942
TimestampNtz.fromMillis(1698235273501L),
925943
TimestampNtz.fromMillis(1698235273501L, 8000),
926944
new byte[] {5, 6, 7, 8},
945+
new GenericArray(new float[] {2.1f, 2.2f, 2.3f}),
927946
partition));
928947
writeRows(tablePath, rows, false);
929948
}
@@ -946,6 +965,7 @@ private static List<InternalRow> generateKvRowsFullType(@Nullable String partiti
946965
TimestampNtz.fromMillis(1698235273183L),
947966
TimestampNtz.fromMillis(1698235273183L, 6000),
948967
new byte[] {1, 2, 3, 4},
968+
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
949969
partition),
950970
row(
951971
true,
@@ -963,6 +983,7 @@ private static List<InternalRow> generateKvRowsFullType(@Nullable String partiti
963983
TimestampNtz.fromMillis(1698235273201L),
964984
TimestampNtz.fromMillis(1698235273201L, 6000),
965985
new byte[] {1, 2, 3, 4},
986+
new GenericArray(new float[] {1.1f, 1.2f, 1.3f}),
966987
partition));
967988
}
968989
}

0 commit comments

Comments
 (0)