Skip to content

Commit aabe78a

Browse files
authored
[iceberg] Use DataFileSet / DeleteFileSet instead of normal collections (#2155)
1 parent e02837d commit aabe78a

File tree

6 files changed

+30
-37
lines changed

6 files changed

+30
-37
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@
2424
import org.apache.iceberg.BaseCombinedScanTask;
2525
import org.apache.iceberg.CombinedScanTask;
2626
import org.apache.iceberg.ContentScanTask;
27-
import org.apache.iceberg.DataFile;
2827
import org.apache.iceberg.FileScanTask;
2928
import org.apache.iceberg.Snapshot;
3029
import org.apache.iceberg.Table;
@@ -35,6 +34,7 @@
3534
import org.apache.iceberg.io.TaskWriter;
3635
import org.apache.iceberg.io.WriteResult;
3736
import org.apache.iceberg.util.BinPacking;
37+
import org.apache.iceberg.util.DataFileSet;
3838
import org.slf4j.Logger;
3939
import org.slf4j.LoggerFactory;
4040

@@ -153,8 +153,8 @@ public RewriteDataFileResult execute() {
153153
return null;
154154
}
155155
LOG.info("Start to rewrite files {}.", tasksToRewrite);
156-
List<DataFile> deletedDataFiles = new ArrayList<>();
157-
List<DataFile> addedDataFiles = new ArrayList<>();
156+
DataFileSet deletedDataFiles = DataFileSet.create();
157+
DataFileSet addedDataFiles = DataFileSet.create();
158158
for (CombinedScanTask combinedScanTask : tasksToRewrite) {
159159
addedDataFiles.addAll(rewriteFileGroup(combinedScanTask));
160160
deletedDataFiles.addAll(
@@ -172,7 +172,7 @@ public RewriteDataFileResult execute() {
172172
}
173173
}
174174

175-
private List<DataFile> rewriteFileGroup(CombinedScanTask combinedScanTask) throws IOException {
175+
private DataFileSet rewriteFileGroup(CombinedScanTask combinedScanTask) throws IOException {
176176
try (CloseableIterable<Record> records = readDataFile(combinedScanTask);
177177
TaskWriter<Record> taskWriter =
178178
TaskWriterFactory.createTaskWriter(table, partition, bucket.getBucket())) {
@@ -184,7 +184,7 @@ private List<DataFile> rewriteFileGroup(CombinedScanTask combinedScanTask) throw
184184
rewriteResult.deleteFiles().length == 0,
185185
"the delete files should be empty, but got "
186186
+ Arrays.toString(rewriteResult.deleteFiles()));
187-
return Arrays.asList(rewriteResult.dataFiles());
187+
return DataFileSet.of(Arrays.asList(rewriteResult.dataFiles()));
188188
}
189189
}
190190

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,31 @@
1818

1919
package org.apache.fluss.lake.iceberg.maintenance;
2020

21-
import org.apache.iceberg.DataFile;
21+
import org.apache.iceberg.util.DataFileSet;
2222

2323
import java.io.Serializable;
24-
import java.util.List;
2524

2625
/** The result for rewrite iceberg data files. */
2726
public class RewriteDataFileResult implements Serializable {
2827

2928
private static final long serialVersionUID = 1L;
3029

3130
private final long snapshotId;
32-
private final List<DataFile> deletedDataFiles;
33-
private final List<DataFile> addedDataFiles;
31+
private final DataFileSet deletedDataFiles;
32+
private final DataFileSet addedDataFiles;
3433

3534
public RewriteDataFileResult(
36-
long snapshotId, List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
35+
long snapshotId, DataFileSet deletedDataFiles, DataFileSet addedDataFiles) {
3736
this.snapshotId = snapshotId;
3837
this.deletedDataFiles = deletedDataFiles;
3938
this.addedDataFiles = addedDataFiles;
4039
}
4140

42-
public List<DataFile> deletedDataFiles() {
41+
public DataFileSet deletedDataFiles() {
4342
return deletedDataFiles;
4443
}
4544

46-
public List<DataFile> addedDataFiles() {
45+
public DataFileSet addedDataFiles() {
4746
return addedDataFiles;
4847
}
4948

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import org.apache.iceberg.DataFile;
2323
import org.apache.iceberg.DeleteFile;
24+
import org.apache.iceberg.util.DataFileSet;
25+
import org.apache.iceberg.util.DeleteFileSet;
2426

2527
import java.io.Serializable;
2628
import java.util.ArrayList;
@@ -31,25 +33,25 @@ public class IcebergCommittable implements Serializable {
3133

3234
private static final long serialVersionUID = 1L;
3335

34-
private final List<DataFile> dataFiles;
35-
private final List<DeleteFile> deleteFiles;
36+
private final DataFileSet dataFiles;
37+
private final DeleteFileSet deleteFiles;
3638

3739
private final List<RewriteDataFileResult> rewriteDataFiles;
3840

3941
private IcebergCommittable(
40-
List<DataFile> dataFiles,
41-
List<DeleteFile> deleteFiles,
42+
DataFileSet dataFiles,
43+
DeleteFileSet deleteFiles,
4244
List<RewriteDataFileResult> rewriteDataFiles) {
4345
this.dataFiles = dataFiles;
4446
this.deleteFiles = deleteFiles;
4547
this.rewriteDataFiles = rewriteDataFiles;
4648
}
4749

48-
public List<DataFile> getDataFiles() {
50+
public DataFileSet getDataFiles() {
4951
return dataFiles;
5052
}
5153

52-
public List<DeleteFile> getDeleteFiles() {
54+
public DeleteFileSet getDeleteFiles() {
5355
return deleteFiles;
5456
}
5557

@@ -66,8 +68,8 @@ public static Builder builder() {
6668
* entries.
6769
*/
6870
public static class Builder {
69-
private final List<DataFile> dataFiles = new ArrayList<>();
70-
private final List<DeleteFile> deleteFiles = new ArrayList<>();
71+
private final DataFileSet dataFiles = DataFileSet.create();
72+
private final DeleteFileSet deleteFiles = DeleteFileSet.create();
7173

7274
private final List<RewriteDataFileResult> rewriteDataFileResults = new ArrayList<>();
7375

@@ -87,10 +89,7 @@ public Builder addRewriteDataFileResult(RewriteDataFileResult rewriteDataFileRes
8789
}
8890

8991
public IcebergCommittable build() {
90-
return new IcebergCommittable(
91-
new ArrayList<>(dataFiles),
92-
new ArrayList<>(deleteFiles),
93-
rewriteDataFileResults);
92+
return new IcebergCommittable(dataFiles, deleteFiles, rewriteDataFileResults);
9493
}
9594
}
9695

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import javax.annotation.Nullable;
4949

5050
import java.io.IOException;
51-
import java.util.Arrays;
5251
import java.util.List;
5352
import java.util.Map;
5453
import java.util.stream.Collectors;
@@ -115,9 +114,7 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
115114
if (committable.getDeleteFiles().isEmpty()) {
116115
// Simple append-only case: only data files, no delete files or compaction
117116
AppendFiles appendFiles = icebergTable.newAppend();
118-
for (DataFile dataFile : committable.getDataFiles()) {
119-
appendFiles.appendFile(dataFile);
120-
}
117+
committable.getDataFiles().forEach(appendFiles::appendFile);
121118
snapshotUpdate = appendFiles;
122119
} else {
123120
/*
@@ -131,10 +128,8 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
131128
being committed.
132129
*/
133130
RowDelta rowDelta = icebergTable.newRowDelta();
134-
Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
135-
.forEach(rowDelta::addRows);
136-
Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
137-
.forEach(rowDelta::addDeletes);
131+
committable.getDataFiles().forEach(rowDelta::addRows);
132+
committable.getDeleteFiles().forEach(rowDelta::addDeletes);
138133
snapshotUpdate = rowDelta;
139134
}
140135

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,13 @@
4141
import org.apache.iceberg.io.CloseableIterable;
4242
import org.apache.iceberg.io.TaskWriter;
4343
import org.apache.iceberg.types.Types;
44+
import org.apache.iceberg.util.DataFileSet;
4445
import org.junit.jupiter.api.BeforeEach;
4546
import org.junit.jupiter.api.Test;
4647
import org.junit.jupiter.api.io.TempDir;
4748

4849
import java.io.File;
4950
import java.io.IOException;
50-
import java.util.ArrayList;
51-
import java.util.List;
5251

5352
import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
5453
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
@@ -232,7 +231,7 @@ private static int countFilesForBucket(Table table, int bucket) throws IOExcepti
232231

233232
private static void appendTinyFilesWithRowsAndBucket(
234233
Table table, int files, int rowsPerFile, int baseOffset, int bucket) throws Exception {
235-
List<DataFile> toAppend = new ArrayList<>(files);
234+
DataFileSet toAppend = DataFileSet.create();
236235
for (int i = 0; i < files; i++) {
237236
toAppend.add(
238237
writeTinyDataFile(table, rowsPerFile, baseOffset + (i * rowsPerFile), bucket));

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iceberg.data.GenericRecord;
3131
import org.apache.iceberg.io.WriteResult;
3232
import org.apache.iceberg.types.Types;
33+
import org.apache.iceberg.util.DataFileSet;
3334
import org.junit.jupiter.api.BeforeEach;
3435
import org.junit.jupiter.api.Test;
3536

@@ -80,8 +81,8 @@ void testSerializeAndDeserializeWithAllFiles() throws IOException {
8081
RewriteDataFileResult rewriteDataFileResult =
8182
new RewriteDataFileResult(
8283
1L,
83-
Collections.singletonList(dataFile),
84-
Collections.singletonList(dataFile));
84+
DataFileSet.of(Collections.singletonList(dataFile)),
85+
DataFileSet.of(Collections.singletonList(dataFile)));
8586
originalResult = new IcebergWriteResult(writeResult, rewriteDataFileResult);
8687
serializedData = serializer.serialize(originalResult);
8788
deserializedResult = serializer.deserialize(serializer.getVersion(), serializedData);

0 commit comments

Comments
 (0)