Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.record.ProjectionPushdownCache;
import org.apache.fluss.record.TestingSchemaGetter;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericMap;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.types.DataTypes;
Expand Down Expand Up @@ -65,6 +66,7 @@
import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static org.apache.fluss.row.BinaryString.fromString;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toByteBuffer;
import static org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -228,19 +230,40 @@ void testComplexTypeFetch() throws Exception {
1,
new String[] {"a", "b"},
new Object[] {new int[] {1, 2}, new int[] {3, 4}},
new Object[] {10, new Object[] {20, "nested"}, "row1"}
new Object[] {10, new Object[] {20, "nested"}, "row1"},
GenericMap.of(1, fromString("one"), 2, fromString("two")),
GenericMap.of(
fromString("k1"),
GenericMap.of(10, fromString("v1"), 20, fromString("v2"))),
GenericMap.of(
fromString("arr1"),
new GenericArray(new int[] {1, 2}),
fromString("arr2"),
new GenericArray(new int[] {3, 4, 5}))
},
new Object[] {
2,
new String[] {"c", null},
new Object[] {null, new int[] {3, 4}},
new Object[] {30, new Object[] {40, "test"}, "row2"}
new Object[] {30, new Object[] {40, "test"}, "row2"},
GenericMap.of(3, null, 4, fromString("four")),
GenericMap.of(fromString("k2"), GenericMap.of(30, fromString("v3"))),
GenericMap.of(fromString("arr3"), new GenericArray(new int[] {6}))
},
new Object[] {
3,
new String[] {"e", "f"},
new Object[] {new int[] {5, 6, 7}, new int[] {8}},
new Object[] {50, new Object[] {60, "value"}, "row3"}
new Object[] {50, new Object[] {60, "value"}, "row3"},
GenericMap.of(5, fromString("five")),
GenericMap.of(
fromString("k3"),
GenericMap.of(50, fromString("v5"), 60, fromString("v6"))),
GenericMap.of(
fromString("arr4"),
new GenericArray(new int[] {7, 8}),
fromString("arr5"),
new GenericArray(new int[] {9}))
});
Schema schema =
Schema.newBuilder()
Expand All @@ -253,6 +276,18 @@ void testComplexTypeFetch() throws Exception {
DataTypes.INT(),
DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
DataTypes.STRING()))
.column("e", DataTypes.MAP(DataTypes.INT().copy(false), DataTypes.STRING()))
.column(
"f",
DataTypes.MAP(
DataTypes.STRING().copy(false),
DataTypes.MAP(
DataTypes.INT().copy(false), DataTypes.STRING())))
.column(
"g",
DataTypes.MAP(
DataTypes.STRING().copy(false),
DataTypes.ARRAY(DataTypes.INT())))
.build();
TableInfo tableInfo =
TableInfo.of(
Expand Down Expand Up @@ -321,6 +356,18 @@ void testComplexTypeFetch() throws Exception {
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
assertThat(nestedRow.getString(2).toString())
.isEqualTo(((Object[]) complexData.get(i)[3])[2]);

assertThat(row.getMap(4)).isInstanceOf(GenericMap.class);
GenericMap simpleMap = (GenericMap) row.getMap(4);
assertThat(simpleMap.size()).isGreaterThan(0);

assertThat(row.getMap(5)).isInstanceOf(GenericMap.class);
GenericMap nestedMap = (GenericMap) row.getMap(5);
assertThat(nestedMap.size()).isGreaterThan(0);

assertThat(row.getMap(6)).isInstanceOf(GenericMap.class);
GenericMap mapWithArray = (GenericMap) row.getMap(6);
assertThat(mapWithArray.size()).isGreaterThan(0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.row.serializer.ArraySerializer;
import org.apache.fluss.row.serializer.MapSerializer;
import org.apache.fluss.row.serializer.RowSerializer;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -89,13 +90,12 @@ public void writeArray(int pos, InternalArray input, ArraySerializer serializer)
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

// TODO: Map and Row write methods will be added in Issue #1973
// @Override
// public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
// BinaryMap binary = serializer.toBinaryMap(input);
// writeSegmentsToVarLenPart(
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
// }
@Override
public void writeMap(int pos, InternalMap input, MapSerializer serializer) {
BinaryMap binary = serializer.toBinaryMap(input);
writeSegmentsToVarLenPart(
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

@Override
public void writeRow(int pos, InternalRow value, RowSerializer serializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ public InternalArray getArray(int pos) {
/** Creates a nested {@link BinaryArray} with the nested data type information. */
protected abstract BinaryArray createNestedArrayInstance();

// TODO: getMap() will be added in Issue #1973
@Override
public InternalMap getMap(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readBinaryMap(segments, offset, getLong(pos));
}

@Override
public boolean getBoolean(int pos) {
Expand Down Expand Up @@ -602,7 +606,7 @@ public static BinaryArray fromPrimitiveArray(double[] arr) {
private static BinaryArray fromPrimitiveArray(
Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long valueRegionInBytes = ((long) elementSize) * length;

// must align by 8 bytes
long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8;
Expand Down
166 changes: 166 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/row/BinaryMap.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.row.array.PrimitiveBinaryArray;

import java.io.Serializable;
import java.util.function.Supplier;

import static org.apache.fluss.utils.Preconditions.checkArgument;

/**
* Binary implementation of {@link InternalMap} backed by {@link MemorySegment}s.
*
* <p>The binary layout of {@link BinaryMap}:
*
* <pre>
* [4 byte(keyArray size in bytes)] + [Key BinaryArray] + [Value BinaryArray].
* </pre>
*
* <p>Influenced by Apache Spark UnsafeMapData.
*
* @since 0.9
*/
@PublicEvolving
public class BinaryMap extends BinarySection implements InternalMap {
Copy link

Copilot AI Dec 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class BinaryMap overrides hashCode but not equals.

Copilot uses AI. Check for mistakes.

private static final long serialVersionUID = 1L;

private transient BinaryArray keys;
private transient BinaryArray values;

private final transient Supplier<BinaryArray> keyArraySupplier;
private final transient Supplier<BinaryArray> valueArraySupplier;
private final transient Supplier<BinaryMap> nestedMapSupplier;

public BinaryMap() {
this(
(Supplier<BinaryArray> & Serializable) PrimitiveBinaryArray::new,
(Supplier<BinaryArray> & Serializable) PrimitiveBinaryArray::new,
(Supplier<BinaryMap> & Serializable) BinaryMap::new);
}

public BinaryMap(
Supplier<BinaryArray> keyArraySupplier,
Supplier<BinaryArray> valueArraySupplier,
Supplier<BinaryMap> nestedMapSupplier) {
this.keyArraySupplier = keyArraySupplier;
this.valueArraySupplier = valueArraySupplier;
this.nestedMapSupplier = nestedMapSupplier;
}

@Override
public int size() {
return keys.size();
}

@Override
public void pointTo(MemorySegment[] segments, int offset, int sizeInBytes) {
// Read the numBytes of key array from the first 4 bytes.
final int keyArrayBytes = BinarySegmentUtils.getInt(segments, offset);
assert keyArrayBytes >= 0 : "keyArraySize (" + keyArrayBytes + ") should >= 0";
final int valueArrayBytes = sizeInBytes - keyArrayBytes - 4;
assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0";

// see BinarySection.readObject, on this call stack, keys and values are not initialized
if (keys == null) {
keys = createKeyArrayInstance();
}
keys.pointTo(segments, offset + 4, keyArrayBytes);
if (values == null) {
values = createValueArrayInstance();
}
values.pointTo(segments, offset + 4 + keyArrayBytes, valueArrayBytes);

assert keys.size() == values.size();

this.segments = segments;
this.offset = offset;
this.sizeInBytes = sizeInBytes;
}

/** Creates a {@link BinaryArray} instance for keys with the nested data type information. */
protected BinaryArray createKeyArrayInstance() {
return keyArraySupplier.get();
}

/** Creates a {@link BinaryArray} instance for values with the nested data type information. */
protected BinaryArray createValueArrayInstance() {
return valueArraySupplier.get();
}

/** Creates a nested {@link BinaryMap} with the nested data type information. */
protected BinaryMap createNestedMapInstance() {
return nestedMapSupplier.get();
}

@Override
public BinaryArray keyArray() {
return keys;
}

@Override
public BinaryArray valueArray() {
return values;
}

public BinaryMap copy() {
return copy(createNestedMapInstance());
}

public BinaryMap copy(BinaryMap reuse) {
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
return reuse;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof BinaryMap)) {
return false;
}
final BinarySection that = (BinarySection) o;
return sizeInBytes == that.sizeInBytes
&& BinarySegmentUtils.equals(
segments, offset, that.segments, that.offset, sizeInBytes);
}

@Override
public int hashCode() {
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
}

public static BinaryMap valueOf(BinaryArray key, BinaryArray value, BinaryMap reuse) {
checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
MemorySegment segment = MemorySegment.wrap(bytes);
segment.putInt(0, key.sizeInBytes);
key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
value.getSegments()[0].copyTo(
value.getOffset(), segment, 4 + key.sizeInBytes, value.sizeInBytes);
reuse.pointTo(segment, 0, bytes.length);
return reuse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1002,18 +1002,30 @@ public static BinaryArray readBinaryArray(
return reusedArray;
}

public static BinaryMap readBinaryMap(
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
BinaryMap map = new BinaryMap();
map.pointTo(segments, offset + baseOffset, size);
return map;
}

/** Read map data from segments. */
public static InternalMap readMap(MemorySegment[] segments, int offset, int numBytes) {
// TODO: Map type support will be added in Issue #1973
throw new UnsupportedOperationException(
"Map type is not supported yet. Will be added in Issue #1973.");
return readMapData(segments, offset, numBytes);
}

/** Read map data from segments with long offset. */
public static InternalMap readMap(MemorySegment[] segments, int offset, long numBytes) {
// TODO: Map type support will be added in Issue #1973
throw new UnsupportedOperationException(
"Map type is not supported yet. Will be added in Issue #1973.");
return readMapData(segments, offset, (int) numBytes);
}

/** Read map data from segments. */
public static BinaryMap readMapData(MemorySegment[] segments, int offset, int numBytes) {
BinaryMap map = new BinaryMap();
map.pointTo(segments, offset, numBytes);
return map;
}

/** Read aligned row from segments. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
import org.apache.fluss.row.serializer.ArraySerializer;
import org.apache.fluss.row.serializer.MapSerializer;
import org.apache.fluss.row.serializer.RowSerializer;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -78,6 +80,8 @@ public interface BinaryWriter {

void writeArray(int pos, InternalArray value, ArraySerializer serializer);

void writeMap(int pos, InternalMap value, MapSerializer serializer);

void writeRow(int pos, InternalRow value, RowSerializer serializer);

/** Finally, complete write to set real size to binary. */
Expand Down Expand Up @@ -164,9 +168,11 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(
writer.writeArray(pos, (InternalArray) value, arraySerializer);

case MAP:
// TODO: Map type support will be added in Issue #1973
throw new UnsupportedOperationException(
"Map type is not supported yet. Will be added in Issue #1973.");
MapType mapType = (MapType) elementType;
final MapSerializer mapSerializer =
new MapSerializer(mapType.getKeyType(), mapType.getValueType(), rowFormat);
return (writer, pos, value) ->
writer.writeMap(pos, (InternalMap) value, mapSerializer);
case ROW:
if (rowFormat == null) {
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public interface DataGetters {
/** Returns the array value at the given position. */
InternalArray getArray(int pos);

/** Returns the map value at the given position. */
InternalMap getMap(int pos);

/** Returns the row value at the given position. */
InternalRow getRow(int pos, int numFields);
}
Loading