Skip to content

Commit 263be18

Browse files
committed
111
1 parent 0e82b84 commit 263be18

File tree

6 files changed

+152
-146
lines changed

6 files changed

+152
-146
lines changed

fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ void testPollWhileCreateTableNotReady() throws Exception {
9898
TablePath tablePath =
9999
new TablePath("test_db_1", "test_poll_while_create_table_not_ready_t1");
100100
// create one table with 100 buckets.
101-
int bucketNumber = 100;
101+
int bucketNumber = 20;
102102
TableDescriptor tableDescriptor =
103103
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(bucketNumber).build();
104104
createTable(tablePath, tableDescriptor, false);

fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -97,70 +97,67 @@ void testScanSnapshot() throws Exception {
9797
TablePath tablePath = TablePath.of(DEFAULT_DB, "test-table-snapshot");
9898
long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
9999

100+
Table table = conn.getTable(tablePath);
100101
// scan the snapshot
101-
Map<TableBucket, List<InternalRow>> expectedRowByBuckets = putRows(tableId, tablePath, 10);
102+
Map<TableBucket, List<InternalRow>> expectedRowByBuckets = putRows(tableId, table, 10);
102103

103104
// wait snapshot finish
104105
waitUtilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
105106

106107
// test read snapshot
107-
testSnapshotRead(tablePath, expectedRowByBuckets);
108+
testSnapshotRead(table, tablePath, expectedRowByBuckets);
108109

109110
// test again;
110-
expectedRowByBuckets = putRows(tableId, tablePath, 20);
111+
expectedRowByBuckets = putRows(tableId, table, 20);
111112

112113
// wait snapshot finish
113114
waitUtilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
114115

115116
// test read snapshot
116-
testSnapshotRead(tablePath, expectedRowByBuckets);
117+
testSnapshotRead(table, tablePath, expectedRowByBuckets);
117118
}
118119

119-
private Map<TableBucket, List<InternalRow>> putRows(long tableId, TablePath tablePath, int rows)
120-
throws Exception {
120+
private Map<TableBucket, List<InternalRow>> putRows(long tableId, Table table, int rows) {
121121
Map<TableBucket, List<InternalRow>> rowsByBuckets = new HashMap<>();
122-
try (Table table = conn.getTable(tablePath)) {
123-
UpsertWriter upsertWriter = table.getUpsertWriter();
124-
for (int i = 0; i < rows; i++) {
125-
InternalRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {i, "v" + i});
126-
upsertWriter.upsert(row);
127-
TableBucket tableBucket = new TableBucket(tableId, getBucketId(row));
128-
rowsByBuckets.computeIfAbsent(tableBucket, k -> new ArrayList<>()).add(row);
129-
}
130-
upsertWriter.flush();
122+
UpsertWriter upsertWriter = table.getUpsertWriter();
123+
for (int i = 0; i < rows; i++) {
124+
InternalRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {i, "v" + i});
125+
upsertWriter.upsert(row);
126+
TableBucket tableBucket = new TableBucket(tableId, getBucketId(row));
127+
rowsByBuckets.computeIfAbsent(tableBucket, k -> new ArrayList<>()).add(row);
131128
}
129+
upsertWriter.flush();
132130
return rowsByBuckets;
133131
}
134132

135133
private void testSnapshotRead(
136-
TablePath tablePath, Map<TableBucket, List<InternalRow>> bucketRows) throws Exception {
134+
Table table, TablePath tablePath, Map<TableBucket, List<InternalRow>> bucketRows)
135+
throws Exception {
137136
KvSnapshotInfo kvSnapshotInfo = admin.getKvSnapshot(tablePath).get();
138137
BucketsSnapshotInfo bucketsSnapshotInfo = kvSnapshotInfo.getBucketsSnapshots();
139138
long tableId = kvSnapshotInfo.getTableId();
140-
try (Table table = conn.getTable(tablePath)) {
141-
for (int bucketId : bucketsSnapshotInfo.getBucketIds()) {
142-
TableBucket tableBucket = new TableBucket(tableId, bucketId);
143-
BucketSnapshotInfo bucketSnapshotInfo =
144-
bucketsSnapshotInfo.getBucketSnapshotInfo(bucketId).get();
145-
146-
// create the snapshot scan according to the snapshot files
147-
SnapshotScan snapshotScan =
148-
new SnapshotScan(
149-
tableBucket,
150-
bucketSnapshotInfo.getSnapshotFiles(),
151-
DEFAULT_SCHEMA,
152-
null);
153-
SnapshotScanner snapshotScanner = table.getSnapshotScanner(snapshotScan);
154-
155-
// collect all the records from the scanner
156-
List<ScanRecord> scanRecords = collectRecords(snapshotScanner);
157-
158-
// get the expected rows
159-
List<InternalRow> expectedRows = bucketRows.get(tableBucket);
160-
161-
// check the records
162-
assertScanRecords(scanRecords, expectedRows);
163-
}
139+
for (int bucketId : bucketsSnapshotInfo.getBucketIds()) {
140+
TableBucket tableBucket = new TableBucket(tableId, bucketId);
141+
BucketSnapshotInfo bucketSnapshotInfo =
142+
bucketsSnapshotInfo.getBucketSnapshotInfo(bucketId).get();
143+
144+
// create the snapshot scan according to the snapshot files
145+
SnapshotScan snapshotScan =
146+
new SnapshotScan(
147+
tableBucket,
148+
bucketSnapshotInfo.getSnapshotFiles(),
149+
DEFAULT_SCHEMA,
150+
null);
151+
SnapshotScanner snapshotScanner = table.getSnapshotScanner(snapshotScan);
152+
153+
// collect all the records from the scanner
154+
List<ScanRecord> scanRecords = collectRecords(snapshotScanner);
155+
156+
// get the expected rows
157+
List<InternalRow> expectedRows = bucketRows.get(tableBucket);
158+
159+
// check the records
160+
assertScanRecords(scanRecords, expectedRows);
164161
}
165162
}
166163

0 commit comments

Comments
 (0)