
Commit a7bf007

[lake/iceberg] implement Iceberg log table compaction IT (#1651)
Co-authored-by: maxcwang <[email protected]>
1 parent 58292a0 commit a7bf007

File tree: 5 files changed (+144, -27 lines)

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ public class IcebergRewriteDataFiles {
 
     private static final Logger LOG = LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
 
+    // TODO: make compaction strategy configurable
     private static final int MIN_FILES_TO_COMPACT = 3;
 
     private final Table table;
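
The TODO marks the strategy as hard-coded for now: a rewrite is only attempted once at least MIN_FILES_TO_COMPACT data files have accumulated, which is exactly the threshold the new integration test exercises. A minimal sketch of that gating idea, under the assumption that the check is a simple file-count comparison (the class and method here are illustrative, not the actual IcebergRewriteDataFiles API):

import java.util.List;

public class CompactionGateSketch {
    // Mirrors the hard-coded threshold in IcebergRewriteDataFiles.
    private static final int MIN_FILES_TO_COMPACT = 3;

    // Compact only once enough small files have piled up; below the threshold
    // a rewrite would churn table metadata for little read-side benefit.
    static boolean shouldCompact(List<String> candidateFiles) {
        return candidateFiles.size() >= MIN_FILES_TO_COMPACT;
    }

    public static void main(String[] args) {
        System.out.println(shouldCompact(List.of("f1.parquet", "f2.parquet"))); // false
        System.out.println(shouldCompact(List.of("f1.parquet", "f2.parquet", "f3.parquet"))); // true
    }
}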

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 6 additions & 12 deletions
@@ -56,12 +56,9 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(IcebergLakeWriter.class);
 
-    private static final String AUTO_MAINTENANCE_KEY = "table.datalake.auto-maintenance";
-
     private final Catalog icebergCatalog;
     private final Table icebergTable;
     private final RecordWriter recordWriter;
-    private final boolean autoMaintenanceEnabled;
 
     @Nullable private final ExecutorService compactionExecutor;
     @Nullable private CompletableFuture<RewriteDataFileResult> compactionFuture;
@@ -72,15 +69,10 @@ public IcebergLakeWriter(
         this.icebergCatalog = icebergCatalogProvider.get();
         this.icebergTable = getTable(writerInitContext.tablePath());
 
-        // Check auto-maintenance from table properties
-        this.autoMaintenanceEnabled =
-                Boolean.parseBoolean(
-                        icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
-
         // Create a record writer
         this.recordWriter = createRecordWriter(writerInitContext);
 
-        if (autoMaintenanceEnabled) {
+        if (writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction()) {
             this.compactionExecutor =
                     Executors.newSingleThreadExecutor(
                             new ExecutorThreadFactory(
@@ -137,9 +129,11 @@ public void close() throws IOException {
             compactionFuture.cancel(true);
         }
 
-        if (compactionExecutor != null
-                && !compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
-            LOG.warn("Fail to close compactionExecutor.");
+        if (compactionExecutor != null) {
+            compactionExecutor.shutdown();
+            if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+                LOG.warn("Fail to close compactionExecutor.");
+            }
         }
 
         if (recordWriter != null) {
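
The close() change fixes a latent bug: shutdown() was never invoked on the compaction executor, so awaitTermination(30, TimeUnit.SECONDS) always waited out the full timeout against a live pool and then logged the warning. A minimal standalone sketch of the orderly-shutdown pattern the fix adopts (class and task names here are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> System.out.println("pretend compaction task"));

        // Stop accepting new tasks first; already-submitted tasks still run.
        executor.shutdown();
        // Only after shutdown() can awaitTermination return true early;
        // on a live pool it always blocks for the full timeout.
        if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
            System.err.println("executor did not terminate within the timeout");
        }
    }
}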

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java

Lines changed: 8 additions & 5 deletions
@@ -33,6 +33,7 @@
 import java.util.List;
 
 import static org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 
 /** Utility class for static conversions between Fluss and Iceberg types. */
 public class IcebergConversions {
@@ -70,13 +71,15 @@ public static Expression toFilterExpression(
                     Expressions.and(
                             expression,
                             Expressions.equal(
-                                    partitionFields.get(partitionIndex++).name(), partition));
+                                    table.schema()
+                                            .findColumnName(
+                                                    partitionFields
+                                                            .get(partitionIndex++)
+                                                            .sourceId()),
+                                    partition));
             }
         }
-        expression =
-                Expressions.and(
-                        expression,
-                        Expressions.equal(partitionFields.get(partitionIndex).name(), bucket));
+        expression = Expressions.and(expression, Expressions.equal(BUCKET_COLUMN_NAME, bucket));
         return expression;
     }
 }
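
This change stops filtering on the Iceberg partition field's own name, which for transformed partition fields need not match any schema column, and instead resolves the source column name via Schema#findColumnName(sourceId()); the bucket predicate now uses the shared BUCKET_COLUMN_NAME constant. A minimal sketch of the shape of the filter the patched method builds, with placeholder column names and values ("dt" and "__bucket" are illustrative, not the actual constants):

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class FilterExpressionSketch {
    public static void main(String[] args) {
        // (partition source column == partition value) AND (bucket column == bucket id)
        Expression filter =
                Expressions.and(
                        Expressions.equal("dt", "2024"), // placeholder partition column/value
                        Expressions.equal("__bucket", 0)); // placeholder bucket column name
        System.out.println(filter);
    }
}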
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 90 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.lake.iceberg.maintenance;

import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.fluss.testutils.DataTestUtils.row;

/** Integration test for Iceberg compaction. */
class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
    protected static final String DEFAULT_DB = "fluss";

    private static StreamExecutionEnvironment execEnv;

    @BeforeAll
    protected static void beforeAll() {
        FlinkIcebergTieringTestBase.beforeAll();
        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(2);
        execEnv.enableCheckpointing(1000);
    }

    @Test
    void testLogTableCompaction() throws Exception {
        JobClient jobClient = buildTieringJob(execEnv);
        try {
            TablePath t1 = TablePath.of(DEFAULT_DB, "log_table");
            long t1Id = createLogTable(t1, true);
            TableBucket t1Bucket = new TableBucket(t1Id, 0);

            int i = 0;
            List<InternalRow> flussRows = new ArrayList<>();
            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
            checkFileCountInIcebergTable(t1, 3);

            // The next write should trigger compaction, since the current data file count
            // is greater than or equal to MIN_FILES_TO_COMPACT.
            flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
            // Only two files should remain now: one for the newly written data and one for
            // the compacted target file.
            checkFileCountInIcebergTable(t1, 2);

            // Check data in Iceberg to make sure compaction neither loses nor duplicates data.
            checkDataInIcebergAppendOnlyTable(t1, flussRows, 0);
        } finally {
            jobClient.cancel().get();
        }
    }

    private List<InternalRow> writeLogTableRecords(
            TablePath tablePath, TableBucket tableBucket, long expectedLogEndOffset)
            throws Exception {
        List<InternalRow> rows = Arrays.asList(row(1, "v1"));
        writeRows(tablePath, rows, true);
        assertReplicaStatus(tableBucket, expectedLogEndOffset);
        return rows;
    }
}

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 39 additions & 10 deletions
@@ -45,13 +45,15 @@
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.parquet.Parquet;
 import org.junit.jupiter.api.AfterAll;
@@ -60,6 +62,7 @@
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.file.Files;
@@ -185,18 +188,25 @@ protected static Catalog getIcebergCatalog() {
     }
 
     protected long createPkTable(TablePath tablePath) throws Exception {
-        return createPkTable(tablePath, 1);
+        return createPkTable(tablePath, false);
+    }
+
+    protected long createPkTable(TablePath tablePath, boolean enableAutoCompaction)
+            throws Exception {
+        return createPkTable(tablePath, 1, enableAutoCompaction);
     }
 
     protected long createLogTable(TablePath tablePath) throws Exception {
-        return createLogTable(tablePath, 1);
+        return createLogTable(tablePath, false);
     }
 
-    protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
-        return createLogTable(tablePath, bucketNum, false);
+    protected long createLogTable(TablePath tablePath, boolean enableAutoCompaction)
+            throws Exception {
+        return createLogTable(tablePath, 1, false, enableAutoCompaction);
     }
 
-    protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+    protected long createLogTable(
+            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean enableAutoCompaction)
             throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -214,12 +224,16 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
         tableBuilder.property(
                 ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
         }
+        if (enableAutoCompaction) {
+            tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+        }
         tableBuilder.schema(schemaBuilder.build());
         return createTable(tablePath, tableBuilder.build());
     }
 
-    protected long createPkTable(TablePath tablePath, int bucketNum) throws Exception {
-        TableDescriptor table1Descriptor =
+    protected long createPkTable(TablePath tablePath, int bucketNum, boolean enableAutoCompaction)
+            throws Exception {
+        TableDescriptor.Builder pkTableBuilder =
                 TableDescriptor.builder()
                         .schema(
                                 Schema.newBuilder()
@@ -246,9 +260,12 @@ protected long createPkTable(TablePath tablePath, int bucketNum) throws Exceptio
                                         .build())
                         .distributedBy(bucketNum)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
-                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
-                        .build();
-        return createTable(tablePath, table1Descriptor);
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+
+        if (enableAutoCompaction) {
+            pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+        }
+        return createTable(tablePath, pkTableBuilder.build());
     }
 
     protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
@@ -383,6 +400,18 @@ protected void checkDataInIcebergAppendOnlyTable(
         }
     }
 
+    protected void checkFileCountInIcebergTable(TablePath tablePath, int expectedFileCount)
+            throws IOException {
+        org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
+        int count = 0;
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+            for (FileScanTask ignored : tasks) {
+                count++;
+            }
+        }
+        assertThat(count).isEqualTo(expectedFileCount);
+    }
+
     protected void checkDataInIcebergAppendOnlyPartitionedTable(
             TablePath tablePath,
             Map<String, String> partitionSpec,
