Skip to content

Commit ed1f8fc

Browse files
[common] Introduce MAP type for ARROW, COMPACTED and INDEXED formats
1 parent d1ae5b5 commit ed1f8fc

File tree

64 files changed

+3617
-128
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

64 files changed

+3617
-128
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.fluss.record.ProjectionPushdownCache;
3333
import org.apache.fluss.record.TestingSchemaGetter;
3434
import org.apache.fluss.row.GenericArray;
35+
import org.apache.fluss.row.GenericMap;
3536
import org.apache.fluss.row.InternalRow;
3637
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
3738
import org.apache.fluss.types.DataTypes;
@@ -65,6 +66,7 @@
6566
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
6667
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
6768
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
69+
import static org.apache.fluss.row.BinaryString.fromString;
6870
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
6971
import static org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
7072
import static org.assertj.core.api.Assertions.assertThat;
@@ -228,19 +230,40 @@ void testComplexTypeFetch() throws Exception {
228230
1,
229231
new String[] {"a", "b"},
230232
new Object[] {new int[] {1, 2}, new int[] {3, 4}},
231-
new Object[] {10, new Object[] {20, "nested"}, "row1"}
233+
new Object[] {10, new Object[] {20, "nested"}, "row1"},
234+
GenericMap.of(1, fromString("one"), 2, fromString("two")),
235+
GenericMap.of(
236+
fromString("k1"),
237+
GenericMap.of(10, fromString("v1"), 20, fromString("v2"))),
238+
GenericMap.of(
239+
fromString("arr1"),
240+
new GenericArray(new int[] {1, 2}),
241+
fromString("arr2"),
242+
new GenericArray(new int[] {3, 4, 5}))
232243
},
233244
new Object[] {
234245
2,
235246
new String[] {"c", null},
236247
new Object[] {null, new int[] {3, 4}},
237-
new Object[] {30, new Object[] {40, "test"}, "row2"}
248+
new Object[] {30, new Object[] {40, "test"}, "row2"},
249+
GenericMap.of(3, null, 4, fromString("four")),
250+
GenericMap.of(fromString("k2"), GenericMap.of(30, fromString("v3"))),
251+
GenericMap.of(fromString("arr3"), new GenericArray(new int[] {6}))
238252
},
239253
new Object[] {
240254
3,
241255
new String[] {"e", "f"},
242256
new Object[] {new int[] {5, 6, 7}, new int[] {8}},
243-
new Object[] {50, new Object[] {60, "value"}, "row3"}
257+
new Object[] {50, new Object[] {60, "value"}, "row3"},
258+
GenericMap.of(5, fromString("five")),
259+
GenericMap.of(
260+
fromString("k3"),
261+
GenericMap.of(50, fromString("v5"), 60, fromString("v6"))),
262+
GenericMap.of(
263+
fromString("arr4"),
264+
new GenericArray(new int[] {7, 8}),
265+
fromString("arr5"),
266+
new GenericArray(new int[] {9}))
244267
});
245268
Schema schema =
246269
Schema.newBuilder()
@@ -253,6 +276,18 @@ void testComplexTypeFetch() throws Exception {
253276
DataTypes.INT(),
254277
DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
255278
DataTypes.STRING()))
279+
.column("e", DataTypes.MAP(DataTypes.INT().copy(false), DataTypes.STRING()))
280+
.column(
281+
"f",
282+
DataTypes.MAP(
283+
DataTypes.STRING().copy(false),
284+
DataTypes.MAP(
285+
DataTypes.INT().copy(false), DataTypes.STRING())))
286+
.column(
287+
"g",
288+
DataTypes.MAP(
289+
DataTypes.STRING().copy(false),
290+
DataTypes.ARRAY(DataTypes.INT())))
256291
.build();
257292
TableInfo tableInfo =
258293
TableInfo.of(
@@ -321,6 +356,18 @@ void testComplexTypeFetch() throws Exception {
321356
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
322357
assertThat(nestedRow.getString(2).toString())
323358
.isEqualTo(((Object[]) complexData.get(i)[3])[2]);
359+
360+
assertThat(row.getMap(4)).isInstanceOf(GenericMap.class);
361+
GenericMap simpleMap = (GenericMap) row.getMap(4);
362+
assertThat(simpleMap.size()).isGreaterThan(0);
363+
364+
assertThat(row.getMap(5)).isInstanceOf(GenericMap.class);
365+
GenericMap nestedMap = (GenericMap) row.getMap(5);
366+
assertThat(nestedMap.size()).isGreaterThan(0);
367+
368+
assertThat(row.getMap(6)).isInstanceOf(GenericMap.class);
369+
GenericMap mapWithArray = (GenericMap) row.getMap(6);
370+
assertThat(mapWithArray.size()).isGreaterThan(0);
324371
}
325372
}
326373

fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.memory.MemorySegment;
2121
import org.apache.fluss.row.serializer.ArraySerializer;
22+
import org.apache.fluss.row.serializer.MapSerializer;
2223
import org.apache.fluss.row.serializer.RowSerializer;
2324

2425
import java.nio.charset.StandardCharsets;
@@ -89,13 +90,12 @@ public void writeArray(int pos, InternalArray input, ArraySerializer serializer)
8990
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
9091
}
9192

92-
// TODO: Map and Row write methods will be added in Issue #1973
93-
// @Override
94-
// public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
95-
// BinaryMap binary = serializer.toBinaryMap(input);
96-
// writeSegmentsToVarLenPart(
97-
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
98-
// }
93+
@Override
94+
public void writeMap(int pos, InternalMap input, MapSerializer serializer) {
95+
BinaryMap binary = serializer.toBinaryMap(input);
96+
writeSegmentsToVarLenPart(
97+
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
98+
}
9999

100100
@Override
101101
public void writeRow(int pos, InternalRow value, RowSerializer serializer) {

fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,11 @@ public InternalArray getArray(int pos) {
271271
/** Creates a nested {@link BinaryArray} with the nested data type information. */
272272
protected abstract BinaryArray createNestedArrayInstance();
273273

274-
// TODO: getMap() will be added in Issue #1973
274+
@Override
275+
public InternalMap getMap(int pos) {
276+
assertIndexIsValid(pos);
277+
return BinarySegmentUtils.readBinaryMap(segments, offset, getLong(pos));
278+
}
275279

276280
@Override
277281
public boolean getBoolean(int pos) {
@@ -602,7 +606,7 @@ public static BinaryArray fromPrimitiveArray(double[] arr) {
602606
private static BinaryArray fromPrimitiveArray(
603607
Object arr, int offset, int length, int elementSize) {
604608
final long headerInBytes = calculateHeaderInBytes(length);
605-
final long valueRegionInBytes = elementSize * length;
609+
final long valueRegionInBytes = (long) elementSize * length;
606610

607611
// must align by 8 bytes
608612
long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.row;
20+
21+
import org.apache.fluss.annotation.PublicEvolving;
22+
import org.apache.fluss.memory.MemorySegment;
23+
import org.apache.fluss.row.array.PrimitiveBinaryArray;
24+
25+
import java.io.Serializable;
26+
import java.util.function.Supplier;
27+
28+
import static org.apache.fluss.utils.Preconditions.checkArgument;
29+
30+
/**
31+
* Binary implementation of {@link InternalMap} backed by {@link MemorySegment}s.
32+
*
33+
* <p>The binary layout of {@link BinaryMap}:
34+
*
35+
* <pre>
36+
* [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
37+
* </pre>
38+
*
39+
* <p>Influenced by Apache Spark UnsafeMapData.
40+
*
41+
* @since 0.9
42+
*/
43+
@PublicEvolving
44+
public class BinaryMap extends BinarySection implements InternalMap {
45+
46+
private static final long serialVersionUID = 1L;
47+
48+
private transient BinaryArray keys;
49+
private transient BinaryArray values;
50+
51+
private final transient Supplier<BinaryArray> keyArraySupplier;
52+
private final transient Supplier<BinaryArray> valueArraySupplier;
53+
private final transient Supplier<BinaryMap> nestedMapSupplier;
54+
55+
public BinaryMap() {
56+
this(
57+
(Supplier<BinaryArray> & Serializable) PrimitiveBinaryArray::new,
58+
(Supplier<BinaryArray> & Serializable) PrimitiveBinaryArray::new,
59+
(Supplier<BinaryMap> & Serializable) BinaryMap::new);
60+
}
61+
62+
public BinaryMap(
63+
Supplier<BinaryArray> keyArraySupplier,
64+
Supplier<BinaryArray> valueArraySupplier,
65+
Supplier<BinaryMap> nestedMapSupplier) {
66+
this.keyArraySupplier = keyArraySupplier;
67+
this.valueArraySupplier = valueArraySupplier;
68+
this.nestedMapSupplier = nestedMapSupplier;
69+
}
70+
71+
@Override
72+
public int size() {
73+
return keys.size();
74+
}
75+
76+
@Override
77+
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
78+
// Read the numBytes of key array from the first 4 bytes.
79+
final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
80+
assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
81+
final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
82+
assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";
83+
84+
// see BinarySection.readObject, on this call stack, keys and values are not initialized
85+
if (keys == null) {
86+
keys = createKeyArrayInstance();
87+
}
88+
keys.pointTo(segments, offset + 4, keyArrayBytes);
89+
if (values == null) {
90+
values = createValueArrayInstance();
91+
}
92+
values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);
93+
94+
assert keys.size() == values.size();
95+
96+
this.segments = segments;
97+
this.offset = offset;
98+
this.sizeInBytes = sizeInBytes;
99+
}
100+
101+
/** Creates a {@link BinaryArray} instance for keys with the nested data type information. */
102+
protected BinaryArray createKeyArrayInstance() {
103+
return keyArraySupplier.get();
104+
}
105+
106+
/** Creates a {@link BinaryArray} instance for values with the nested data type information. */
107+
protected BinaryArray createValueArrayInstance() {
108+
return valueArraySupplier.get();
109+
}
110+
111+
/** Creates a nested {@link BinaryMap} with the nested data type information. */
112+
protected BinaryMap createNestedMapInstance() {
113+
return nestedMapSupplier.get();
114+
}
115+
116+
@Override
117+
public BinaryArray keyArray() {
118+
return keys;
119+
}
120+
121+
@Override
122+
public BinaryArray valueArray() {
123+
return values;
124+
}
125+
126+
public BinaryMap copy() {
127+
return copy(createNestedMapInstance());
128+
}
129+
130+
public BinaryMap copy(BinaryMap reuse) {
131+
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
132+
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
133+
return reuse;
134+
}
135+
136+
@Override
137+
public int hashCode() {
138+
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
139+
}
140+
141+
public static BinaryMap valueOf(BinaryArray key, BinaryArray value, BinaryMap reuse) {
142+
checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
143+
byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
144+
MemorySegment segment = MemorySegment.wrap(bytes);
145+
segment.putInt(0, key.sizeInBytes);
146+
key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
147+
value.getSegments()[0].copyTo(
148+
value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
149+
reuse.pointTo(segment, 0, bytes.length);
150+
return reuse;
151+
}
152+
}

fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,18 +1002,30 @@ public static BinaryArray readBinaryArray(
10021002
return reusedArray;
10031003
}
10041004

1005+
public static BinaryMap readBinaryMap(
1006+
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
1007+
final int size = ((int) offsetAndSize);
1008+
int offset = (int) (offsetAndSize >> 32);
1009+
BinaryMap map = new BinaryMap();
1010+
map.pointTo(segments, offset + baseOffset, size);
1011+
return map;
1012+
}
1013+
10051014
/** Read map data from segments. */
10061015
public static InternalMap readMap(MemorySegment[] segments, int offset, int numBytes) {
1007-
// TODO: Map type support will be added in Issue #1973
1008-
throw new UnsupportedOperationException(
1009-
"Map type is not supported yet. Will be added in Issue #1973.");
1016+
return readMapData(segments, offset, numBytes);
10101017
}
10111018

10121019
/** Read map data from segments with long offset. */
10131020
public static InternalMap readMap(MemorySegment[] segments, int offset, long numBytes) {
1014-
// TODO: Map type support will be added in Issue #1973
1015-
throw new UnsupportedOperationException(
1016-
"Map type is not supported yet. Will be added in Issue #1973.");
1021+
return readMapData(segments, offset, (int) numBytes);
1022+
}
1023+
1024+
/** Read map data from segments. */
1025+
public static BinaryMap readMapData(MemorySegment[] segments, int offset, int numBytes) {
1026+
BinaryMap map = new BinaryMap();
1027+
map.pointTo(segments, offset, numBytes);
1028+
return map;
10171029
}
10181030

10191031
/** Read aligned row from segments. */

fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
2222
import org.apache.fluss.row.serializer.ArraySerializer;
23+
import org.apache.fluss.row.serializer.MapSerializer;
2324
import org.apache.fluss.row.serializer.RowSerializer;
2425
import org.apache.fluss.types.ArrayType;
2526
import org.apache.fluss.types.DataType;
27+
import org.apache.fluss.types.MapType;
2628
import org.apache.fluss.types.RowType;
2729

2830
import javax.annotation.Nullable;
@@ -78,6 +80,8 @@ public interface BinaryWriter {
7880

7981
void writeArray(int pos, InternalArray value, ArraySerializer serializer);
8082

83+
void writeMap(int pos, InternalMap value, MapSerializer serializer);
84+
8185
void writeRow(int pos, InternalRow value, RowSerializer serializer);
8286

8387
/** Finally, complete write to set real size to binary. */
@@ -164,9 +168,11 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(
164168
writer.writeArray(pos, (InternalArray) value, arraySerializer);
165169

166170
case MAP:
167-
// TODO: Map type support will be added in Issue #1973
168-
throw new UnsupportedOperationException(
169-
"Map type is not supported yet. Will be added in Issue #1973.");
171+
MapType mapType = (MapType) elementType;
172+
final MapSerializer mapSerializer =
173+
new MapSerializer(mapType.getKeyType(), mapType.getValueType(), rowFormat);
174+
return (writer, pos, value) ->
175+
writer.writeMap(pos, (InternalMap) value, mapSerializer);
170176
case ROW:
171177
if (rowFormat == null) {
172178
throw new IllegalArgumentException(

fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public interface DataGetters {
9595
/** Returns the array value at the given position. */
9696
InternalArray getArray(int pos);
9797

98+
/** Returns the map value at the given position. */
99+
InternalMap getMap(int pos);
100+
98101
/** Returns the row value at the given position. */
99102
InternalRow getRow(int pos, int numFields);
100103
}

fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ public InternalArray getArray(int pos) {
218218
return (InternalArray) getObject(pos);
219219
}
220220

221-
// TODO: getMap() will be added in Issue #1973
221+
@Override
222+
public InternalMap getMap(int pos) {
223+
return (InternalMap) getObject(pos);
224+
}
222225

223226
@Override
224227
public InternalRow getRow(int pos, int numFields) {

0 commit comments

Comments
 (0)