
Commit b492c29

MehulBatra and luoyuxia authored
[lake/iceberg] Iceberg encoding strategy (apache#1350)
--------- Co-authored-by: luoyuxia <[email protected]>
1 parent 4738a24 commit b492c29

10 files changed: +736 −25 lines changed

fluss-common/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -103,6 +103,12 @@
             <version>${paimon.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-api</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

fluss-common/src/main/java/com/alibaba/fluss/metadata/DataLakeFormat.java

Lines changed: 2 additions & 1 deletion
@@ -19,7 +19,8 @@
 
 /** An enum for datalake format. */
 public enum DataLakeFormat {
-    PAIMON("paimon");
+    PAIMON("paimon"),
+    ICEBERG("iceberg");
 
     private final String value;
 
fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java

Lines changed: 3 additions & 0 deletions
@@ -19,6 +19,7 @@
 
 import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.encode.iceberg.IcebergKeyEncoder;
 import com.alibaba.fluss.row.encode.paimon.PaimonKeyEncoder;
 import com.alibaba.fluss.types.RowType;
 
@@ -46,6 +47,8 @@ static KeyEncoder of(
             return CompactedKeyEncoder.createKeyEncoder(rowType, keyFields);
         } else if (lakeFormat == DataLakeFormat.PAIMON) {
             return new PaimonKeyEncoder(rowType, keyFields);
+        } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+            return new IcebergKeyEncoder(rowType, keyFields);
         } else {
             throw new UnsupportedOperationException("Unsupported datalake format: " + lakeFormat);
         }
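
For context, a minimal sketch of how the new dispatch branch might be exercised (not part of this commit). It assumes the factory's parameter order is (rowType, keyFields, lakeFormat), which matches the names visible in the hunk above but is not fully shown in this diff:

import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.row.encode.KeyEncoder;
import com.alibaba.fluss.types.RowType;

import java.util.List;

public class KeyEncoderDispatchSketch {
    // With DataLakeFormat.ICEBERG, the factory now returns an IcebergKeyEncoder
    // instead of falling through to UnsupportedOperationException.
    static KeyEncoder forIcebergTable(RowType rowType, List<String> keyFields) {
        return KeyEncoder.of(rowType, keyFields, DataLakeFormat.ICEBERG);
    }
}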
fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java

Lines changed: 223 additions & 0 deletions
@@ -0,0 +1,223 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.row.encode.iceberg;

import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.row.BinaryString;
import com.alibaba.fluss.row.Decimal;
import com.alibaba.fluss.row.TimestampNtz;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.utils.UnsafeUtils;

import java.io.Serializable;
import java.util.Arrays;

import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;

/**
 * A writer to encode Fluss's {@link com.alibaba.fluss.row.InternalRow} using Iceberg's binary
 * encoding format.
 *
 * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation:
 * https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java
 *
 * <p>Key encoding principles from Iceberg's Conversions class:
 *
 * <ul>
 *   <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order
 *   <li>Decimal types use BIG-ENDIAN byte order
 *   <li>Strings are encoded as UTF-8 bytes
 *   <li>Timestamps are stored as long values (microseconds since epoch)
 * </ul>
 *
 * <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance,
 * but maintains byte-level compatibility with Iceberg's encoding.
 */
class IcebergBinaryRowWriter {

    private final int arity;
    private byte[] buffer;
    private MemorySegment segment;
    private int cursor;

    public IcebergBinaryRowWriter(int arity) {
        this.arity = arity;
        // Conservative initial size to avoid frequent resizing
        int initialSize = 8 + (arity * 8);
        setBuffer(new byte[initialSize]);
        reset();
    }

    public void reset() {
        this.cursor = 0;
        // Clear only the used portion for efficiency
        if (cursor > 0) {
            Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0);
        }
    }

    public byte[] toBytes() {
        byte[] result = new byte[cursor];
        System.arraycopy(buffer, 0, result, 0, cursor);
        return result;
    }

    public void setNullAt(int pos) {
        // For Iceberg key encoding, null values should not occur
        // This is validated at the encoder level
        throw new UnsupportedOperationException(
                "Null values are not supported in Iceberg key encoding");
    }

    public void writeBoolean(boolean value) {
        ensureCapacity(1);
        UnsafeUtils.putBoolean(buffer, cursor, value);
        cursor += 1;
    }

    public void writeByte(byte value) {
        ensureCapacity(1);
        UnsafeUtils.putByte(buffer, cursor, value);
        cursor += 1;
    }

    public void writeShort(short value) {
        ensureCapacity(2);
        UnsafeUtils.putShort(buffer, cursor, value);
        cursor += 2;
    }

    public void writeInt(int value) {
        ensureCapacity(4);
        UnsafeUtils.putInt(buffer, cursor, value);
        cursor += 4;
    }

    public void writeLong(long value) {
        ensureCapacity(8);
        UnsafeUtils.putLong(buffer, cursor, value);
        cursor += 8;
    }

    public void writeFloat(float value) {
        ensureCapacity(4);
        UnsafeUtils.putFloat(buffer, cursor, value);
        cursor += 4;
    }

    public void writeDouble(double value) {
        ensureCapacity(8);
        UnsafeUtils.putDouble(buffer, cursor, value);
        cursor += 8;
    }

    public void writeString(BinaryString value) {
        // Convert to UTF-8 byte array
        byte[] bytes = BinaryString.encodeUTF8(value.toString());
        // Write length prefix followed by UTF-8 bytes
        writeInt(bytes.length); // 4-byte length prefix
        ensureCapacity(bytes.length); // Ensure space for actual string bytes
        segment.put(cursor, bytes, 0, bytes.length);
        cursor += bytes.length;
    }

    public void writeBytes(byte[] bytes) {
        // Write length prefix followed by binary data
        writeInt(bytes.length); // 4-byte length prefix
        ensureCapacity(bytes.length); // Ensure space for actual binary bytes
        segment.put(cursor, bytes, 0, bytes.length);
        cursor += bytes.length;
    }

    public void writeDecimal(Decimal value, int precision) {
        byte[] unscaled = value.toUnscaledBytes();
        writeBytes(unscaled); // Adds 4-byte length prefix before the actual bytes
    }

    private void ensureCapacity(int neededSize) {
        if (buffer.length < cursor + neededSize) {
            grow(cursor + neededSize);
        }
    }

    private void grow(int minCapacity) {
        int oldCapacity = buffer.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1); // 1.5x growth
        if (newCapacity < minCapacity) {
            newCapacity = minCapacity;
        }
        setBuffer(Arrays.copyOf(buffer, newCapacity));
    }

    private void setBuffer(byte[] buffer) {
        this.buffer = buffer;
        this.segment = MemorySegment.wrap(buffer);
    }

    /**
     * Creates an accessor for writing the elements of an iceberg binary row writer during runtime.
     *
     * @param fieldType the field type to write
     */
    public static FieldWriter createFieldWriter(DataType fieldType) {
        switch (fieldType.getTypeRoot()) {
            case INTEGER:
            case DATE:
                return (writer, value) -> writer.writeInt((int) value);

            case TIME_WITHOUT_TIME_ZONE:
                // Write time as microseconds long (milliseconds * 1000)
                return (writer, value) -> {
                    int millis = (int) value;
                    long micros = millis * 1000L;
                    writer.writeLong(micros);
                };

            case BIGINT:
                return (writer, value) -> writer.writeLong((long) value);
            // support for nanoseconds come check again after #1195 merge
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return (writer, value) -> {
                    TimestampNtz ts = (TimestampNtz) value;
                    long micros = ts.getMillisecond() * 1000L + (ts.getNanoOfMillisecond() / 1000L);
                    writer.writeLong(micros);
                };

            case DECIMAL:
                final int decimalPrecision = getPrecision(fieldType);
                return (writer, value) -> writer.writeDecimal((Decimal) value, decimalPrecision);

            case STRING:
            case CHAR:
                return (writer, value) -> writer.writeString((BinaryString) value);

            case BINARY:
            case BYTES:
                return (writer, value) -> writer.writeBytes((byte[]) value);

            default:
                throw new IllegalArgumentException(
                        "Unsupported type for Iceberg binary row writer: " + fieldType);
        }
    }

    /** Accessor for writing the elements of an iceberg binary row writer during runtime. */
    interface FieldWriter extends Serializable {
        void writeField(IcebergBinaryRowWriter writer, Object value);
    }
}
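
Since the class documents byte-level compatibility with Iceberg's Conversions.toByteBuffer(), a small self-contained sketch of the reference behavior may help (illustrative only, not part of the commit; it uses only the iceberg-api dependency added to fluss-common/pom.xml above). For a BIGINT value, Iceberg serializes 8 little-endian bytes, which is what writeLong() is expected to reproduce:

import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class IcebergLongEncodingSketch {
    public static void main(String[] args) {
        // Reference encoding from iceberg-api: a long becomes 8 LITTLE-ENDIAN bytes.
        ByteBuffer reference = Conversions.toByteBuffer(Types.LongType.get(), 42L);
        byte[] referenceBytes = new byte[reference.remaining()];
        reference.duplicate().get(referenceBytes);

        // Hand-rolled little-endian encoding of the same value.
        byte[] manual = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(42L).array();

        // The two byte sequences should match; IcebergBinaryRowWriter.writeLong()
        // is expected to produce the same bytes for this value.
        System.out.println(Arrays.equals(referenceBytes, manual)); // true
    }
}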
fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoder.java

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.row.encode.iceberg;

import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.encode.KeyEncoder;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.RowType;

import java.util.List;

import static com.alibaba.fluss.utils.Preconditions.checkArgument;

/** An implementation of {@link KeyEncoder} to follow Iceberg's encoding strategy. */
public class IcebergKeyEncoder implements KeyEncoder {

    private final InternalRow.FieldGetter[] fieldGetters;

    private final IcebergBinaryRowWriter.FieldWriter[] fieldEncoders;

    private final IcebergBinaryRowWriter icebergBinaryRowWriter;

    public IcebergKeyEncoder(RowType rowType, List<String> keys) {

        // Validate single key field requirement as per FIP
        checkArgument(
                keys.size() == 1,
                "Key fields must have exactly one field for iceberg format, but got: %s",
                keys);

        // for get fields from fluss internal row
        fieldGetters = new InternalRow.FieldGetter[keys.size()];
        // for encode fields into iceberg
        fieldEncoders = new IcebergBinaryRowWriter.FieldWriter[keys.size()];
        for (int i = 0; i < keys.size(); i++) {
            int keyIndex = rowType.getFieldIndex(keys.get(i));
            DataType keyDataType = rowType.getTypeAt(keyIndex);
            fieldGetters[i] = InternalRow.createFieldGetter(keyDataType, keyIndex);
            fieldEncoders[i] = IcebergBinaryRowWriter.createFieldWriter(keyDataType);
        }

        icebergBinaryRowWriter = new IcebergBinaryRowWriter(keys.size());
    }

    @Override
    public byte[] encodeKey(InternalRow row) {
        icebergBinaryRowWriter.reset();
        // iterate all the fields of the row, and encode each field
        for (int i = 0; i < fieldGetters.length; i++) {
            fieldEncoders[i].writeField(
                    icebergBinaryRowWriter, fieldGetters[i].getFieldOrNull(row));
        }
        return icebergBinaryRowWriter.toBytes();
    }
}
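
A hedged usage sketch (not part of the commit): given a table's RowType and an InternalRow, the encoder produces Iceberg-compatible key bytes for a single key column. The column name "id" and the helper method below are hypothetical; constructing the RowType and row is left to the caller since it depends on Fluss's schema APIs.

import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.encode.iceberg.IcebergKeyEncoder;
import com.alibaba.fluss.types.RowType;

import java.util.Collections;

public class IcebergKeyEncoderUsageSketch {
    // Encodes the (hypothetical) "id" column of a row into Iceberg-compatible key bytes.
    // The rowType must contain a column named "id"; exactly one key field is required.
    static byte[] encodeIdKey(RowType rowType, InternalRow row) {
        IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType, Collections.singletonList("id"));
        return encoder.encodeKey(row);
    }
}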
