Skip to content

Commit c06affb

Browse files
authored
[lake/iceberg] Support partitioned table (#1565)
1 parent 52936e9 commit c06affb

File tree

10 files changed

+379
-213
lines changed

10 files changed

+379
-213
lines changed

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@
1717

1818
package com.alibaba.fluss.lake.iceberg.tiering;
1919

20-
import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
20+
import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyTaskWriter;
2121
import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
2222
import com.alibaba.fluss.lake.writer.LakeWriter;
2323
import com.alibaba.fluss.lake.writer.WriterInitContext;
2424
import com.alibaba.fluss.metadata.TablePath;
2525
import com.alibaba.fluss.record.LogRecord;
2626

2727
import org.apache.iceberg.FileFormat;
28-
import org.apache.iceberg.PartitionSpec;
2928
import org.apache.iceberg.Schema;
3029
import org.apache.iceberg.Table;
3130
import org.apache.iceberg.TableProperties;
@@ -37,7 +36,6 @@
3736
import java.io.Closeable;
3837
import java.io.IOException;
3938
import java.util.ArrayList;
40-
import java.util.Collections;
4139
import java.util.List;
4240

4341
import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
@@ -65,7 +63,6 @@ public IcebergLakeWriter(
6563
private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
6664
Schema schema = icebergTable.schema();
6765
List<Integer> equalityFieldIds = new ArrayList<>(schema.identifierFieldIds());
68-
PartitionSpec spec = icebergTable.spec();
6966

7067
// Get target file size from table properties
7168
long targetFileSize = targetFileSize(icebergTable);
@@ -79,34 +76,12 @@ private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
7976
.format(format)
8077
.build();
8178

82-
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
83-
if (spec.isUnpartitioned()) {
84-
return new AppendOnlyWriter(
85-
icebergTable,
86-
writerInitContext.schema().getRowType(),
87-
writerInitContext.tableBucket(),
88-
null, // No partition for non-partitioned table
89-
Collections.emptyList(), // No partition keys
90-
format,
91-
outputFileFactory,
92-
targetFileSize);
93-
} else {
94-
return null;
95-
}
79+
if (equalityFieldIds.isEmpty()) {
80+
return new AppendOnlyTaskWriter(
81+
icebergTable, writerInitContext, format, outputFileFactory, targetFileSize);
9682
} else {
97-
if (spec.isUnpartitioned()) {
98-
return new DeltaTaskWriter(
99-
icebergTable,
100-
writerInitContext.schema().getRowType(),
101-
writerInitContext.tableBucket(),
102-
null, // No partition for non-partitioned table
103-
Collections.emptyList(), // No partition keys);
104-
format,
105-
outputFileFactory,
106-
targetFileSize);
107-
} else {
108-
return null;
109-
}
83+
return new DeltaTaskWriter(
84+
icebergTable, writerInitContext, format, outputFileFactory, targetFileSize);
11085
}
11186
}
11287

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,22 @@
2626
import org.apache.iceberg.io.TaskWriter;
2727
import org.apache.iceberg.io.WriteResult;
2828

29-
import javax.annotation.Nullable;
30-
31-
import java.util.List;
32-
3329
/** A base interface to write {@link LogRecord} to Iceberg. */
3430
public abstract class RecordWriter implements AutoCloseable {
3531

3632
protected final TaskWriter<Record> taskWriter;
3733
protected final Schema icebergSchema;
3834
protected final int bucket;
39-
@Nullable protected final String partition;
4035
protected final FlussRecordAsIcebergRecord flussRecordAsIcebergRecord;
4136

4237
public RecordWriter(
4338
TaskWriter<Record> taskWriter,
4439
Schema icebergSchema,
4540
RowType flussRowType,
46-
TableBucket tableBucket,
47-
@Nullable String partition,
48-
List<String> partitionKeys) {
41+
TableBucket tableBucket) {
4942
this.taskWriter = taskWriter;
5043
this.icebergSchema = icebergSchema;
5144
this.bucket = tableBucket.getBucket();
52-
this.partition = partition; // null for non-partitioned tables
5345
this.flussRecordAsIcebergRecord =
5446
new FlussRecordAsIcebergRecord(
5547
tableBucket.getBucket(), icebergSchema, flussRowType);

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java renamed to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
package com.alibaba.fluss.lake.iceberg.tiering.writer;
1919

2020
import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
21-
import com.alibaba.fluss.metadata.TableBucket;
21+
import com.alibaba.fluss.lake.writer.WriterInitContext;
2222
import com.alibaba.fluss.record.LogRecord;
23-
import com.alibaba.fluss.types.RowType;
2423

2524
import org.apache.iceberg.FileFormat;
2625
import org.apache.iceberg.Table;
@@ -29,48 +28,40 @@
2928
import org.apache.iceberg.io.FileAppenderFactory;
3029
import org.apache.iceberg.io.OutputFileFactory;
3130
import org.apache.iceberg.io.TaskWriter;
32-
import org.apache.iceberg.io.UnpartitionedWriter;
33-
34-
import javax.annotation.Nullable;
35-
36-
import java.util.List;
3731

3832
/** A {@link RecordWriter} to write to Iceberg's append-only table. */
39-
public class AppendOnlyWriter extends RecordWriter {
33+
public class AppendOnlyTaskWriter extends RecordWriter {
4034

41-
public AppendOnlyWriter(
35+
public AppendOnlyTaskWriter(
4236
Table icebergTable,
43-
RowType flussRowType,
44-
TableBucket tableBucket,
45-
@Nullable String partition,
46-
List<String> partitionKeys,
37+
WriterInitContext writerInitContext,
4738
FileFormat format,
4839
OutputFileFactory outputFileFactory,
4940
long targetFileSize) {
5041
super(
51-
createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
42+
createTaskWriter(
43+
icebergTable, writerInitContext, format, outputFileFactory, targetFileSize),
5244
icebergTable.schema(),
53-
flussRowType,
54-
tableBucket,
55-
partition,
56-
partitionKeys);
45+
writerInitContext.schema().getRowType(),
46+
writerInitContext.tableBucket());
5747
}
5848

5949
private static TaskWriter<Record> createTaskWriter(
6050
Table icebergTable,
51+
WriterInitContext writerInitContext,
6152
FileFormat format,
6253
OutputFileFactory outputFileFactory,
6354
long targetFileSize) {
6455
FileAppenderFactory<Record> fileAppenderFactory =
65-
new GenericAppenderFactory(icebergTable.schema());
66-
67-
return new UnpartitionedWriter<>(
68-
icebergTable.spec(),
56+
new GenericAppenderFactory(icebergTable.schema(), icebergTable.spec());
57+
return new GenericRecordAppendOnlyWriter(
58+
icebergTable,
6959
format,
7060
fileAppenderFactory,
7161
outputFileFactory,
7262
icebergTable.io(),
73-
targetFileSize);
63+
targetFileSize,
64+
writerInitContext);
7465
}
7566

7667
@Override

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
package com.alibaba.fluss.lake.iceberg.tiering.writer;
1919

2020
import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
21-
import com.alibaba.fluss.metadata.TableBucket;
21+
import com.alibaba.fluss.lake.writer.WriterInitContext;
2222
import com.alibaba.fluss.record.LogRecord;
23-
import com.alibaba.fluss.types.RowType;
2423

2524
import org.apache.iceberg.FileFormat;
2625
import org.apache.iceberg.Schema;
@@ -31,8 +30,6 @@
3130
import org.apache.iceberg.io.OutputFileFactory;
3231
import org.apache.iceberg.io.TaskWriter;
3332

34-
import javax.annotation.Nullable;
35-
3633
import java.util.ArrayList;
3734
import java.util.List;
3835

@@ -41,27 +38,24 @@ public class DeltaTaskWriter extends RecordWriter {
4138

4239
public DeltaTaskWriter(
4340
Table icebergTable,
44-
RowType flussRowType,
45-
TableBucket tableBucket,
46-
@Nullable String partition,
47-
List<String> partitionKeys,
41+
WriterInitContext writerInitContext,
4842
FileFormat format,
4943
OutputFileFactory outputFileFactory,
5044
long targetFileSize) {
5145
super(
52-
createTaskWriter(icebergTable, format, outputFileFactory, targetFileSize),
46+
createTaskWriter(
47+
icebergTable, format, outputFileFactory, targetFileSize, writerInitContext),
5348
icebergTable.schema(),
54-
flussRowType,
55-
tableBucket,
56-
partition,
57-
partitionKeys);
49+
writerInitContext.schema().getRowType(),
50+
writerInitContext.tableBucket());
5851
}
5952

6053
private static TaskWriter<Record> createTaskWriter(
6154
Table icebergTable,
6255
FileFormat format,
6356
OutputFileFactory outputFileFactory,
64-
long targetFileSize) {
57+
long targetFileSize,
58+
WriterInitContext writerInitContext) {
6559
int[] equalityFieldIds =
6660
icebergTable.schema().identifierFieldIds().stream()
6761
.mapToInt(Integer::intValue)
@@ -79,20 +73,20 @@ private static TaskWriter<Record> createTaskWriter(
7973
columns.add(icebergTable.schema().findField(fieldId).name());
8074
}
8175
Schema deleteSchema = icebergTable.schema().select(columns);
82-
return new GenericTaskDeltaWriter(
83-
icebergTable.schema(),
76+
return new GenericRecordDeltaWriter(
77+
icebergTable,
8478
deleteSchema,
85-
icebergTable.spec(),
8679
format,
8780
appenderFactory,
8881
outputFileFactory,
8982
icebergTable.io(),
90-
targetFileSize);
83+
targetFileSize,
84+
writerInitContext);
9185
}
9286

9387
@Override
9488
public void write(LogRecord record) throws Exception {
95-
GenericTaskDeltaWriter deltaWriter = (GenericTaskDeltaWriter) taskWriter;
89+
GenericRecordDeltaWriter deltaWriter = (GenericRecordDeltaWriter) taskWriter;
9690
flussRecordAsIcebergRecord.setFlussRecord(record);
9791
switch (record.getChangeType()) {
9892
case INSERT:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.alibaba.fluss.lake.iceberg.tiering.writer;
20+
21+
import com.alibaba.fluss.lake.writer.WriterInitContext;
22+
23+
import org.apache.iceberg.FileFormat;
24+
import org.apache.iceberg.Table;
25+
import org.apache.iceberg.data.Record;
26+
import org.apache.iceberg.io.BaseTaskWriter;
27+
import org.apache.iceberg.io.FileAppenderFactory;
28+
import org.apache.iceberg.io.FileIO;
29+
import org.apache.iceberg.io.OutputFileFactory;
30+
31+
import java.io.IOException;
32+
33+
import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
34+
35+
/** A generic task writer to write {@link Record} for append-only table. * */
36+
public class GenericRecordAppendOnlyWriter extends BaseTaskWriter<Record> {
37+
38+
private final RollingFileWriter currentWriter;
39+
40+
protected GenericRecordAppendOnlyWriter(
41+
Table icebergTable,
42+
FileFormat format,
43+
FileAppenderFactory<Record> appenderFactory,
44+
OutputFileFactory fileFactory,
45+
FileIO io,
46+
long targetFileSize,
47+
WriterInitContext writerInitContext) {
48+
super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize);
49+
currentWriter =
50+
new RollingFileWriter(
51+
toPartition(
52+
icebergTable,
53+
writerInitContext.partition(),
54+
writerInitContext.tableBucket().getBucket()));
55+
}
56+
57+
@Override
58+
public void write(Record record) throws IOException {
59+
currentWriter.write(record);
60+
}
61+
62+
@Override
63+
public void close() throws IOException {
64+
currentWriter.close();
65+
}
66+
}

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java renamed to fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.alibaba.fluss.lake.iceberg.tiering.writer;
1919

20+
import com.alibaba.fluss.lake.writer.WriterInitContext;
21+
2022
import org.apache.iceberg.FileFormat;
2123
import org.apache.iceberg.PartitionKey;
22-
import org.apache.iceberg.PartitionSpec;
2324
import org.apache.iceberg.Schema;
2425
import org.apache.iceberg.StructLike;
26+
import org.apache.iceberg.Table;
2527
import org.apache.iceberg.data.Record;
2628
import org.apache.iceberg.io.BaseTaskWriter;
2729
import org.apache.iceberg.io.FileAppenderFactory;
@@ -30,21 +32,30 @@
3032

3133
import java.io.IOException;
3234

35+
import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
36+
3337
/** A generic task equality delta writer. */
34-
class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
38+
class GenericRecordDeltaWriter extends BaseTaskWriter<Record> {
3539
private final GenericEqualityDeltaWriter deltaWriter;
3640

37-
public GenericTaskDeltaWriter(
38-
Schema schema,
41+
public GenericRecordDeltaWriter(
42+
Table icebergTable,
3943
Schema deleteSchema,
40-
PartitionSpec spec,
4144
FileFormat format,
4245
FileAppenderFactory<Record> appenderFactory,
4346
OutputFileFactory fileFactory,
4447
FileIO io,
45-
long targetFileSize) {
46-
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
47-
this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, deleteSchema);
48+
long targetFileSize,
49+
WriterInitContext writerInitContext) {
50+
super(icebergTable.spec(), format, appenderFactory, fileFactory, io, targetFileSize);
51+
this.deltaWriter =
52+
new GenericEqualityDeltaWriter(
53+
toPartition(
54+
icebergTable,
55+
writerInitContext.partition(),
56+
writerInitContext.tableBucket().getBucket()),
57+
icebergTable.schema(),
58+
deleteSchema);
4859
}
4960

5061
@Override

0 commit comments

Comments
 (0)