Skip to content

Commit ced8cba

Browse files
[test/flink] Cover full data types in UnionRead tests and polish some tests
Co-authored-by: Leonard Xu <[email protected]>
1 parent 56279b3 commit ced8cba

File tree

4 files changed

+510
-252
lines changed

4 files changed

+510
-252
lines changed

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java

Lines changed: 96 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
package com.alibaba.fluss.lake.paimon.flink;
1919

2020
import com.alibaba.fluss.metadata.TablePath;
21+
import com.alibaba.fluss.row.Decimal;
2122
import com.alibaba.fluss.row.InternalRow;
23+
import com.alibaba.fluss.row.TimestampLtz;
24+
import com.alibaba.fluss.row.TimestampNtz;
2225

2326
import org.apache.flink.core.execution.JobClient;
2427
import org.apache.flink.types.Row;
@@ -29,6 +32,9 @@
2932

3033
import javax.annotation.Nullable;
3134

35+
import java.time.Instant;
36+
import java.time.LocalDateTime;
37+
import java.time.ZoneId;
3238
import java.util.ArrayList;
3339
import java.util.List;
3440
import java.util.Map;
@@ -47,7 +53,7 @@ protected static void beforeAll() {
4753

4854
@ParameterizedTest
4955
@ValueSource(booleans = {false, true})
50-
void testReadLogTable(boolean isPartitioned) throws Exception {
56+
void testReadLogTableFullType(boolean isPartitioned) throws Exception {
5157
// first of all, start tiering
5258
JobClient jobClient = buildTieringJob(execEnv);
5359

@@ -56,7 +62,7 @@ void testReadLogTable(boolean isPartitioned) throws Exception {
5662
TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
5763
List<Row> writtenRows = new ArrayList<>();
5864
long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
59-
// wait until records has has been synced
65+
// wait until records has been synced
6066
waitUtilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
6167

6268
// now, start to read the log table, which will read paimon
@@ -83,7 +89,7 @@ void testReadLogTable(boolean isPartitioned) throws Exception {
8389
// test project push down
8490
actual =
8591
CollectionUtil.iteratorToList(
86-
batchTEnv.executeSql("select b from " + tableName).collect());
92+
batchTEnv.executeSql("select f_byte from " + tableName).collect());
8793
List<Row> expected =
8894
writtenRows.stream()
8995
.map(row -> Row.of(row.getField(1)))
@@ -94,33 +100,109 @@ void testReadLogTable(boolean isPartitioned) throws Exception {
94100
private long prepareLogTable(
95101
TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
96102
throws Exception {
97-
long t1Id = createLogTable(tablePath, bucketNum, isPartitioned);
103+
long t1Id = createFullTypeLogTable(tablePath, bucketNum, isPartitioned);
98104
if (isPartitioned) {
99105
Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
100106
for (String partition : partitionNameById.values()) {
101107
for (int i = 0; i < 3; i++) {
102-
flinkRows.addAll(writeRows(tablePath, 10, partition));
108+
flinkRows.addAll(writeFullTypeRows(tablePath, 10, partition));
103109
}
104110
}
105111
} else {
106112
for (int i = 0; i < 3; i++) {
107-
flinkRows.addAll(writeRows(tablePath, 10, null));
113+
flinkRows.addAll(writeFullTypeRows(tablePath, 10, null));
108114
}
109115
}
110116
return t1Id;
111117
}
112118

113-
private List<Row> writeRows(TablePath tablePath, int rowCount, @Nullable String partition)
114-
throws Exception {
119+
private List<Row> writeFullTypeRows(
120+
TablePath tablePath, int rowCount, @Nullable String partition) throws Exception {
115121
List<InternalRow> rows = new ArrayList<>();
116122
List<Row> flinkRows = new ArrayList<>();
117123
for (int i = 0; i < rowCount; i++) {
118124
if (partition == null) {
119-
rows.add(row(i, "v" + i));
120-
flinkRows.add(Row.of(i, "v" + i));
125+
rows.add(
126+
row(
127+
true,
128+
(byte) 100,
129+
(short) 200,
130+
30,
131+
400L,
132+
500.1f,
133+
600.0d,
134+
"another_string_" + i,
135+
Decimal.fromUnscaledLong(900, 5, 2),
136+
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
137+
TimestampLtz.fromEpochMillis(1698235273400L),
138+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
139+
TimestampNtz.fromMillis(1698235273501L),
140+
TimestampNtz.fromMillis(1698235273501L, 8000),
141+
new byte[] {5, 6, 7, 8}));
142+
143+
flinkRows.add(
144+
Row.of(
145+
true,
146+
(byte) 100,
147+
(short) 200,
148+
30,
149+
400L,
150+
500.1f,
151+
600.0d,
152+
"another_string_" + i,
153+
new java.math.BigDecimal("9.00"),
154+
new java.math.BigDecimal("1000"),
155+
Instant.ofEpochMilli(1698235273400L),
156+
Instant.ofEpochMilli(1698235273400L).plusNanos(7000),
157+
LocalDateTime.ofInstant(
158+
Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")),
159+
LocalDateTime.ofInstant(
160+
Instant.ofEpochMilli(1698235273501L),
161+
ZoneId.of("UTC"))
162+
.plusNanos(8000),
163+
new byte[] {5, 6, 7, 8}));
121164
} else {
122-
rows.add(row(i, "v" + i, partition));
123-
flinkRows.add(Row.of(i, "v" + i, partition));
165+
rows.add(
166+
row(
167+
true,
168+
(byte) 100,
169+
(short) 200,
170+
30,
171+
400L,
172+
500.1f,
173+
600.0d,
174+
"another_string_" + i,
175+
Decimal.fromUnscaledLong(900, 5, 2),
176+
Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
177+
TimestampLtz.fromEpochMillis(1698235273400L),
178+
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
179+
TimestampNtz.fromMillis(1698235273501L),
180+
TimestampNtz.fromMillis(1698235273501L, 8000),
181+
new byte[] {5, 6, 7, 8},
182+
partition));
183+
184+
flinkRows.add(
185+
Row.of(
186+
true,
187+
(byte) 100,
188+
(short) 200,
189+
30,
190+
400L,
191+
500.1f,
192+
600.0d,
193+
"another_string_" + i,
194+
new java.math.BigDecimal("9.00"),
195+
new java.math.BigDecimal("1000"),
196+
Instant.ofEpochMilli(1698235273400L),
197+
Instant.ofEpochMilli(1698235273400L).plusNanos(7000),
198+
LocalDateTime.ofInstant(
199+
Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")),
200+
LocalDateTime.ofInstant(
201+
Instant.ofEpochMilli(1698235273501L),
202+
ZoneId.of("UTC"))
203+
.plusNanos(8000),
204+
new byte[] {5, 6, 7, 8},
205+
partition));
124206
}
125207
}
126208
writeRows(tablePath, rows, true);
@@ -132,11 +214,11 @@ private List<Row> writeRows(TablePath tablePath, int rowCount, boolean isPartiti
132214
if (isPartitioned) {
133215
List<Row> rows = new ArrayList<>();
134216
for (String partition : waitUntilPartitions(tablePath).values()) {
135-
rows.addAll(writeRows(tablePath, rowCount, partition));
217+
rows.addAll(writeFullTypeRows(tablePath, rowCount, partition));
136218
}
137219
return rows;
138220
} else {
139-
return writeRows(tablePath, rowCount, null);
221+
return writeFullTypeRows(tablePath, rowCount, null);
140222
}
141223
}
142224
}

0 commit comments

Comments
 (0)