
Commit 8a93663

[lake/iceberg] implement lake writer for iceberg pk table (apache#1555)
1 parent: c97813c

6 files changed, +443 -94 lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java

Lines changed: 48 additions & 20 deletions

@@ -28,7 +28,10 @@
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -40,6 +43,7 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -77,6 +81,10 @@ public IcebergCommittable toCommittable(List<IcebergWriteResult> icebergWriteRes
             for (DataFile dataFile : writeResult.dataFiles()) {
                 builder.addDataFile(dataFile);
             }
+            // Add delete files
+            for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+                builder.addDeleteFile(deleteFile);
+            }
         }
 
         return builder.build();
@@ -88,22 +96,36 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
         try {
             // Refresh table to get latest metadata
             icebergTable.refresh();
-            // Simple append-only case: only data files, no delete files or compaction
-            AppendFiles appendFiles = icebergTable.newAppend();
-            for (DataFile dataFile : committable.getDataFiles()) {
-                appendFiles.appendFile(dataFile);
-            }
-            if (!committable.getDeleteFiles().isEmpty()) {
-                throw new IllegalStateException(
-                        "Delete files are not supported in append-only mode. "
-                                + "Found "
-                                + committable.getDeleteFiles().size()
-                                + " delete files.");
-            }
 
-            addFlussProperties(appendFiles, snapshotProperties);
+            if (committable.getDeleteFiles().isEmpty()) {
+                // Simple append-only case: only data files, no delete files or compaction
+                AppendFiles appendFiles = icebergTable.newAppend();
+                for (DataFile dataFile : committable.getDataFiles()) {
+                    appendFiles.appendFile(dataFile);
+                }
 
-            appendFiles.commit();
+                addFlussProperties(appendFiles, snapshotProperties);
+
+                appendFiles.commit();
+            } else {
+                /**
+                 * Row delta validations are not needed for streaming changes that write equality
+                 * deletes. Equality deletes are applied to data in all previous sequence numbers,
+                 * so retries may push deletes further in the future, but do not affect correctness.
+                 * Position deletes committed to the table in this path are used only to delete rows
+                 * from data files that are being added in this commit. There is no way for data
+                 * files added along with the delete files to be concurrently removed, so there is
+                 * no need to validate the files referenced by the position delete files that are
+                 * being committed.
+                 */
+                RowDelta rowDelta = icebergTable.newRowDelta();
+                Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
+                        .forEach(rowDelta::addRows);
+                Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
+                        .forEach(rowDelta::addDeletes);
+                snapshotProperties.forEach(rowDelta::set);
+                rowDelta.commit();
+            }
 
             Long commitSnapshotId = currentCommitSnapshotId.get();
             currentCommitSnapshotId.remove();
@@ -116,20 +138,26 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
     }
 
     private void addFlussProperties(
-            AppendFiles appendFiles, Map<String, String> snapshotProperties) {
-        appendFiles.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+            SnapshotUpdate<?> operation, Map<String, String> snapshotProperties) {
+        operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
         for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
-            appendFiles.set(entry.getKey(), entry.getValue());
+            operation.set(entry.getKey(), entry.getValue());
         }
     }
 
     @Override
     public void abort(IcebergCommittable committable) {
-        List<String> filesToDelete =
+        List<String> dataFilesToDelete =
                 committable.getDataFiles().stream()
-                        .map(dataFile -> dataFile.path().toString())
+                        .map(file -> file.path().toString())
+                        .collect(Collectors.toList());
+        CatalogUtil.deleteFiles(icebergTable.io(), dataFilesToDelete, "data file", true);
+
+        List<String> deleteFilesToDelete =
+                committable.getDeleteFiles().stream()
+                        .map(file -> file.path().toString())
                         .collect(Collectors.toList());
-        CatalogUtil.deleteFiles(icebergTable.io(), filesToDelete, "data file", true);
+        CatalogUtil.deleteFiles(icebergTable.io(), deleteFilesToDelete, "delete file", true);
     }
 
     @Nullable
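Note on the new row-delta path: RowDelta, like AppendFiles, extends SnapshotUpdate, which is why addFlussProperties was widened to SnapshotUpdate<?>. The Arrays.stream(list.stream().toArray(...)) round-trips above are equivalent to iterating the lists directly; a minimal sketch of the same branch in that simpler form (identifiers as in the diff, not a drop-in for the committed code):

    // Sketch: the delete-file commit path without the Arrays.stream round-trip.
    RowDelta rowDelta = icebergTable.newRowDelta();
    committable.getDataFiles().forEach(rowDelta::addRows);       // data files from this tiering round
    committable.getDeleteFiles().forEach(rowDelta::addDeletes);  // equality/position delete files
    addFlussProperties(rowDelta, snapshotProperties);            // RowDelta is also a SnapshotUpdate
    rowDelta.commit();                                           // one atomic Iceberg snapshot

As committed, the else-branch copies snapshotProperties but skips the commit-user stamp that the append branch gets via addFlussProperties; routing both paths through the widened helper, as sketched, would keep the two snapshot flavors consistent.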

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 72 additions & 13 deletions

@@ -17,21 +17,32 @@
 
 package com.alibaba.fluss.lake.iceberg.tiering;
 
-import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.LogRecord;
 
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /** Implementation of {@link LakeWriter} for Iceberg. */
 public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
@@ -52,19 +63,51 @@ public IcebergLakeWriter(
     }
 
     private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
-        if (!icebergTable.spec().isUnpartitioned()) {
-            throw new UnsupportedOperationException("Partitioned tables are not yet supported");
+        Schema schema = icebergTable.schema();
+        List<Integer> equalityFieldIds = new ArrayList<>(schema.identifierFieldIds());
+        PartitionSpec spec = icebergTable.spec();
+
+        // Get target file size from table properties
+        long targetFileSize = targetFileSize(icebergTable);
+        FileFormat format = fileFormat(icebergTable);
+        OutputFileFactory outputFileFactory =
+                OutputFileFactory.builderFor(
+                                icebergTable,
+                                writerInitContext.tableBucket().getBucket(),
+                                // task id always 0
+                                0)
+                        .format(format)
+                        .build();
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+            if (spec.isUnpartitioned()) {
+                return new AppendOnlyWriter(
+                        icebergTable,
+                        writerInitContext.schema().getRowType(),
+                        writerInitContext.tableBucket(),
+                        null, // No partition for non-partitioned table
+                        Collections.emptyList(), // No partition keys
+                        format,
+                        outputFileFactory,
+                        targetFileSize);
+            } else {
+                return null;
+            }
+        } else {
+            if (spec.isUnpartitioned()) {
+                return new DeltaTaskWriter(
+                        icebergTable,
+                        writerInitContext.schema().getRowType(),
+                        writerInitContext.tableBucket(),
+                        null, // No partition for non-partitioned table
+                        Collections.emptyList(), // No partition keys
+                        format,
+                        outputFileFactory,
+                        targetFileSize);
+            } else {
+                return null;
+            }
         }
-
-        // For now, assume append-only (no primary keys)
-
-        return new AppendOnlyWriter(
-                icebergTable,
-                writerInitContext.schema().getRowType(),
-                writerInitContext.tableBucket(),
-                null, // No partition for non-partitioned table
-                Collections.emptyList() // No partition keys
-                );
     }
 
     @Override
@@ -107,4 +150,20 @@ private Table getTable(TablePath tablePath) throws IOException {
             throw new IOException("Failed to get table " + tablePath + " in Iceberg.", e);
         }
     }
+
+    private static FileFormat fileFormat(Table icebergTable) {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        icebergTable.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.fromString(formatString);
+    }
+
+    private static long targetFileSize(Table icebergTable) {
+        return PropertyUtil.propertyAsLong(
+                icebergTable.properties(),
+                WRITE_TARGET_FILE_SIZE_BYTES,
+                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    }
 }
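For context, a sketch of why identifierFieldIds() drives the branch above: a Fluss primary-key table is expected to surface its key columns as Iceberg identifier fields. The schema below is illustrative (field ids and names are made up, not from this commit):

    import java.util.Arrays;
    import java.util.Collections;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    // Column id 1 is declared an identifier (primary-key) field.
    Schema pkSchema =
            new Schema(
                    Arrays.asList(
                            Types.NestedField.required(1, "id", Types.LongType.get()),
                            Types.NestedField.optional(2, "name", Types.StringType.get())),
                    Collections.singleton(1));

    // Non-empty identifierFieldIds() -> DeltaTaskWriter (equality deletes);
    // empty -> AppendOnlyWriter (plain appends).

Note that both branches still return null when the spec is partitioned, so partitioned tables remain unhandled on this path.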

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java renamed to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java

Lines changed: 10 additions & 37 deletions

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.iceberg.tiering.append;
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
 
 import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -24,22 +24,17 @@
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.util.PropertyUtil;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
 /** A {@link RecordWriter} to write to Iceberg's append-only table. */
 public class AppendOnlyWriter extends RecordWriter {
 
@@ -48,9 +43,12 @@ public AppendOnlyWriter(
             RowType flussRowType,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
         super(
-                createTaskWriter(icebergTable, tableBucket),
+                createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
                 icebergTable.schema(),
                 flussRowType,
                 tableBucket,
@@ -59,21 +57,12 @@ public AppendOnlyWriter(
     }
 
     private static TaskWriter<Record> createTaskWriter(
-            Table icebergTable, TableBucket tableBucket) {
-        // Get target file size from table properties
-        long targetFileSize = targetFileSize(icebergTable);
-
+            Table icebergTable,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
         FileAppenderFactory<Record> fileAppenderFactory =
                 new GenericAppenderFactory(icebergTable.schema());
-        FileFormat format = fileFormat(icebergTable);
-        OutputFileFactory outputFileFactory =
-                OutputFileFactory.builderFor(
-                                icebergTable,
-                                tableBucket.getBucket(),
-                                // task id always 0
-                                0)
-                        .format(format)
-                        .build();
 
         return new UnpartitionedWriter<>(
                 icebergTable.spec(),
@@ -89,20 +78,4 @@ public void write(LogRecord record) throws Exception {
         flussRecordAsIcebergRecord.setFlussRecord(record);
         taskWriter.write(flussRecordAsIcebergRecord);
     }
-
-    private static FileFormat fileFormat(Table icebergTable) {
-        String formatString =
-                PropertyUtil.propertyAsString(
-                        icebergTable.properties(),
-                        TableProperties.DEFAULT_FILE_FORMAT,
-                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.fromString(formatString);
-    }
-
-    private static long targetFileSize(Table icebergTable) {
-        return PropertyUtil.propertyAsLong(
-                icebergTable.properties(),
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    }
 }