Skip to content

Commit 08f8258

Browse files
committed
nit
1 parent e4c7b3d commit 08f8258

File tree

2 files changed

+17
-23
lines changed

2 files changed

+17
-23
lines changed

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,41 +56,34 @@ protected static void beforeAll() {
5656

5757
@Test
5858
void testCompaction() throws Exception {
59-
// create a pk table, write some records and wait until snapshot finished
60-
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
61-
long t1Id = createPkTable(t1);
62-
TableBucket t1Bucket = new TableBucket(t1Id, 0);
63-
// write records
64-
writeFullTypeRow(t1, 1, 3);
65-
waitUntilSnapshot(t1Id, 1, 0);
66-
67-
// then start tiering job
6859
JobClient jobClient = buildTieringJob(execEnv);
6960
try {
70-
// check the status of replica after synced
71-
assertReplicaStatus(t1Bucket, 3);
61+
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
62+
long t1Id = createPkTable(t1);
63+
TableBucket t1Bucket = new TableBucket(t1Id, 0);
7264

73-
writeFullTypeRow(t1, 4, 6);
74-
waitUntilSnapshot(t1Id, 1, 1);
75-
assertReplicaStatus(t1Bucket, 6);
65+
writeFullTypeRows(t1, t1Bucket, 1, 3, 3);
7666

77-
writeFullTypeRow(t1, 7, 9);
78-
waitUntilSnapshot(t1Id, 1, 2);
79-
assertReplicaStatus(t1Bucket, 9);
67+
writeFullTypeRows(t1, t1Bucket, 4, 6, 6);
8068

69+
writeFullTypeRows(t1, t1Bucket, 7, 9, 9);
8170
checkFileCountInIcebergTable(t1, 3);
8271

83-
writeFullTypeRow(t1, 10, 12);
84-
waitUntilSnapshot(t1Id, 1, 3);
85-
assertReplicaStatus(t1Bucket, 12);
86-
72+
// trigger compaction
73+
writeFullTypeRows(t1, t1Bucket, 10, 12, 12);
8774
checkFileCountInIcebergTable(t1, 2);
8875
} finally {
8976
jobClient.cancel().get();
9077
}
9178
}
9279

93-
private void writeFullTypeRow(TablePath tablePath, int from, int to) throws Exception {
80+
private void writeFullTypeRows(
81+
TablePath tablePath,
82+
TableBucket tableBucket,
83+
int from,
84+
int to,
85+
long expectedLogEndOffset)
86+
throws Exception {
9487
List<InternalRow> rows = new ArrayList<>();
9588
for (int i = from; i <= to; i++) {
9689
rows.add(
@@ -116,5 +109,6 @@ private void writeFullTypeRow(TablePath tablePath, int from, int to) throws Exce
116109
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
117110
}
118111
writeRows(tablePath, rows, false);
112+
assertReplicaStatus(tableBucket, expectedLogEndOffset);
119113
}
120114
}

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,7 @@ protected void checkDataInIcebergAppendOnlyTable(
387387
}
388388
}
389389

390-
protected void checkFileInIcebergTable(TablePath tablePath, int expectedFileCount)
390+
protected void checkFileCountInIcebergTable(TablePath tablePath, int expectedFileCount)
391391
throws IOException {
392392
org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
393393
int count = 0;

0 commit comments

Comments
 (0)