
Commit 4180f9c: nit
1 parent d4dfb45
File tree

9 files changed: +796 −30 lines
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/BaseDeltaTaskWriter.java

Lines changed: 116 additions & 0 deletions

@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.iceberg.tiering;

import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.types.RowType;

import com.google.common.collect.Sets;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.types.TypeUtil;

import java.io.IOException;
import java.util.List;

abstract class BaseDeltaTaskWriter extends BaseTaskWriter<LogRecord> {

    private final Schema schema;
    private final Schema deleteSchema;
    private final RowDataWrapper wrapper;
    private final RowDataWrapper keyWrapper;
    private final boolean upsert = true;
    private final RowDataProjection keyProjection;

    BaseDeltaTaskWriter(
            PartitionSpec spec,
            FileFormat format,
            FileAppenderFactory<LogRecord> appenderFactory,
            OutputFileFactory fileFactory,
            FileIO io,
            long targetFileSize,
            Schema schema,
            RowType rowType,
            List<Integer> equalityFieldIds,
            int bucket) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
        this.schema = schema;
        this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
        this.wrapper = new RowDataWrapper(bucket, rowType, schema.asStruct());
        this.keyWrapper =
                new RowDataWrapper(
                        bucket,
                        (RowType)
                                TypeUtil.visit(deleteSchema, new IcebergDataTypeToFlussDataType()),
                        deleteSchema.asStruct());
        this.keyProjection =
                new RowDataProjection(rowType, schema.asStruct(), deleteSchema.asStruct());
    }

    abstract RowDataDeltaWriter route(LogRecord row);

    RowDataWrapper wrapper() {
        return wrapper;
    }

    @Override
    public void write(LogRecord row) throws IOException {
        RowDataDeltaWriter writer = route(row);

        switch (row.getChangeType()) {
            case INSERT:
            case UPDATE_AFTER:
                if (upsert) {
                    writer.deleteKey(
                            new ProjectionLogRecord(
                                    row.logOffset(),
                                    row.timestamp(),
                                    row.getChangeType(),
                                    row.getRow(),
                                    this.keyProjection));
                }
                writer.write(row);
                break;

            default:
                throw new UnsupportedOperationException("Unknown row kind: " + row.getChangeType());
        }
    }

    protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
        RowDataDeltaWriter(PartitionKey partition) {
            super(partition, schema, deleteSchema);
        }

        @Override
        protected StructLike asStructLike(LogRecord data) {
            return wrapper.wrap(data);
        }

        @Override
        protected StructLike asStructLikeKey(LogRecord data) {
            return keyWrapper.wrap(data);
        }
    }
}
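A concrete subclass only has to implement route(LogRecord). The UnpartitionedDeltaWriter that IcebergLakeWriter instantiates further down is part of this commit but not shown in this view; below is a minimal sketch of what such a subclass plausibly looks like, following the pattern of Iceberg's Flink unpartitioned delta writer. The null partition key and the close() override are assumptions, not confirmed by this diff.

package com.alibaba.fluss.lake.iceberg.tiering;

import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.types.RowType;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;

import java.io.IOException;
import java.util.List;

class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {

    // An unpartitioned table has exactly one destination, so a single
    // delta writer is created up front and reused for every record.
    private final RowDataDeltaWriter writer;

    UnpartitionedDeltaWriter(
            PartitionSpec spec,
            FileFormat format,
            FileAppenderFactory<LogRecord> appenderFactory,
            OutputFileFactory fileFactory,
            FileIO io,
            long targetFileSize,
            Schema schema,
            RowType rowType,
            List<Integer> equalityFieldIds,
            int bucket) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize,
                schema, rowType, equalityFieldIds, bucket);
        // Null partition key: the table is unpartitioned.
        this.writer = new RowDataDeltaWriter(null);
    }

    @Override
    RowDataDeltaWriter route(LogRecord row) {
        return writer;
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }
}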
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussAppenderFactory.java

Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.iceberg.tiering;

import com.alibaba.fluss.record.LogRecord;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;

import java.io.Serializable;

public class FlussAppenderFactory implements FileAppenderFactory<LogRecord>, Serializable {

    @Override
    public FileAppender<LogRecord> newAppender(OutputFile outputFile, FileFormat fileFormat) {
        return null;
    }

    @Override
    public DataWriter<LogRecord> newDataWriter(
            EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
        return null;
    }

    @Override
    public EqualityDeleteWriter<LogRecord> newEqDeleteWriter(
            EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
        return null;
    }

    @Override
    public PositionDeleteWriter<LogRecord> newPosDeleteWriter(
            EncryptedOutputFile outputFile, FileFormat format, StructLike partition) {
        return null;
    }
}
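All four factory methods return null here, so any write attempt would fail with a NullPointerException; they are placeholders at this stage of the commit. For illustration, below is one way newAppender could eventually be filled in for the Avro format, mirroring iceberg-flink's FlinkAppenderFactory. This is a sketch only: the schema, rowType, and props fields and the FlussAvroWriter datum writer are assumptions that do not exist in this commit, and imports for java.io.IOException and java.io.UncheckedIOException would also be needed.

// Sketch: `schema` (Iceberg Schema), `rowType` (Fluss RowType), and `props`
// (Map<String, String>) are assumed constructor-injected fields of the factory;
// FlussAvroWriter is a hypothetical Avro DatumWriter over LogRecord.
@Override
public FileAppender<LogRecord> newAppender(OutputFile outputFile, FileFormat fileFormat) {
    try {
        switch (fileFormat) {
            case AVRO:
                return Avro.write(outputFile)
                        .createWriterFunc(ignore -> new FlussAvroWriter(rowType))
                        .setAll(props)
                        .schema(schema)
                        .overwrite()
                        .build();
            default:
                throw new UnsupportedOperationException(
                        "Cannot write file format: " + fileFormat);
        }
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

The Parquet and ORC branches would follow the same shape with their respective Iceberg write builders.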
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergDataTypeToFlussDataType.java

Lines changed: 92 additions & 0 deletions

@@ -0,0 +1,92 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.iceberg.tiering;

import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.BinaryType;
import com.alibaba.fluss.types.BooleanType;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DateType;
import com.alibaba.fluss.types.DecimalType;
import com.alibaba.fluss.types.DoubleType;
import com.alibaba.fluss.types.FloatType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.StringType;
import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class IcebergDataTypeToFlussDataType extends TypeUtil.SchemaVisitor<DataType> {

    IcebergDataTypeToFlussDataType() {}

    @Override
    public DataType primitive(Type.PrimitiveType primitive) {
        switch (primitive.typeId()) {
            case BOOLEAN:
                return new BooleanType();
            case INTEGER:
                return new IntType();
            case LONG:
                return new BigIntType();
            case FLOAT:
                return new FloatType();
            case DOUBLE:
                return new DoubleType();
            case DATE:
                return new DateType();
            case TIME:
                // For the type: only TimeType with the default precision (seconds)
                // is supported, so the Iceberg time type maps to a plain TimeType.
                // For the data: time values are stored as an int holding
                // milliseconds, so millisecond precision is supported.
                return new TimeType();
            case TIMESTAMP:
                Types.TimestampType timestamp = (Types.TimestampType) primitive;
                if (timestamp.shouldAdjustToUTC()) {
                    // MICROS
                    return new LocalZonedTimestampType(6);
                } else {
                    // MICROS
                    return new TimestampType(6);
                }
            case STRING:
                return new StringType();
            case UUID:
                // UUID length is 16
                return new BinaryType(16);
            case FIXED:
                Types.FixedType fixedType = (Types.FixedType) primitive;
                return new BinaryType(fixedType.length());
            case BINARY:
                return new BinaryType();
            case DECIMAL:
                Types.DecimalType decimal = (Types.DecimalType) primitive;
                return new DecimalType(decimal.precision(), decimal.scale());
            default:
                throw new UnsupportedOperationException(
                        "Cannot convert unknown Iceberg type to Fluss: " + primitive);
        }
    }
}
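Note that only primitive() is overridden. BaseDeltaTaskWriter above casts the result of TypeUtil.visit(deleteSchema, new IcebergDataTypeToFlussDataType()) to RowType, which also requires the schema, struct, and field callbacks to build a row type; without them the visitor yields null for the top-level struct. A sketch of those overrides follows, assuming Fluss's DataField(name, type) and RowType(List<DataField>) constructors mirror their Flink counterparts and with imports for org.apache.iceberg.Schema, com.alibaba.fluss.types.DataField, java.util.ArrayList, and java.util.List added.

@Override
public DataType schema(Schema schema, DataType structType) {
    return structType;
}

@Override
public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
    // Zip Iceberg fields with their converted Fluss types into a RowType.
    List<DataField> fields = new ArrayList<>();
    for (int i = 0; i < struct.fields().size(); i++) {
        fields.add(new DataField(struct.fields().get(i).name(), fieldResults.get(i)));
    }
    return new RowType(fields);
}

@Override
public DataType field(Types.NestedField field, DataType fieldResult) {
    return fieldResult;
}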

fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java

Lines changed: 70 additions & 16 deletions

@@ -17,28 +17,40 @@
 
 package com.alibaba.fluss.lake.iceberg.tiering;
 
-import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.LogRecord;
 
+import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Collections;
+import java.util.List;
 
 import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /** Implementation of {@link LakeWriter} for Iceberg. */
 public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
 
     private final Catalog icebergCatalog;
     private final Table icebergTable;
-    private final RecordWriter recordWriter;
+    private final TaskWriter<LogRecord> recordWriter;
 
     public IcebergLakeWriter(
             IcebergCatalogProvider icebergCatalogProvider, WriterInitContext writerInitContext)
@@ -51,20 +63,46 @@ public IcebergLakeWriter(
         this.recordWriter = createRecordWriter(writerInitContext);
     }
 
-    private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
-        if (!icebergTable.spec().isUnpartitioned()) {
-            throw new UnsupportedOperationException("Partitioned tables are not yet supported");
-        }
-
-        // For now, assume append-only (no primary keys)
+    private TaskWriter<LogRecord> createRecordWriter(WriterInitContext writerInitContext) {
+        Schema schema = icebergTable.schema();
+        List<Integer> equalityFieldIds = Lists.newArrayList(schema.identifierFieldIds());
+        PartitionSpec spec = icebergTable.spec();
+        FileFormat format = fileFormat(icebergTable);
+
+        FileAppenderFactory<LogRecord> appenderFactory = new FlussAppenderFactory();
+        OutputFileFactory outputFileFactory =
+                OutputFileFactory.builderFor(
+                                icebergTable,
+                                writerInitContext.tableBucket().getBucket(),
+                                // task id always 0
+                                0)
+                        .format(format)
+                        .build();
+        // Get target file size from table properties
+        long targetFileSizeBytes = targetFileSize(icebergTable);
+        FileIO fileIO = icebergTable.io();
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+            if (spec.isUnpartitioned()) {
+            }
+        } else {
+            if (spec.isUnpartitioned()) {
+                return new UnpartitionedDeltaWriter(
+                        spec,
+                        format,
+                        appenderFactory,
+                        outputFileFactory,
+                        fileIO,
+                        targetFileSizeBytes,
+                        schema,
+                        writerInitContext.schema().getRowType(),
+                        equalityFieldIds,
+                        writerInitContext.tableBucket().getBucket());
+            } else {
 
-        return new AppendOnlyWriter(
-                icebergTable,
-                writerInitContext.schema().getRowType(),
-                writerInitContext.tableBucket(),
-                null, // No partition for non-partitioned table
-                Collections.emptyList() // No partition keys
-        );
+            }
+        }
+        return null;
     }
 
     @Override
@@ -107,4 +145,20 @@ private Table getTable(TablePath tablePath) throws IOException {
             throw new IOException("Failed to get table " + tablePath + " in Iceberg.", e);
         }
     }
+
+    private static FileFormat fileFormat(Table icebergTable) {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        icebergTable.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.fromString(formatString);
+    }
+
+    private static long targetFileSize(Table icebergTable) {
+        return PropertyUtil.propertyAsLong(
+                icebergTable.properties(),
+                WRITE_TARGET_FILE_SIZE_BYTES,
+                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    }
 }
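Two gaps remain in createRecordWriter: the branch for tables without equality fields (append-only) is empty, and the partitioned cases fall through so the method returns null. The newly added but unused UnpartitionedWriter import hints at the intended append-only path; a sketch of that branch, under that assumption only:

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
    if (spec.isUnpartitioned()) {
        // Append-only, unpartitioned table: Iceberg's generic UnpartitionedWriter
        // suffices, since no routing and no equality deletes are needed.
        return new UnpartitionedWriter<>(
                spec, format, appenderFactory, outputFileFactory, fileIO, targetFileSizeBytes);
    }
}

Partitioned tables would still need a partition-aware writer; until one exists, throwing UnsupportedOperationException in those branches would fail faster than returning null.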
