1717
1818package com .alibaba .fluss .lake .iceberg .tiering ;
1919
20+ import com .alibaba .fluss .lake .iceberg .tiering .writer .AppendOnlyWriter ;
21+ import com .alibaba .fluss .lake .iceberg .tiering .writer .DeltaTaskWriter ;
2022import com .alibaba .fluss .lake .writer .LakeWriter ;
2123import com .alibaba .fluss .lake .writer .WriterInitContext ;
2224import com .alibaba .fluss .metadata .TablePath ;
2931import org .apache .iceberg .Table ;
3032import org .apache .iceberg .TableProperties ;
3133import org .apache .iceberg .catalog .Catalog ;
32- import org .apache .iceberg .io .FileAppenderFactory ;
33- import org .apache .iceberg .io .FileIO ;
3434import org .apache .iceberg .io .OutputFileFactory ;
35- import org .apache .iceberg .io .TaskWriter ;
36- import org .apache .iceberg .io .UnpartitionedWriter ;
3735import org .apache .iceberg .io .WriteResult ;
3836import org .apache .iceberg .util .PropertyUtil ;
3937
4038import java .io .Closeable ;
4139import java .io .IOException ;
40+ import java .util .Collections ;
4241import java .util .List ;
4342
4443import static com .alibaba .fluss .lake .iceberg .utils .IcebergConversions .toIceberg ;
@@ -50,7 +49,7 @@ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
5049
5150 private final Catalog icebergCatalog ;
5251 private final Table icebergTable ;
53- private final TaskWriter recordWriter ;
52+ private final RecordWriter recordWriter ;
5453
5554 public IcebergLakeWriter (
5655 IcebergCatalogProvider icebergCatalogProvider , WriterInitContext writerInitContext )
@@ -63,13 +62,14 @@ public IcebergLakeWriter(
6362 this .recordWriter = createRecordWriter (writerInitContext );
6463 }
6564
66- private TaskWriter < LogRecord > createRecordWriter (WriterInitContext writerInitContext ) {
65+ private RecordWriter createRecordWriter (WriterInitContext writerInitContext ) {
6766 Schema schema = icebergTable .schema ();
6867 List <Integer > equalityFieldIds = Lists .newArrayList (schema .identifierFieldIds ());
6968 PartitionSpec spec = icebergTable .spec ();
70- FileFormat format = fileFormat (icebergTable );
7169
72- FileAppenderFactory <LogRecord > appenderFactory = new FlussAppenderFactory ();
70+ // Get target file size from table properties
71+ long targetFileSize = targetFileSize (icebergTable );
72+ FileFormat format = fileFormat (icebergTable );
7373 OutputFileFactory outputFileFactory =
7474 OutputFileFactory .builderFor (
7575 icebergTable ,
@@ -78,28 +78,30 @@ private TaskWriter<LogRecord> createRecordWriter(WriterInitContext writerInitCon
7878 0 )
7979 .format (format )
8080 .build ();
81- // Get target file size from table properties
82- long targetFileSizeBytes = targetFileSize (icebergTable );
83- FileIO fileIO = icebergTable .io ();
8481
8582 if (equalityFieldIds == null || equalityFieldIds .isEmpty ()) {
8683 if (spec .isUnpartitioned ()) {
84+ return new AppendOnlyWriter (
85+ icebergTable ,
86+ writerInitContext .schema ().getRowType (),
87+ writerInitContext .tableBucket (),
88+ null , // No partition for non-partitioned table
89+ Collections .emptyList (), // No partition keys
90+ format ,
91+ outputFileFactory ,
92+ targetFileSize );
8793 }
8894 } else {
8995 if (spec .isUnpartitioned ()) {
90- return new UnpartitionedDeltaWriter (
91- spec ,
96+ return new DeltaTaskWriter (
97+ icebergTable ,
98+ writerInitContext .schema ().getRowType (),
99+ writerInitContext .tableBucket (),
100+ null , // No partition for non-partitioned table
101+ Collections .emptyList (), // No partition keys);
92102 format ,
93- appenderFactory ,
94103 outputFileFactory ,
95- fileIO ,
96- targetFileSizeBytes ,
97- schema ,
98- writerInitContext .schema ().getRowType (),
99- equalityFieldIds ,
100- writerInitContext .tableBucket ().getBucket ());
101- } else {
102-
104+ targetFileSize );
103105 }
104106 }
105107 return null ;
0 commit comments