
Commit c79abc8

[lake/iceberg] implement lake writer for iceberg pk table
1 parent d4dfb45 commit c79abc8

File tree

4 files changed: +272, -50 lines

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 69 additions & 13 deletions
@@ -17,21 +17,32 @@
 
 package com.alibaba.fluss.lake.iceberg.tiering;
 
-import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.LogRecord;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /** Implementation of {@link LakeWriter} for Iceberg. */
 public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
@@ -52,19 +63,48 @@ public IcebergLakeWriter(
     }
 
     private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
-        if (!icebergTable.spec().isUnpartitioned()) {
-            throw new UnsupportedOperationException("Partitioned tables are not yet supported");
+        Schema schema = icebergTable.schema();
+        List<Integer> equalityFieldIds = Lists.newArrayList(schema.identifierFieldIds());
+        PartitionSpec spec = icebergTable.spec();
+
+        // Get target file size from table properties
+        long targetFileSize = targetFileSize(icebergTable);
+        FileFormat format = fileFormat(icebergTable);
+        OutputFileFactory outputFileFactory =
+                OutputFileFactory.builderFor(
+                                icebergTable,
+                                writerInitContext.tableBucket().getBucket(),
+                                // task id always 0
+                                0)
+                        .format(format)
+                        .build();
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+            if (spec.isUnpartitioned()) {
+                return new AppendOnlyWriter(
+                        icebergTable,
+                        writerInitContext.schema().getRowType(),
+                        writerInitContext.tableBucket(),
+                        null, // No partition for non-partitioned table
+                        Collections.emptyList(), // No partition keys
+                        format,
+                        outputFileFactory,
+                        targetFileSize);
+            }
+        } else {
+            if (spec.isUnpartitioned()) {
+                return new DeltaTaskWriter(
+                        icebergTable,
+                        writerInitContext.schema().getRowType(),
+                        writerInitContext.tableBucket(),
+                        null, // No partition for non-partitioned table
+                        Collections.emptyList(), // No partition keys
+                        format,
+                        outputFileFactory,
+                        targetFileSize);
+            }
         }
-
-        // For now, assume append-only (no primary keys)
-        return new AppendOnlyWriter(
-                icebergTable,
-                writerInitContext.schema().getRowType(),
-                writerInitContext.tableBucket(),
-                null, // No partition for non-partitioned table
-                Collections.emptyList() // No partition keys
-                );
+        throw new UnsupportedOperationException("Partitioned tables are not yet supported");
     }
 
     @Override
@@ -107,4 +147,20 @@ private Table getTable(TablePath tablePath) throws IOException {
             throw new IOException("Failed to get table " + tablePath + " in Iceberg.", e);
         }
     }
+
+    private static FileFormat fileFormat(Table icebergTable) {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        icebergTable.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.fromString(formatString);
+    }
+
+    private static long targetFileSize(Table icebergTable) {
+        return PropertyUtil.propertyAsLong(
+                icebergTable.properties(),
+                WRITE_TARGET_FILE_SIZE_BYTES,
+                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    }
 }
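Note on the dispatch above: createRecordWriter keys off Schema#identifierFieldIds(). A Fluss primary-key table surfaces its key columns as Iceberg identifier fields, so a non-empty id set selects DeltaTaskWriter while an empty one keeps the append-only path. A minimal sketch of a schema that would take the delta path (field names are illustrative, not from this commit):

// Sketch: an Iceberg schema whose identifier fields would route
// createRecordWriter to DeltaTaskWriter. Field names are illustrative.
import java.util.Arrays;
import java.util.Collections;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class IdentifierFieldsSketch {
    public static void main(String[] args) {
        Schema pkSchema =
                new Schema(
                        Arrays.asList(
                                Types.NestedField.required(1, "id", Types.LongType.get()),
                                Types.NestedField.optional(2, "payload", Types.StringType.get())),
                        Collections.singleton(1)); // field id 1 ("id") is the identifier field

        // Non-empty -> DeltaTaskWriter path; empty -> AppendOnlyWriter path.
        System.out.println(pkSchema.identifierFieldIds()); // prints [1]
    }
}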

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java renamed to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java

Lines changed: 10 additions & 37 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.iceberg.tiering.append;
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
 
 import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -24,22 +24,17 @@
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.util.PropertyUtil;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
 /** A {@link RecordWriter} to write to Iceberg's append-only table. */
 public class AppendOnlyWriter extends RecordWriter {
 
@@ -48,9 +43,12 @@ public AppendOnlyWriter(
             RowType flussRowType,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
         super(
-                createTaskWriter(icebergTable, tableBucket),
+                createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
                 icebergTable.schema(),
                 flussRowType,
                 tableBucket,
@@ -59,21 +57,12 @@ public AppendOnlyWriter(
     }
 
     private static TaskWriter<Record> createTaskWriter(
-            Table icebergTable, TableBucket tableBucket) {
-        // Get target file size from table properties
-        long targetFileSize = targetFileSize(icebergTable);
-
+            Table icebergTable,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
         FileAppenderFactory<Record> fileAppenderFactory =
                 new GenericAppenderFactory(icebergTable.schema());
-        FileFormat format = fileFormat(icebergTable);
-        OutputFileFactory outputFileFactory =
-                OutputFileFactory.builderFor(
-                                icebergTable,
-                                tableBucket.getBucket(),
-                                // task id always 0
-                                0)
-                        .format(format)
-                        .build();
 
         return new UnpartitionedWriter<>(
                 icebergTable.spec(),
@@ -89,20 +78,4 @@ public void write(LogRecord record) throws Exception {
         flussRecordAsIcebergRecord.setFlussRecord(record);
         taskWriter.write(flussRecordAsIcebergRecord);
     }
-
-    private static FileFormat fileFormat(Table icebergTable) {
-        String formatString =
-                PropertyUtil.propertyAsString(
-                        icebergTable.properties(),
-                        TableProperties.DEFAULT_FILE_FORMAT,
-                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.fromString(formatString);
-    }
-
-    private static long targetFileSize(Table icebergTable) {
-        return PropertyUtil.propertyAsLong(
-                icebergTable.properties(),
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    }
 }
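For context, the TaskWriter stack that AppendOnlyWriter now receives pre-built is plain Iceberg API. A standalone sketch of the same assembly against an already-loaded unpartitioned table (the `table` variable, the PARQUET format, and the 128 MB target size are placeholder assumptions; the commit derives the latter two from table properties):

// Sketch: the Iceberg writer stack AppendOnlyWriter is handed, assembled
// standalone. `table` is an already-loaded unpartitioned Iceberg table.
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.io.WriteResult;

public class AppendStackSketch {
    static WriteResult writeOne(Table table, Record row) throws Exception {
        FileFormat format = FileFormat.PARQUET; // commit: fileFormat(table)
        OutputFileFactory fileFactory =
                OutputFileFactory.builderFor(table, /* partitionId */ 0, /* taskId */ 0)
                        .format(format)
                        .build();
        TaskWriter<Record> writer =
                new UnpartitionedWriter<>(
                        table.spec(),
                        format,
                        new GenericAppenderFactory(table.schema()),
                        fileFactory,
                        table.io(),
                        128 * 1024 * 1024L); // commit: targetFileSize(table)
        writer.write(row);
        return writer.complete(); // data files, to be committed by the tiering job
    }
}

Moving this assembly out of AppendOnlyWriter into IcebergLakeWriter lets the append and delta writers share a single OutputFileFactory construction path.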
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.util.ArrayUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+public class DeltaTaskWriter extends RecordWriter {
+
+    public DeltaTaskWriter(
+            Table icebergTable,
+            RowType flussRowType,
+            TableBucket tableBucket,
+            @Nullable String partition,
+            List<String> partitionKeys,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
+        super(
+                createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
+                icebergTable.schema(),
+                flussRowType,
+                tableBucket,
+                partition,
+                partitionKeys);
+    }
+
+    private static TaskWriter<Record> createTaskWriter(
+            Table icebergTable,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
+
+        FileAppenderFactory<Record> appenderFactory =
+                new GenericAppenderFactory(
+                        icebergTable.schema(),
+                        icebergTable.spec(),
+                        ArrayUtil.toIntArray(
+                                Lists.newArrayList(icebergTable.schema().identifierFieldIds())),
+                        icebergTable.schema(),
+                        null);
+
+        List<String> columns = Lists.newArrayList();
+        for (Integer fieldId : icebergTable.schema().identifierFieldIds()) {
+            columns.add(icebergTable.schema().findField(fieldId).name());
+        }
+        Schema deleteSchema = icebergTable.schema().select(columns);
+        return new GenericTaskDeltaWriter(
+                icebergTable.schema(),
+                deleteSchema,
+                icebergTable.spec(),
+                format,
+                appenderFactory,
+                outputFileFactory,
+                icebergTable.io(),
+                targetFileSize);
+    }
+
+    @Override
+    public void write(LogRecord record) throws Exception {
+        GenericTaskDeltaWriter deltaWriter = (GenericTaskDeltaWriter) taskWriter;
+        flussRecordAsIcebergRecord.setFlussRecord(record);
+        switch (record.getChangeType()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                deltaWriter.write(flussRecordAsIcebergRecord);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                deltaWriter.delete(flussRecordAsIcebergRecord);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown row kind: " + record.getChangeType());
+        }
+    }
+}
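GenericTaskDeltaWriter itself is not part of this excerpt (presumably among the four changed files). For orientation, a hedged sketch of the shape such a writer typically takes on Iceberg's BaseTaskWriter/BaseEqualityDeltaWriter, modeled on the equivalent helper in Iceberg's own tests; it is not the commit's actual class, and the abstract surface varies across Iceberg versions:

// Sketch only; NOT the class from this commit, which is not shown in the diff.
import java.io.IOException;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;

class GenericTaskDeltaWriterSketch extends BaseTaskWriter<Record> {
    private final GenericEqualityDeltaWriter deltaWriter;

    GenericTaskDeltaWriterSketch(
            Schema schema,
            Schema deleteSchema,
            PartitionSpec spec,
            FileFormat format,
            FileAppenderFactory<Record> appenderFactory,
            OutputFileFactory fileFactory,
            FileIO io,
            long targetFileSize) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
        // Unpartitioned only, matching the commit: the partition is null.
        this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, deleteSchema);
    }

    @Override
    public void write(Record row) throws IOException {
        // Insert; the base writer also position-deletes an earlier row
        // with the same key written by this task.
        deltaWriter.write(row);
    }

    public void delete(Record row) throws IOException {
        // Equality delete keyed on the identifier-field columns.
        deltaWriter.delete(row);
    }

    @Override
    public void close() throws IOException {
        deltaWriter.close();
    }

    private class GenericEqualityDeltaWriter extends BaseEqualityDeltaWriter {
        GenericEqualityDeltaWriter(StructLike partition, Schema schema, Schema eqDeleteSchema) {
            super(partition, schema, eqDeleteSchema);
        }

        @Override
        protected StructLike asStructLike(Record record) {
            return record; // Record already implements StructLike
        }

        // Newer Iceberg versions declare asCopiedKey(StructLike) as abstract;
        // defined without @Override so the sketch compiles on older ones too.
        protected StructLike asCopiedKey(StructLike key) {
            return ((Record) key).copy();
        }
    }
}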
