Skip to content

Commit 4474a8f

Browse files
committed
update tests
1 parent a0af525 commit 4474a8f

12 files changed

+120
-107
lines changed

fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,15 @@ private static Configuration initConfig() {
129129
return conf;
130130
}
131131

132-
protected static LogScanner createLogScanner(Table table) {
132+
protected static LogScanner<InternalRow> createLogScanner(Table table) {
133133
return table.newScan().createLogScanner();
134134
}
135135

136-
protected static LogScanner createLogScanner(Table table, int[] projectFields) {
136+
protected static LogScanner<InternalRow> createLogScanner(Table table, int[] projectFields) {
137137
return table.newScan().project(projectFields).createLogScanner();
138138
}
139139

140-
protected static void subscribeFromBeginning(LogScanner logScanner, Table table) {
140+
protected static void subscribeFromBeginning(LogScanner<?> logScanner, Table table) {
141141
int bucketCount = table.getTableInfo().getNumBuckets();
142142
for (int i = 0; i < bucketCount; i++) {
143143
logScanner.subscribeFromBeginning(i);
@@ -216,10 +216,10 @@ public static void verifyPartitionLogs(
216216
logScanner.subscribeFromBeginning(partitionId, 0);
217217
}
218218
while (scanRecordCount < totalRecords) {
219-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
219+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
220220
for (TableBucket scanBucket : scanRecords.buckets()) {
221-
List<ScanRecord> records = scanRecords.records(scanBucket);
222-
for (ScanRecord scanRecord : records) {
221+
List<ScanRecord<InternalRow>> records = scanRecords.records(scanBucket);
222+
for (ScanRecord<InternalRow> scanRecord : records) {
223223
actualRows
224224
.computeIfAbsent(
225225
scanBucket.getPartitionId(), k -> new ArrayList<>())
@@ -268,13 +268,14 @@ protected static void verifyPutAndLookup(Table table, Object[] fields) throws Ex
268268
upsertWriter.upsert(row);
269269
upsertWriter.flush();
270270
// lookup this key.
271-
Lookuper lookuper = table.newLookup().createLookuper();
271+
Lookuper<InternalRow> lookuper = table.newLookup().createLookuper();
272272
ProjectedRow keyRow = ProjectedRow.from(schema.getPrimaryKeyIndexes());
273273
keyRow.replaceRow(row);
274274
assertThatRow(lookupRow(lookuper, keyRow)).withSchema(schema.getRowType()).isEqualTo(row);
275275
}
276276

277-
protected static InternalRow lookupRow(Lookuper lookuper, InternalRow keyRow) throws Exception {
277+
protected static InternalRow lookupRow(Lookuper<InternalRow> lookuper, InternalRow keyRow)
278+
throws Exception {
278279
// lookup this key.
279280
return lookuper.lookup(keyRow).get().getSingletonRow();
280281
}

fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ void testPartitionedPrimaryKeyTable() throws Exception {
9393
}
9494
upsertWriter.flush();
9595

96-
Lookuper lookuper = table.newLookup().createLookuper();
96+
Lookuper<InternalRow> lookuper = table.newLookup().createLookuper();
9797
// now, let's lookup the written data by look up
9898
for (String partition : partitionIdByNames.keySet()) {
9999
for (int i = 0; i < recordsPerPartition; i++) {
@@ -285,14 +285,14 @@ private Map<Long, List<InternalRow>> writeRows(
285285
}
286286

287287
private Map<Long, List<InternalRow>> pollRecords(
288-
LogScanner logScanner, int expectRecordsCount) {
288+
LogScanner<InternalRow> logScanner, int expectRecordsCount) {
289289
int scanRecordCount = 0;
290290
Map<Long, List<InternalRow>> actualRows = new HashMap<>();
291291
while (scanRecordCount < expectRecordsCount) {
292-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
292+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
293293
for (TableBucket scanBucket : scanRecords.buckets()) {
294-
List<ScanRecord> records = scanRecords.records(scanBucket);
295-
for (ScanRecord scanRecord : records) {
294+
List<ScanRecord<InternalRow>> records = scanRecords.records(scanBucket);
295+
for (ScanRecord<InternalRow> scanRecord : records) {
296296
actualRows
297297
.computeIfAbsent(scanBucket.getPartitionId(), k -> new ArrayList<>())
298298
.add(scanRecord.getRow());
@@ -308,7 +308,7 @@ void testOperateNotExistPartitionShouldThrowException() throws Exception {
308308
createPartitionedTable(DATA1_TABLE_PATH_PK, true);
309309
Table table = conn.getTable(DATA1_TABLE_PATH_PK);
310310
String partitionName = "notExistPartition";
311-
Lookuper lookuper = table.newLookup().createLookuper();
311+
Lookuper<InternalRow> lookuper = table.newLookup().createLookuper();
312312

313313
// lookup a not exist partition will return null.
314314
assertThat(lookuper.lookup(row(1, partitionName)).get().getSingletonRow()).isEqualTo(null);
@@ -324,7 +324,7 @@ void testOperateNotExistPartitionShouldThrowException() throws Exception {
324324
PhysicalTablePath.of(DATA1_TABLE_PATH_PK, partitionName));
325325

326326
// test scan a not exist partition's log
327-
LogScanner logScanner = table.newScan().createLogScanner();
327+
LogScanner<InternalRow> logScanner = table.newScan().createLogScanner();
328328
assertThatThrownBy(() -> logScanner.subscribe(100L, 0, 0))
329329
.isInstanceOf(PartitionNotExistException.class)
330330
.hasMessageContaining("The partition id '100' does not exist");

fluss-client/src/test/java/org/apache/fluss/client/table/FlussFailServerTableITCase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,13 @@ void testLogScan() throws Exception {
109109
// append one row.
110110
GenericRow row = row(1, "a");
111111
try (Table table = conn.getTable(DATA1_TABLE_PATH);
112-
LogScanner logScanner = createLogScanner(table)) {
112+
LogScanner<InternalRow> logScanner = createLogScanner(table)) {
113113
subscribeFromBeginning(logScanner, table);
114114
AppendWriter appendWriter = table.newAppend().createWriter();
115115
appendWriter.append(row).get();
116116

117117
// poll data util we get one record
118-
ScanRecords scanRecords;
118+
ScanRecords<InternalRow> scanRecords;
119119
do {
120120
scanRecords = logScanner.poll(Duration.ofSeconds(1));
121121
} while (scanRecords.isEmpty());
@@ -147,9 +147,9 @@ void testLogScan() throws Exception {
147147
}
148148
}
149149

150-
private List<InternalRow> toRows(ScanRecords scanRecords) {
150+
private List<InternalRow> toRows(ScanRecords<InternalRow> scanRecords) {
151151
List<InternalRow> rows = new ArrayList<>();
152-
for (ScanRecord scanRecord : scanRecords) {
152+
for (ScanRecord<InternalRow> scanRecord : scanRecords) {
153153
rows.add(scanRecord.getRow());
154154
}
155155
return rows;

fluss-client/src/test/java/org/apache/fluss/client/table/FlussLakeTableITCase.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void testDeleteOnPrimaryKeyTable() throws Exception {
138138
tableWriter.delete(row(2, null));
139139
tableWriter.flush();
140140

141-
Lookuper lookuper = table.newLookup().createLookuper();
141+
Lookuper<InternalRow> lookuper = table.newLookup().createLookuper();
142142
List<InternalRow> row1 = lookuper.lookup(row(1)).get().getRowList();
143143
assertThatRows(row1)
144144
.withSchema(TestData.DATA1_SCHEMA_PK.getRowType())
@@ -208,7 +208,8 @@ void testPrimaryKeyTable(boolean isPartitioned, boolean isDefaultBucketKey) thro
208208
}
209209
// lookup
210210
try (Table table = conn.getTable(tablePath)) {
211-
Lookuper lookuper = table.newLookup().lookupBy(lookUpColumns).createLookuper();
211+
Lookuper<InternalRow> lookuper =
212+
table.newLookup().lookupBy(lookUpColumns).createLookuper();
212213
for (InternalRow row : allRows) {
213214
GenericRow lookupKeyRow = new GenericRow(lookUpFieldGetter.size());
214215
for (int i = 0; i < lookUpFieldGetter.size(); i++) {
@@ -317,7 +318,7 @@ private Map<TableBucket, List<InternalRow>> writeRowsAndVerifyBucket(
317318
int scanCount = 0;
318319
Map<TableBucket, List<InternalRow>> actualRows = new HashMap<>();
319320
try (Table table = conn.getTable(tablePath);
320-
LogScanner logScanner = table.newScan().createLogScanner()) {
321+
LogScanner<InternalRow> logScanner = table.newScan().createLogScanner()) {
321322
for (int bucket = 0; bucket < DEFAULT_BUCKET_COUNT; bucket++) {
322323
if (partitionIdByNames != null) {
323324
for (long partitionId : partitionIdByNames.values()) {
@@ -328,13 +329,13 @@ private Map<TableBucket, List<InternalRow>> writeRowsAndVerifyBucket(
328329
}
329330
}
330331
while (scanCount < totalRows) {
331-
ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
332+
ScanRecords<InternalRow> scanRecords = logScanner.poll(Duration.ofSeconds(1));
332333
for (TableBucket tableBucket : scanRecords.buckets()) {
333334
actualRows
334335
.computeIfAbsent(tableBucket, (k) -> new ArrayList<>())
335336
.addAll(
336337
scanRecords.records(tableBucket).stream()
337-
.map(ScanRecord::getRow)
338+
.map(ScanRecord<InternalRow>::getRow)
338339
.collect(Collectors.toList()));
339340
}
340341
scanCount += scanRecords.count();

0 commit comments

Comments
 (0)