Skip to content

Commit 0b9bdd3

Browse files
committed
add support for pk table commit
1 parent 1bc6609 commit 0b9bdd3

File tree

1 file changed

+45
-14
lines changed

1 file changed

+45
-14
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.iceberg.AppendFiles;
2929
import org.apache.iceberg.CatalogUtil;
3030
import org.apache.iceberg.DataFile;
31+
import org.apache.iceberg.DeleteFile;
32+
import org.apache.iceberg.RowDelta;
3133
import org.apache.iceberg.Snapshot;
3234
import org.apache.iceberg.Table;
3335
import org.apache.iceberg.catalog.Catalog;
@@ -40,6 +42,7 @@
4042
import javax.annotation.Nullable;
4143

4244
import java.io.IOException;
45+
import java.util.Arrays;
4346
import java.util.List;
4447
import java.util.Map;
4548
import java.util.stream.Collectors;
@@ -77,6 +80,10 @@ public IcebergCommittable toCommittable(List<IcebergWriteResult> icebergWriteRes
7780
for (DataFile dataFile : writeResult.dataFiles()) {
7881
builder.addDataFile(dataFile);
7982
}
83+
// Add delete files
84+
for (DeleteFile deleteFile : writeResult.deleteFiles()) {
85+
builder.addDeleteFile(deleteFile);
86+
}
8087
}
8188

8289
return builder.build();
@@ -88,22 +95,46 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
8895
try {
8996
// Refresh table to get latest metadata
9097
icebergTable.refresh();
91-
// Simple append-only case: only data files, no delete files or compaction
92-
AppendFiles appendFiles = icebergTable.newAppend();
93-
for (DataFile dataFile : committable.getDataFiles()) {
94-
appendFiles.appendFile(dataFile);
95-
}
96-
if (!committable.getDeleteFiles().isEmpty()) {
97-
throw new IllegalStateException(
98-
"Delete files are not supported in append-only mode. "
99-
+ "Found "
100-
+ committable.getDeleteFiles().size()
101-
+ " delete files.");
102-
}
10398

104-
addFlussProperties(appendFiles, snapshotProperties);
99+
if (committable.getDeleteFiles().isEmpty()) {
100+
// Simple append-only case: only data files, no delete files or compaction
101+
AppendFiles appendFiles = icebergTable.newAppend();
102+
for (DataFile dataFile : committable.getDataFiles()) {
103+
appendFiles.appendFile(dataFile);
104+
}
105+
if (!committable.getDeleteFiles().isEmpty()) {
106+
throw new IllegalStateException(
107+
"Delete files are not supported in append-only mode. "
108+
+ "Found "
109+
+ committable.getDeleteFiles().size()
110+
+ " delete files.");
111+
}
112+
113+
addFlussProperties(appendFiles, snapshotProperties);
105114

106-
appendFiles.commit();
115+
appendFiles.commit();
116+
} else {
117+
// Row delta validations are not needed for streaming changes that write equality
118+
// deletes.
119+
// Equality deletes are applied to data in all previous sequence numbers, so retries
120+
// may
121+
// push deletes further in the future, but do not affect correctness. Position
122+
// deletes
123+
// committed to the table in this path are used only to delete rows from data files
124+
// that are
125+
// being added in this commit. There is no way for data files added along with the
126+
// delete
127+
// files to be concurrently removed, so there is no need to validate the files
128+
// referenced by
129+
// the position delete files that are being committed.
130+
RowDelta rowDelta = icebergTable.newRowDelta();
131+
Arrays.stream(committable.getDataFiles().toArray(DataFile[]::new))
132+
.forEach(rowDelta::addRows);
133+
Arrays.stream(committable.getDeleteFiles().toArray(DeleteFile[]::new))
134+
.forEach(rowDelta::addDeletes);
135+
snapshotProperties.forEach(rowDelta::set);
136+
rowDelta.commit();
137+
}
107138

108139
Long commitSnapshotId = currentCommitSnapshotId.get();
109140
currentCommitSnapshotId.remove();

0 commit comments

Comments
 (0)