
Commit 3a48f6d

[test/flink] Collect flink rows should always be with timeout (#1622)
1 parent 30dc6e3 commit 3a48f6d
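
This commit replaces hand-rolled collection loops in the Flink ITCases (repeated rowIter.next() calls with no bound on how long they may block) with the shared FlinkRowAssertionsUtils.collectRowsWithTimeout helper, so a test that never receives the expected rows fails fast instead of hanging the build. The helper's implementation lives in FlinkRowAssertionsUtils, which is among the changed files but not shown in this excerpt; the following is only a minimal sketch of what a timeout-guarded collector of that shape could look like. The class name, timeout bound, and failure message are assumptions for illustration, not the actual code.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Hypothetical sketch only: the real helper is FlinkRowAssertionsUtils#collectRowsWithTimeout,
// whose implementation is not part of this excerpt.
public final class RowCollectionSketch {

    private static final Duration COLLECT_TIMEOUT = Duration.ofMinutes(1); // assumed bound

    /** Collects {@code expected} rows from the collect iterator, failing fast on timeout. */
    public static List<String> collectRowsWithTimeout(
            CloseableIterator<Row> rowIter, int expected) throws Exception {
        CompletableFuture<List<String>> pending =
                CompletableFuture.supplyAsync(
                        () -> {
                            List<String> rows = new ArrayList<>(expected);
                            for (int i = 0; i < expected && rowIter.hasNext(); i++) {
                                rows.add(rowIter.next().toString());
                            }
                            return rows;
                        });
        try {
            return pending.get(COLLECT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new AssertionError("Timed out waiting for " + expected + " rows", e);
        } finally {
            rowIter.close(); // always release the underlying collect job
        }
    }
}

However the real utility is implemented, the diffs below route every bounded read through that single helper instead of open-ended next() loops.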

6 files changed, +179 -142 lines changed


fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 2 additions & 5 deletions
@@ -66,6 +66,7 @@
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -210,11 +211,7 @@ void testAppendLogWithBucketKey(boolean sinkBucketShuffle) throws Exception {
         List<String> expectedRows =
                 expectedGroups.stream().flatMap(List::stream).collect(Collectors.toList());
 
-        List<String> actual = new ArrayList<>(expectedRows.size());
-        for (int i = 0; i < expectedRows.size(); i++) {
-            actual.add(rowIter.next().toString());
-        }
-        rowIter.close();
+        List<String> actual = collectRowsWithTimeout(rowIter, expectedRows.size());
         assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRows);
 
         // check data with the same bucket key should be read in sequence.

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java

Lines changed: 8 additions & 7 deletions
@@ -45,6 +45,7 @@
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -189,7 +190,7 @@ void testLimitPrimaryTableScan() throws Exception {
         // normal scan
         String query = String.format("SELECT * FROM %s limit 2", tableName);
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = assertAndCollectRecords(iterRows, 2);
+        List<String> collected = collectRowsWithTimeout(iterRows, 2);
         List<String> expected =
                 Arrays.asList(
                         "+I[1, address1, name1]",
@@ -203,14 +204,14 @@
         // limit which is larger than all the data.
         query = String.format("SELECT * FROM %s limit 10", tableName);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 5);
+        collected = collectRowsWithTimeout(iterRows, 5);
         assertThat(collected).isSubsetOf(expected);
         assertThat(collected).hasSize(5);
 
         // projection scan
         query = String.format("SELECT id, name FROM %s limit 3", tableName);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 3);
+        collected = collectRowsWithTimeout(iterRows, 3);
         expected =
                 Arrays.asList(
                         "+I[1, name1]",
@@ -237,7 +238,7 @@ void testLimitLogTableScan() throws Exception {
         // normal scan
         String query = String.format("SELECT * FROM %s limit 2", tableName);
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = assertAndCollectRecords(iterRows, 2);
+        List<String> collected = collectRowsWithTimeout(iterRows, 2);
         List<String> expected =
                 Arrays.asList(
                         "+I[1, address1, name1]",
@@ -251,7 +252,7 @@
         // projection scan
         query = String.format("SELECT id, name FROM %s limit 3", tableName);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 3);
+        collected = collectRowsWithTimeout(iterRows, 3);
         expected =
                 Arrays.asList(
                         "+I[1, name1]",
@@ -266,7 +267,7 @@ void testLimitLogTableScan() throws Exception {
         String partitionTable = preparePartitionedLogTable();
         query = String.format("SELECT id, name FROM %s limit 3", partitionTable);
         iterRows = tEnv.executeSql(query).collect();
-        collected = assertAndCollectRecords(iterRows, 3);
+        collected = collectRowsWithTimeout(iterRows, 3);
         assertThat(collected).isSubsetOf(expected);
         assertThat(collected).hasSize(3);
     }
@@ -286,7 +287,7 @@ void testCountPushDown(boolean partitionTable) throws Exception {
                                 + "fields=[count1$0])",
                         tableName));
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = assertAndCollectRecords(iterRows, 1);
+        List<String> collected = collectRowsWithTimeout(iterRows, 1);
         List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows));
         assertThat(collected).isEqualTo(expected);

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 33 additions & 93 deletions
@@ -70,7 +70,9 @@
 import java.util.stream.Stream;
 
 import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder;
 import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
 import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRows;
 import static org.apache.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
@@ -205,16 +207,7 @@ void testNonPkTableRead() throws Exception {
         writeRows(conn, tablePath, rows, true);
 
         List<String> expected = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql("select * from non_pk_table_test").collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                String row = rowIter.next().toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        assertQueryResultExactOrder(tEnv, "select * from non_pk_table_test", expected);
     }
 
     @ParameterizedTest
@@ -262,17 +255,7 @@ void testAppendTableProjectPushDown(String logFormat) throws Exception {
                         "+I[v8, 8000, 800]",
                         "+I[v9, 9000, 900]",
                         "+I[v10, 10000, 1000]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        assertQueryResultExactOrder(tEnv, query, expected);
     }
 
     @ParameterizedTest
@@ -331,22 +314,15 @@ void testTableProjectPushDown(String mode) throws Exception {
                         "+I[v8, 8, 800]",
                         "+I[v9, 9, 900]",
                         "+I[v10, 10, 1000]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            if (testPkLog) {
-                // delay the write after collect job start,
-                // to make sure reading from log instead of snapshot
-                writeRows(conn, tablePath, rows, false);
-            }
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
+        org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        if (testPkLog) {
+            // delay the write after collect job start,
+            // to make sure reading from log instead of snapshot
+            writeRows(conn, tablePath, rows, false);
         }
+        int expectRecords = expected.size();
+        List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+        assertThat(actual).containsExactlyElementsOf(expected);
     }
 
     @Test
@@ -451,12 +427,12 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
                         "+I[8, v8, 800, 8000]",
                         "+I[9, v9, 900, 9000]",
                         "+I[10, v10, 1000, 10000]");
-        assertQueryResult(query, expected);
+        assertQueryResultExactOrder(tEnv, query, expected);
 
         // 2. read kv table with scan.startup.mode='earliest'
         options = " /*+ OPTIONS('scan.startup.mode' = 'earliest') */";
         query = "select a, b, c, d from " + tableName + options;
-        assertQueryResult(query, expected);
+        assertQueryResultExactOrder(tEnv, query, expected);
 
         // 3. read log table with scan.startup.mode='timestamp'
         expected =
@@ -471,7 +447,7 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
                         " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' ='%d') */",
                         timestamp);
         query = "select a, b, c, d from " + tableName + options;
-        assertQueryResult(query, expected);
+        assertQueryResultExactOrder(tEnv, query, expected);
     }
 
     @Test
@@ -501,20 +477,13 @@ void testReadKvTableWithScanStartupModeEqualsFull() throws Exception {
                         "-U[2, v2]",
                         "+U[2, v22]",
                         "+I[4, v4]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = 8;
-            List<String> actual = new ArrayList<>(expectRecords);
-            // delay to write after collect job start, to make sure reading from log instead of
-            // snapshot
-            writeRows(conn, tablePath, rows2, false);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
+        int expectRecords = 8;
+        // delay to write after collect job start, to make sure reading from log instead of
+        // snapshot
+        writeRows(conn, tablePath, rows2, false);
+        List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+        assertThat(actual).containsExactlyElementsOf(expected);
     }
 
     private static Stream<Arguments> readKvTableScanStartupModeArgs() {
@@ -595,17 +564,7 @@ void testReadKvTableWithEarliestAndTimestampScanStartupMode(String mode, boolean
                         "-U[2, v2]",
                         "+U[2, v22]",
                         "+I[4, v4]");
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = 10;
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+        assertQueryResultExactOrder(tEnv, query, expected);
     }
 
     @ParameterizedTest
@@ -687,25 +646,20 @@ void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
                         "the fetch timestamp %s is larger than the current timestamp",
                         currentTimeMillis + Duration.ofMinutes(5).toMillis()));
 
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
+        org.apache.flink.util.CloseableIterator<Row> rowIter =
                 tEnv.executeSql(
                                 String.format(
                                         "select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ ",
                                         currentTimeMillis))
-                        .collect()) {
-            CLOCK.advanceTime(Duration.ofMillis(100L));
-            // write second batch record.
-            rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
-            writeRows(conn, tablePath, rows, true);
-            List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                String row = rowIter.next().toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
+                        .collect();
+        CLOCK.advanceTime(Duration.ofMillis(100L));
+        // write second batch record.
+        rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
+        writeRows(conn, tablePath, rows, true);
+        List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
+        int expectRecords = expected.size();
+        List<String> actual = collectRowsWithTimeout(rowIter, expectRecords);
+        assertThat(actual).containsExactlyElementsOf(expected);
     }
 
     // -------------------------------------------------------------------------------------
@@ -1319,20 +1273,6 @@ private void waitUntilAllBucketFinishSnapshot(
                 "Fail to wait until all bucket finish snapshot");
     }
 
-    private void assertQueryResult(String query, List<String> expected) throws Exception {
-        try (org.apache.flink.util.CloseableIterator<Row> rowIter =
-                tEnv.executeSql(query).collect()) {
-            int expectRecords = expected.size();
-            List<String> actual = new ArrayList<>(expectRecords);
-            for (int i = 0; i < expectRecords; i++) {
-                Row r = rowIter.next();
-                String row = r.toString();
-                actual.add(row);
-            }
-            assertThat(actual).containsExactlyElementsOf(expected);
-        }
-    }
-
     private GenericRow rowWithPartition(Object[] values, @Nullable String partition) {
         if (partition == null) {
             return row(values);
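
The final hunk above removes the test class's private assertQueryResult helper; the tests now call assertQueryResultExactOrder from FlinkRowAssertionsUtils instead. That utility's implementation is likewise not part of this excerpt. Judging from the call sites (assertQueryResultExactOrder(tEnv, query, expected)) and the collectRowsWithTimeout import the diff adds, a plausible shape is the sketch below; the body is an assumption for illustration, not the actual utility code.

import java.util.List;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
import static org.assertj.core.api.Assertions.assertThat;

// Hypothetical sketch of assertQueryResultExactOrder, reconstructed only from how the tests call it.
public final class QueryAssertionSketch {

    /** Runs the query, collects expected.size() rows under a timeout, and checks exact order. */
    public static void assertQueryResultExactOrder(
            TableEnvironment tEnv, String query, List<String> expected) throws Exception {
        CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
        List<String> actual = collectRowsWithTimeout(rowIter, expected.size());
        assertThat(actual).containsExactlyElementsOf(expected);
    }
}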
