Skip to content

Commit a55c4ac

Browse files
authored
[iceberg] Introduce iceberg bucketing function (#1409)
1 parent 43e3812 commit a55c4ac

File tree

6 files changed

+597
-243
lines changed

6 files changed

+597
-243
lines changed

fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ static BucketingFunction of(@Nullable DataLakeFormat lakeFormat) {
4141
return new PaimonBucketingFunction();
4242
} else if (lakeFormat == DataLakeFormat.LANCE) {
4343
return new FlussBucketingFunction();
44+
} else if (lakeFormat == DataLakeFormat.ICEBERG) {
45+
return new IcebergBucketingFunction();
4446
} else {
4547
throw new UnsupportedOperationException("Unsupported lake format: " + lakeFormat);
4648
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.bucketing;
19+
20+
import com.alibaba.fluss.shaded.guava32.com.google.common.hash.HashFunction;
21+
import com.alibaba.fluss.shaded.guava32.com.google.common.hash.Hashing;
22+
23+
/** An implementation of {@link BucketingFunction} to follow Iceberg's bucketing strategy. */
24+
public class IcebergBucketingFunction implements BucketingFunction {
25+
26+
private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
27+
28+
@Override
29+
public int bucketing(byte[] bucketKey, int numBuckets) {
30+
if (bucketKey == null || bucketKey.length == 0) {
31+
throw new IllegalArgumentException("bucketKey must not be null or empty");
32+
}
33+
if (numBuckets <= 0) {
34+
throw new IllegalArgumentException("numBuckets must be positive");
35+
}
36+
return (MURMUR3.hashBytes(bucketKey).asInt() & Integer.MAX_VALUE) % numBuckets;
37+
}
38+
}

fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,48 +27,43 @@
2727
import java.io.Serializable;
2828
import java.util.Arrays;
2929

30-
import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
31-
3230
/**
3331
* A writer to encode Fluss's {@link com.alibaba.fluss.row.InternalRow} using Iceberg's binary
3432
* encoding format.
3533
*
36-
* <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer() implementation:
37-
* https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java
38-
*
39-
* <p>Key encoding principles from Iceberg's Conversions class:
34+
* <p>The encoding logic is to encode different types of keys into bytes array, which will have the
35+
* same hash value with iceberg's bucketing hash function.
4036
*
4137
* <ul>
42-
* <li>All numeric types (int, long, float, double, timestamps) use LITTLE-ENDIAN byte order
38+
* <li>For int type, it is treated as a long value for encoding
39+
* <li>All numeric types (long, float, double, timestamps) use LITTLE-ENDIAN byte order
4340
* <li>Decimal types use BIG-ENDIAN byte order
4441
* <li>Strings are encoded as UTF-8 bytes
4542
* <li>Timestamps are stored as long values (microseconds since epoch)
4643
* </ul>
4744
*
4845
* <p>Note: This implementation uses Fluss's MemorySegment instead of ByteBuffer for performance,
49-
* but maintains byte-level compatibility with Iceberg's encoding.
46+
* but maintains byte-level compatibility.
5047
*/
5148
class IcebergBinaryRowWriter {
5249

53-
private final int arity;
5450
private byte[] buffer;
5551
private MemorySegment segment;
5652
private int cursor;
5753

5854
public IcebergBinaryRowWriter(int arity) {
59-
this.arity = arity;
6055
// Conservative initial size to avoid frequent resizing
6156
int initialSize = 8 + (arity * 8);
6257
setBuffer(new byte[initialSize]);
6358
reset();
6459
}
6560

6661
public void reset() {
67-
this.cursor = 0;
6862
// Clear only the used portion for efficiency
6963
if (cursor > 0) {
7064
Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0);
7165
}
66+
this.cursor = 0;
7267
}
7368

7469
public byte[] toBytes() {
@@ -77,13 +72,6 @@ public byte[] toBytes() {
7772
return result;
7873
}
7974

80-
public void setNullAt(int pos) {
81-
// For Iceberg key encoding, null values should not occur
82-
// This is validated at the encoder level
83-
throw new UnsupportedOperationException(
84-
"Null values are not supported in Iceberg key encoding");
85-
}
86-
8775
public void writeBoolean(boolean value) {
8876
ensureCapacity(1);
8977
UnsafeUtils.putBoolean(buffer, cursor, value);
@@ -108,6 +96,10 @@ public void writeInt(int value) {
10896
cursor += 4;
10997
}
11098

99+
public void writeIntAsLong(int value) {
100+
writeLong(value);
101+
}
102+
111103
public void writeLong(long value) {
112104
ensureCapacity(8);
113105
UnsafeUtils.putLong(buffer, cursor, value);
@@ -127,26 +119,40 @@ public void writeDouble(double value) {
127119
}
128120

129121
public void writeString(BinaryString value) {
122+
writeString(value, false);
123+
}
124+
125+
public void writeString(BinaryString value, boolean skipEncodeLength) {
130126
// Convert to UTF-8 byte array
131127
byte[] bytes = BinaryString.encodeUTF8(value.toString());
132-
// Write length prefix followed by UTF-8 bytes
133-
writeInt(bytes.length); // 4-byte length prefix
134-
ensureCapacity(bytes.length); // Ensure space for actual string bytes
135-
segment.put(cursor, bytes, 0, bytes.length);
136-
cursor += bytes.length;
128+
writeByteArray(bytes, skipEncodeLength);
137129
}
138130

139131
public void writeBytes(byte[] bytes) {
140-
// Write length prefix followed by binary data
141-
writeInt(bytes.length); // 4-byte length prefix
132+
writeBytes(bytes, false);
133+
}
134+
135+
public void writeBytes(byte[] bytes, boolean skipEncodeLength) {
136+
writeByteArray(bytes, skipEncodeLength);
137+
}
138+
139+
private void writeByteArray(byte[] bytes, boolean skipEncodeLength) {
140+
if (!skipEncodeLength) {
141+
// Write length prefix followed by binary data
142+
writeInt(bytes.length); // 4-byte length prefix
143+
}
142144
ensureCapacity(bytes.length); // Ensure space for actual binary bytes
143145
segment.put(cursor, bytes, 0, bytes.length);
144146
cursor += bytes.length;
145147
}
146148

147-
public void writeDecimal(Decimal value, int precision) {
149+
public void writeDecimal(Decimal value) {
150+
writeDecimal(value, false);
151+
}
152+
153+
public void writeDecimal(Decimal value, boolean skipEncodeLength) {
148154
byte[] unscaled = value.toUnscaledBytes();
149-
writeBytes(unscaled); // Adds 4-byte length prefix before the actual bytes
155+
writeBytes(unscaled, skipEncodeLength); // Adds 4-byte length prefix before the actual bytes
150156
}
151157

152158
private void ensureCapacity(int neededSize) {
@@ -178,7 +184,7 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
178184
switch (fieldType.getTypeRoot()) {
179185
case INTEGER:
180186
case DATE:
181-
return (writer, value) -> writer.writeInt((int) value);
187+
return (writer, value) -> writer.writeIntAsLong((int) value);
182188

183189
case TIME_WITHOUT_TIME_ZONE:
184190
// Write time as microseconds long (milliseconds * 1000)
@@ -199,16 +205,15 @@ public static FieldWriter createFieldWriter(DataType fieldType) {
199205
};
200206

201207
case DECIMAL:
202-
final int decimalPrecision = getPrecision(fieldType);
203-
return (writer, value) -> writer.writeDecimal((Decimal) value, decimalPrecision);
208+
return (writer, value) -> writer.writeDecimal((Decimal) value, true);
204209

205210
case STRING:
206211
case CHAR:
207-
return (writer, value) -> writer.writeString((BinaryString) value);
212+
return (writer, value) -> writer.writeString((BinaryString) value, true);
208213

209214
case BINARY:
210215
case BYTES:
211-
return (writer, value) -> writer.writeBytes((byte[]) value);
216+
return (writer, value) -> writer.writeBytes((byte[]) value, true);
212217

213218
default:
214219
throw new IllegalArgumentException(

0 commit comments

Comments
 (0)