Skip to content

Commit bd93e7f

Browse files
committed
nit
1 parent 172c054 commit bd93e7f

File tree

2 files changed

+19
-41
lines changed

2 files changed

+19
-41
lines changed

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,13 @@
2121
import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
2222
import org.apache.fluss.metadata.TableBucket;
2323
import org.apache.fluss.metadata.TablePath;
24-
import org.apache.fluss.row.BinaryString;
25-
import org.apache.fluss.row.Decimal;
2624
import org.apache.fluss.row.InternalRow;
27-
import org.apache.fluss.row.TimestampLtz;
28-
import org.apache.fluss.row.TimestampNtz;
29-
import org.apache.fluss.types.DataTypes;
30-
import org.apache.fluss.utils.TypeUtils;
3125

3226
import org.apache.flink.core.execution.JobClient;
3327
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3428
import org.junit.jupiter.api.BeforeAll;
3529
import org.junit.jupiter.api.Test;
3630

37-
import java.math.BigDecimal;
3831
import java.util.ArrayList;
3932
import java.util.List;
4033

@@ -55,29 +48,29 @@ protected static void beforeAll() {
5548
}
5649

5750
@Test
58-
void testCompaction() throws Exception {
51+
void testLogTableCompaction() throws Exception {
5952
JobClient jobClient = buildTieringJob(execEnv);
6053
try {
61-
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
62-
long t1Id = createPkTable(t1, true);
54+
TablePath t1 = TablePath.of(DEFAULT_DB, "logTable");
55+
long t1Id = createLogTable(t1, true);
6356
TableBucket t1Bucket = new TableBucket(t1Id, 0);
6457

65-
writeFullTypeRows(t1, t1Bucket, 1, 3, 3);
58+
writeTableRecords(t1, t1Bucket, 1, 3, 3);
6659

67-
writeFullTypeRows(t1, t1Bucket, 4, 6, 6);
60+
writeTableRecords(t1, t1Bucket, 4, 6, 6);
6861

69-
writeFullTypeRows(t1, t1Bucket, 7, 9, 9);
62+
writeTableRecords(t1, t1Bucket, 7, 9, 9);
7063
checkFileCountInIcebergTable(t1, 3);
7164

7265
// trigger compaction
73-
writeFullTypeRows(t1, t1Bucket, 10, 12, 12);
66+
writeTableRecords(t1, t1Bucket, 10, 12, 12);
7467
checkFileCountInIcebergTable(t1, 2);
7568
} finally {
7669
jobClient.cancel().get();
7770
}
7871
}
7972

80-
private void writeFullTypeRows(
73+
private void writeTableRecords(
8174
TablePath tablePath,
8275
TableBucket tableBucket,
8376
int from,
@@ -86,29 +79,9 @@ private void writeFullTypeRows(
8679
throws Exception {
8780
List<InternalRow> rows = new ArrayList<>();
8881
for (int i = from; i <= to; i++) {
89-
rows.add(
90-
row(
91-
true,
92-
(byte) 100,
93-
(short) 200,
94-
i,
95-
i + 400L,
96-
500.1f,
97-
600.0d,
98-
"v1",
99-
Decimal.fromUnscaledLong(900, 5, 2),
100-
Decimal.fromBigDecimal(new BigDecimal(1000), 20, 0),
101-
TimestampLtz.fromEpochMillis(1698235273400L),
102-
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
103-
TimestampNtz.fromMillis(1698235273501L),
104-
TimestampNtz.fromMillis(1698235273501L, 8000),
105-
new byte[] {5, 6, 7, 8},
106-
TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
107-
TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
108-
BinaryString.fromString("abc"),
109-
new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
82+
rows.add(row(i, "v" + i));
11083
}
111-
writeRows(tablePath, rows, false);
84+
writeRows(tablePath, rows, true);
11285
assertReplicaStatus(tableBucket, expectedLogEndOffset);
11386
}
11487
}

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,14 +197,16 @@ protected long createPkTable(TablePath tablePath, boolean enableAutoCompaction)
197197
}
198198

199199
protected long createLogTable(TablePath tablePath) throws Exception {
200-
return createLogTable(tablePath, 1);
200+
return createLogTable(tablePath, false);
201201
}
202202

203-
protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
204-
return createLogTable(tablePath, bucketNum, false);
203+
protected long createLogTable(TablePath tablePath, boolean enableAutoCompaction)
204+
throws Exception {
205+
return createLogTable(tablePath, 1, false, enableAutoCompaction);
205206
}
206207

207-
protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
208+
protected long createLogTable(
209+
TablePath tablePath, int bucketNum, boolean isPartitioned, boolean enableAutoCompaction)
208210
throws Exception {
209211
Schema.Builder schemaBuilder =
210212
Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -222,6 +224,9 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
222224
tableBuilder.property(
223225
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
224226
}
227+
if (enableAutoCompaction) {
228+
tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
229+
}
225230
tableBuilder.schema(schemaBuilder.build());
226231
return createTable(tablePath, tableBuilder.build());
227232
}

0 commit comments

Comments
 (0)