
Commit 288f050

committed: nit

1 parent 2f1d0f1 commit 288f050

File tree

5 files changed: +9 −48 lines changed


fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java

Lines changed: 1 addition & 0 deletions
@@ -61,6 +61,7 @@ public class IcebergRewriteDataFiles {
 
     private static final Logger LOG = LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
 
+    // TODO: get this value from table properties
     private static final int MIN_FILES_TO_COMPACT = 3;
 
     private final Table table;
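The added TODO notes that the threshold should eventually come from table properties rather than a constant. A minimal sketch of what resolving it could look like, hedged: the property key "compaction.min-files-to-compact" and the helper below are assumptions for illustration, not part of this commit; only Iceberg's Table.properties() map is an actual API.

    // Hypothetical helper: read the compaction threshold from Iceberg table
    // properties, falling back to the current hard-coded default of 3.
    // The key "compaction.min-files-to-compact" is an assumed name.
    private static int minFilesToCompact(org.apache.iceberg.Table table) {
        return Integer.parseInt(
                table.properties().getOrDefault("compaction.min-files-to-compact", "3"));
    }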

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 1 addition & 9 deletions
@@ -56,12 +56,9 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(IcebergLakeWriter.class);
 
-    private static final String AUTO_MAINTENANCE_KEY = "table.datalake.auto-maintenance";
-
     private final Catalog icebergCatalog;
     private final Table icebergTable;
     private final RecordWriter recordWriter;
-    private final boolean autoMaintenanceEnabled;
 
     @Nullable private final ExecutorService compactionExecutor;
     @Nullable private CompletableFuture<RewriteDataFileResult> compactionFuture;
@@ -72,15 +69,10 @@ public IcebergLakeWriter(
         this.icebergCatalog = icebergCatalogProvider.get();
         this.icebergTable = getTable(writerInitContext.tablePath());
 
-        // Check auto-maintenance from table properties
-        this.autoMaintenanceEnabled =
-                Boolean.parseBoolean(
-                        icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
-
         // Create a record writer
         this.recordWriter = createRecordWriter(writerInitContext);
 
-        if (autoMaintenanceEnabled) {
+        if (writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction()) {
             this.compactionExecutor =
                     Executors.newSingleThreadExecutor(
                             new ExecutorThreadFactory(
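The writer now asks Fluss's typed table config whether auto-compaction is enabled instead of parsing a raw string out of the Iceberg table's properties, which is what lets the AUTO_MAINTENANCE_KEY constant and the autoMaintenanceEnabled field go away. Condensed as a sketch; the else branch and the thread-pool name are assumptions, since the diff truncates before them:

    // Sketch of the new gating logic in the constructor.
    if (writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction()) {
        // Single-threaded executor so at most one rewrite runs per writer.
        this.compactionExecutor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory("iceberg-compaction")); // name assumed
    } else {
        this.compactionExecutor = null; // field is @Nullable, so disabled tables pay no cost
    }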

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java

Lines changed: 3 additions & 37 deletions
@@ -18,60 +18,26 @@
 
 package org.apache.fluss.lake.iceberg.maintenance;
 
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
-import org.apache.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
-import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.utils.TypeUtils;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.RewriteFiles;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.TaskWriter;
-import org.apache.iceberg.types.Types;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
-import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
-import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
-import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.testutils.DataTestUtils.row;
-import static org.apache.fluss.utils.Preconditions.checkState;
-import static org.assertj.core.api.Assertions.assertThat;
 
 /** Integration test for Iceberg compaction. */
 class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java

Lines changed: 3 additions & 1 deletion
@@ -250,6 +250,7 @@ protected long createPkTable(TablePath tablePath, int bucketNum) throws Exception
                 .distributedBy(bucketNum)
                 .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                 .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+                .property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true")
                 .build();
         return createTable(tablePath, table1Descriptor);
     }
@@ -386,7 +387,8 @@ protected void checkDataInIcebergAppendOnlyTable(
         }
     }
 
-    protected void checkFileInIcebergTable(TablePath tablePath, int expectedFileCount) throws IOException {
+    protected void checkFileInIcebergTable(TablePath tablePath, int expectedFileCount)
+            throws IOException {
         org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
         int count = 0;
         try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
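The diff cuts off inside checkFileInIcebergTable. A plausible completion under stated assumptions: the loop body and the AssertJ assertion below are guesses at what follows, consistent with the visible planFiles() scan and the expectedFileCount parameter.

    // Assumed remainder of checkFileInIcebergTable: count one FileScanTask
    // per data file in the current snapshot, then compare against the expectation.
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
        for (FileScanTask ignored : tasks) {
            count++;
        }
    }
    assertThat(count).isEqualTo(expectedFileCount);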

fluss-lake/fluss-lake-iceberg/src/test/resources/log4j2-test.properties

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
 #
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level=OFF
+rootLogger.level=INFO
 rootLogger.appenderRef.test.ref=TestLogger
 appender.testlogger.name=TestLogger
 appender.testlogger.type=CONSOLE
