Skip to content

Commit 1c65471

Browse files
[common] Introduce MAP type for ARROW, COMPACTED and INDEXED formats
1 parent d1ae5b5 commit 1c65471

File tree

52 files changed

+1327
-102
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1327
-102
lines changed

fluss-common/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@
6262
<artifactId>fluss-shaded-arrow</artifactId>
6363
</dependency>
6464

65-
<!-- TODO: these two dependencies need to be shaded. -->
65+
<!-- TODO: these three dependencies need to be shaded. -->
66+
<!-- Use the Arrow compatible version -->
67+
<dependency>
68+
<groupId>org.eclipse.collections</groupId>
69+
<artifactId>eclipse-collections</artifactId>
70+
<version>11.1.0</version>
71+
</dependency>
72+
6673
<dependency>
6774
<groupId>org.lz4</groupId>
6875
<artifactId>lz4-java</artifactId>

fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.memory.MemorySegment;
2121
import org.apache.fluss.row.serializer.ArraySerializer;
22+
import org.apache.fluss.row.serializer.MapSerializer;
2223
import org.apache.fluss.row.serializer.RowSerializer;
2324

2425
import java.nio.charset.StandardCharsets;
@@ -89,13 +90,12 @@ public void writeArray(int pos, InternalArray input, ArraySerializer serializer)
8990
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
9091
}
9192

92-
// TODO: Map and Row write methods will be added in Issue #1973
93-
// @Override
94-
// public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
95-
// BinaryMap binary = serializer.toBinaryMap(input);
96-
// writeSegmentsToVarLenPart(
97-
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
98-
// }
93+
@Override
94+
public void writeMap(int pos, InternalMap input, MapSerializer serializer) {
95+
BinaryMap binary = serializer.toBinaryMap(input);
96+
writeSegmentsToVarLenPart(
97+
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
98+
}
9999

100100
@Override
101101
public void writeRow(int pos, InternalRow value, RowSerializer serializer) {

fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,11 @@ public InternalArray getArray(int pos) {
271271
/** Creates a nested {@link BinaryArray} with the nested data type information. */
272272
protected abstract BinaryArray createNestedArrayInstance();
273273

274-
// TODO: getMap() will be added in Issue #1973
274+
@Override
275+
public InternalMap getMap(int pos) {
276+
assertIndexIsValid(pos);
277+
return BinarySegmentUtils.readBinaryMap(segments, offset, getLong(pos));
278+
}
275279

276280
@Override
277281
public boolean getBoolean(int pos) {
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.row;
20+
21+
import org.apache.fluss.annotation.PublicEvolving;
22+
import org.apache.fluss.memory.MemorySegment;
23+
import org.apache.fluss.row.array.PrimitiveBinaryArray;
24+
import org.apache.fluss.types.DataType;
25+
26+
import java.io.Serializable;
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
import java.util.function.Supplier;
30+
31+
import static org.apache.fluss.utils.Preconditions.checkArgument;
32+
33+
/**
34+
* Binary implementation of {@link InternalMap} backed by {@link MemorySegment}s.
35+
*
36+
* <p>The binary layout of {@link BinaryMap}:
37+
*
38+
* <pre>
39+
* [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
40+
* </pre>
41+
*
42+
* <p>Influenced by Apache Spark UnsafeMapData.
43+
*
44+
* @since 0.9
45+
*/
46+
@PublicEvolving
47+
public class BinaryMap extends BinarySection implements InternalMap {
48+
49+
private static final long serialVersionUID = 1L;
50+
51+
private transient BinaryArray keys;
52+
private transient BinaryArray values;
53+
54+
private final transient Supplier<BinaryArray> keyArraySupplier;
55+
private final transient Supplier<BinaryArray> valueArraySupplier;
56+
private final transient Supplier<BinaryMap> nestedMapSupplier;
57+
58+
public BinaryMap() {
59+
this(
60+
(Supplier<BinaryArray> & Serializable) PrimitiveBinaryArray::new,
61+
(Supplier<BinaryArray> & Serializable) PrimitiveBinaryArray::new,
62+
(Supplier<BinaryMap> & Serializable) BinaryMap::new);
63+
}
64+
65+
public BinaryMap(
66+
Supplier<BinaryArray> keyArraySupplier,
67+
Supplier<BinaryArray> valueArraySupplier,
68+
Supplier<BinaryMap> nestedMapSupplier) {
69+
this.keyArraySupplier = keyArraySupplier;
70+
this.valueArraySupplier = valueArraySupplier;
71+
this.nestedMapSupplier = nestedMapSupplier;
72+
}
73+
74+
@Override
75+
public int size() {
76+
return keys.size();
77+
}
78+
79+
@Override
80+
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
81+
// Read the numBytes of key array from the first 4 bytes.
82+
final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
83+
assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
84+
final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
85+
assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";
86+
87+
// see BinarySection.readObject, on this call stack, keys and values are not initialized
88+
if (keys == null) {
89+
keys = createKeyArrayInstance();
90+
}
91+
keys.pointTo(segments, offset + 4, keyArrayBytes);
92+
if (values == null) {
93+
values = createValueArrayInstance();
94+
}
95+
values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
96+
97+
assert keys.size() == values.size();
98+
99+
this.segments = segments;
100+
this.offset = offset;
101+
this.sizeInBytes = sizeInBytes;
102+
}
103+
104+
/** Creates a {@link BinaryArray} instance for keys with the nested data type information. */
105+
protected BinaryArray createKeyArrayInstance() {
106+
return keyArraySupplier.get();
107+
}
108+
109+
/** Creates a {@link BinaryArray} instance for values with the nested data type information. */
110+
protected BinaryArray createValueArrayInstance() {
111+
return valueArraySupplier.get();
112+
}
113+
114+
/** Creates a nested {@link BinaryMap} with the nested data type information. */
115+
protected BinaryMap createNestedMapInstance() {
116+
return nestedMapSupplier.get();
117+
}
118+
119+
@Override
120+
public BinaryArray keyArray() {
121+
return keys;
122+
}
123+
124+
@Override
125+
public BinaryArray valueArray() {
126+
return values;
127+
}
128+
129+
public Map<?, ?> toJavaMap(DataType keyType, DataType valueType) {
130+
Object[] keyArray = keys.toObjectArray(keyType);
131+
Object[] valueArray = values.toObjectArray(valueType);
132+
133+
Map<Object, Object> map = new HashMap<>();
134+
for (int i = 0; i < keyArray.length; i++) {
135+
map.put(keyArray[i], valueArray[i]);
136+
}
137+
return map;
138+
}
139+
140+
public BinaryMap copy() {
141+
return copy(createNestedMapInstance());
142+
}
143+
144+
public BinaryMap copy(BinaryMap reuse) {
145+
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
146+
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
147+
return reuse;
148+
}
149+
150+
@Override
151+
public int hashCode() {
152+
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
153+
}
154+
155+
public static BinaryMap valueOf(BinaryArray key, BinaryArray value, BinaryMap reuse) {
156+
checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
157+
byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
158+
MemorySegment segment = MemorySegment.wrap(bytes);
159+
segment.putInt(0, key.sizeInBytes);
160+
key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
161+
value.getSegments()[0].copyTo(
162+
value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
163+
reuse.pointTo(segment, 0, bytes.length);
164+
return reuse;
165+
}
166+
}

fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,18 +1002,30 @@ public static BinaryArray readBinaryArray(
10021002
return reusedArray;
10031003
}
10041004

1005+
public static BinaryMap readBinaryMap(
1006+
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
1007+
final int size = ((int) offsetAndSize);
1008+
int offset = (int) (offsetAndSize >> 32);
1009+
BinaryMap map = new BinaryMap();
1010+
map.pointTo(segments, offset + baseOffset, size);
1011+
return map;
1012+
}
1013+
10051014
/** Read map data from segments. */
10061015
public static InternalMap readMap(MemorySegment[] segments, int offset, int numBytes) {
1007-
// TODO: Map type support will be added in Issue #1973
1008-
throw new UnsupportedOperationException(
1009-
"Map type is not supported yet. Will be added in Issue #1973.");
1016+
return readMapData(segments, offset, numBytes);
10101017
}
10111018

10121019
/** Read map data from segments with long offset. */
10131020
public static InternalMap readMap(MemorySegment[] segments, int offset, long numBytes) {
1014-
// TODO: Map type support will be added in Issue #1973
1015-
throw new UnsupportedOperationException(
1016-
"Map type is not supported yet. Will be added in Issue #1973.");
1021+
return readMapData(segments, offset, (int) numBytes);
1022+
}
1023+
1024+
/** Read map data from segments. */
1025+
public static BinaryMap readMapData(MemorySegment[] segments, int offset, int numBytes) {
1026+
BinaryMap map = new BinaryMap();
1027+
map.pointTo(segments, offset, numBytes);
1028+
return map;
10171029
}
10181030

10191031
/** Read aligned row from segments. */

fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
2222
import org.apache.fluss.row.serializer.ArraySerializer;
23+
import org.apache.fluss.row.serializer.MapSerializer;
2324
import org.apache.fluss.row.serializer.RowSerializer;
2425
import org.apache.fluss.types.ArrayType;
2526
import org.apache.fluss.types.DataType;
27+
import org.apache.fluss.types.MapType;
2628
import org.apache.fluss.types.RowType;
2729

2830
import javax.annotation.Nullable;
@@ -78,6 +80,8 @@ public interface BinaryWriter {
7880

7981
void writeArray(int pos, InternalArray value, ArraySerializer serializer);
8082

83+
void writeMap(int pos, InternalMap value, MapSerializer serializer);
84+
8185
void writeRow(int pos, InternalRow value, RowSerializer serializer);
8286

8387
/** Finally, complete write to set real size to binary. */
@@ -164,9 +168,11 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(
164168
writer.writeArray(pos, (InternalArray) value, arraySerializer);
165169

166170
case MAP:
167-
// TODO: Map type support will be added in Issue #1973
168-
throw new UnsupportedOperationException(
169-
"Map type is not supported yet. Will be added in Issue #1973.");
171+
MapType mapType = (MapType) elementType;
172+
final MapSerializer mapSerializer =
173+
new MapSerializer(mapType.getKeyType(), mapType.getValueType(), rowFormat);
174+
return (writer, pos, value) ->
175+
writer.writeMap(pos, (InternalMap) value, mapSerializer);
170176
case ROW:
171177
if (rowFormat == null) {
172178
throw new IllegalArgumentException(

fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public interface DataGetters {
9595
/** Returns the array value at the given position. */
9696
InternalArray getArray(int pos);
9797

98+
/** Returns the map value at the given position. */
99+
InternalMap getMap(int pos);
100+
98101
/** Returns the row value at the given position. */
99102
InternalRow getRow(int pos, int numFields);
100103
}

fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ public InternalArray getArray(int pos) {
218218
return (InternalArray) getObject(pos);
219219
}
220220

221-
// TODO: getMap() will be added in Issue #1973
221+
@Override
222+
public InternalMap getMap(int pos) {
223+
return (InternalMap) getObject(pos);
224+
}
222225

223226
@Override
224227
public InternalRow getRow(int pos, int numFields) {

0 commit comments

Comments
 (0)