Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,21 +227,32 @@ void testComplexTypeFetch() throws Exception {
new Object[] {
1,
new String[] {"a", "b"},
new Object[] {new int[] {1, 2}, new int[] {3, 4}}
new Object[] {new int[] {1, 2}, new int[] {3, 4}},
new Object[] {10, new Object[] {20, "nested"}, "row1"}
},
new Object[] {
2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}}
2,
new String[] {"c", null},
new Object[] {null, new int[] {3, 4}},
new Object[] {30, new Object[] {40, "test"}, "row2"}
},
new Object[] {
3,
new String[] {"e", "f"},
new Object[] {new int[] {5, 6, 7}, new int[] {8}}
new Object[] {new int[] {5, 6, 7}, new int[] {8}},
new Object[] {50, new Object[] {60, "value"}, "row3"}
});
Schema schema =
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.ARRAY(DataTypes.STRING()))
.column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
.column(
"d",
DataTypes.ROW(
DataTypes.INT(),
DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
DataTypes.STRING()))
.build();
TableInfo tableInfo =
TableInfo.of(
Expand Down Expand Up @@ -299,6 +310,17 @@ void testComplexTypeFetch() throws Exception {
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1]));
assertThat(row.getArray(2).toString())
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2]));
InternalRow nestedRow = row.getRow(3, 3);
assertThat(nestedRow).isNotNull();
assertThat(nestedRow.getInt(0)).isEqualTo(((Object[]) complexData.get(i)[3])[0]);
InternalRow deeplyNestedRow = nestedRow.getRow(1, 2);
assertThat(deeplyNestedRow).isNotNull();
assertThat(deeplyNestedRow.getInt(0))
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[0]);
assertThat(deeplyNestedRow.getString(1).toString())
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
assertThat(nestedRow.getString(2).toString())
.isEqualTo(((Object[]) complexData.get(i)[3])[2]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.row.serializer.ArraySerializer;
import org.apache.fluss.row.serializer.RowSerializer;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
Expand Down Expand Up @@ -88,20 +89,20 @@ public void writeArray(int pos, InternalArray input, ArraySerializer serializer)
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

// TODO: Map and Row write methods will be added in Issue #1973 and #1974
// TODO: Map and Row write methods will be added in Issue #1973
// @Override
// public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
// BinaryMap binary = serializer.toBinaryMap(input);
// writeSegmentsToVarLenPart(
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
// }
//
// @Override
// public void writeRow(int pos, InternalRow value, InternalRowSerializer serializer) {
// BinaryRow binary = serializer.toBinaryRow(value);
// writeSegmentsToVarLenPart(
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
// }

@Override
public void writeRow(int pos, InternalRow value, RowSerializer serializer) {
BinaryRow binary = serializer.toBinaryRow(value);
writeSegmentsToVarLenPart(
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
}

@Override
public void writeChar(int pos, BinaryString value, int length) {
Expand Down
45 changes: 26 additions & 19 deletions fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.row.array.PrimitiveBinaryArray;
import org.apache.fluss.types.DataType;

import java.lang.reflect.Array;
Expand Down Expand Up @@ -48,7 +49,7 @@
* @since 0.9
*/
@PublicEvolving
public final class BinaryArray extends BinarySection
public abstract class BinaryArray extends BinarySection
implements InternalArray, MemoryAwareGetters, DataSetters {

private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -111,9 +112,7 @@ public static int calculateFixLengthPartSize(DataType type) {
/** The position to start storing array elements. */
private transient int elementOffset;

public BinaryArray() {}

private void assertIndexIsValid(int ordinal) {
protected void assertIndexIsValid(int ordinal) {
assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
}
Expand Down Expand Up @@ -265,11 +264,14 @@ public TimestampLtz getTimestampLtz(int pos, int precision) {
@Override
public InternalArray getArray(int pos) {
assertIndexIsValid(pos);
return BinarySegmentUtils.readBinaryArray(segments, offset, getLong(pos));
return BinarySegmentUtils.readBinaryArray(
segments, offset, getLong(pos), createNestedArrayInstance());
}

/** Creates a nested {@link BinaryArray} with the nested data type information. */
protected abstract BinaryArray createNestedArrayInstance();

// TODO: getMap() will be added in Issue #1973
// TODO: getRow() will be added in Issue #1974

@Override
public boolean getBoolean(int pos) {
Expand Down Expand Up @@ -545,21 +547,26 @@ public <T> T[] toObjectArray(DataType elementType) {
return values;
}

public BinaryArray copy() {
return copy(new BinaryArray());
}

public BinaryArray copy(BinaryArray reuse) {
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
return reuse;
}

@Override
public int hashCode() {
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
// override equals and only checks the other object is instance of BinaryArray
if (!(o instanceof BinaryArray)) {
return false;
}
final BinarySection that = (BinarySection) o;
return sizeInBytes == that.sizeInBytes
&& BinarySegmentUtils.equals(
segments, offset, that.segments, that.offset, sizeInBytes);
}

// ------------------------------------------------------------------------------------------
// Construction Utilities
// ------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -611,13 +618,13 @@ private static BinaryArray fromPrimitiveArray(
UNSAFE.copyMemory(
arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);

BinaryArray result = new BinaryArray();
BinaryArray result = new PrimitiveBinaryArray();
result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize);
return result;
}

public static BinaryArray fromLongArray(Long[] arr) {
BinaryArray array = new BinaryArray();
BinaryArray array = new PrimitiveBinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.length, 8);
for (int i = 0; i < arr.length; i++) {
Long v = arr[i];
Expand All @@ -636,7 +643,7 @@ public static BinaryArray fromLongArray(InternalArray arr) {
return (BinaryArray) arr;
}

BinaryArray array = new BinaryArray();
BinaryArray array = new PrimitiveBinaryArray();
BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.size(), 8);
for (int i = 0; i < arr.size(); i++) {
if (arr.isNullAt(i)) {
Expand Down
19 changes: 19 additions & 0 deletions fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.fluss.row;

import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.row.aligned.AlignedRow;
import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.indexed.IndexedRow;

/**
* A binary format {@link InternalRow} that is backed on {@link MemorySegment} and supports all
Expand Down Expand Up @@ -57,4 +60,20 @@ public interface BinaryRow extends InternalRow, MemoryAwareGetters {
* @param sizeInBytes The size of the row.
*/
void pointTo(MemorySegment[] segments, int offset, int sizeInBytes);

/**
* The binary row format types, it indicates the generated {@link BinaryRow} type by the {@link
* BinaryWriter}.
*/
enum BinaryRowFormat {

/** Compacted binary row format, see {@link CompactedRow}. */
COMPACTED,

/** Aligned binary row format, see {@link AlignedRow}. */
ALIGNED,

/** Indexed binary row format, see {@link IndexedRow}. */
INDEXED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.memory.OutputView;
import org.apache.fluss.row.aligned.AlignedRow;
import org.apache.fluss.row.compacted.CompactedRow;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.types.DataType;
Expand Down Expand Up @@ -989,14 +990,16 @@ public static Decimal readDecimal(
return Decimal.fromUnscaledBytes(bytes, precision, scale);
}

/** Gets an instance of {@link InternalArray} from underlying {@link MemorySegment}. */
/**
* Read the array data into the reused {@link BinaryArray} instance from underlying {@link
* MemorySegment}.
*/
public static BinaryArray readBinaryArray(
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
MemorySegment[] segments, int baseOffset, long offsetAndSize, BinaryArray reusedArray) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
BinaryArray array = new BinaryArray();
array.pointTo(segments, offset + baseOffset, size);
return array;
reusedArray.pointTo(segments, offset + baseOffset, size);
return reusedArray;
}

/** Read map data from segments. */
Expand All @@ -1013,27 +1016,33 @@ public static InternalMap readMap(MemorySegment[] segments, int offset, long num
"Map type is not supported yet. Will be added in Issue #1973.");
}

/** Read indexed row data from segments. */
/** Read aligned row from segments. */
public static InternalRow readAlignedRow(
MemorySegment[] segments, int baseOffset, long offsetAndSize, int numFields) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
AlignedRow row = new AlignedRow(numFields);
row.pointTo(segments, baseOffset + offset, size);
return row;
}

/** Read indexed row from segments. */
public static InternalRow readIndexedRow(
MemorySegment[] segments,
int offset,
int numBytes,
int numFields,
DataType[] fieldTypes) {
MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
IndexedRow row = new IndexedRow(fieldTypes);
row.pointTo(segments, offset, numBytes);
row.pointTo(segments, baseOffset + offset, size);
return row;
}

/** Read compacted row data from segments. */
/** Read compacted row from segments. */
public static InternalRow readCompactedRow(
MemorySegment[] segments,
int offset,
int numBytes,
int numFields,
DataType[] fieldTypes) {
MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) {
final int size = ((int) offsetAndSize);
int offset = (int) (offsetAndSize >> 32);
CompactedRow row = new CompactedRow(fieldTypes);
row.pointTo(segments, offset, numBytes);
row.pointTo(segments, baseOffset + offset, size);
return row;
}

Expand Down
34 changes: 24 additions & 10 deletions fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package org.apache.fluss.row;

import org.apache.fluss.annotation.PublicEvolving;
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
import org.apache.fluss.row.serializer.ArraySerializer;
import org.apache.fluss.row.serializer.RowSerializer;
import org.apache.fluss.types.ArrayType;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;

import java.io.Serializable;

Expand Down Expand Up @@ -73,9 +78,7 @@ public interface BinaryWriter {

void writeArray(int pos, InternalArray value, ArraySerializer serializer);

// TODO: Map and Row write methods will be added in Issue #1973 and #1974
// void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
// void writeRow(int pos, InternalRow value, InternalRowSerializer serializer);
void writeRow(int pos, InternalRow value, RowSerializer serializer);

/** Finally, complete write to set real size to binary. */
void complete();
Expand All @@ -87,9 +90,12 @@ public interface BinaryWriter {
* Creates an accessor for setting the elements of a binary writer during runtime.
*
* @param elementType the element type
* @param rowFormat the binary row format, it is required when the element type has nested row
* type, otherwise, {@link IllegalArgumentException} will be thrown.
*/
static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType);
static BinaryWriter.ValueWriter createValueWriter(
DataType elementType, BinaryRowFormat rowFormat) {
BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType, rowFormat);
if (!elementType.isNullable()) {
return valueWriter;
}
Expand All @@ -108,7 +114,8 @@ static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
*
* @param elementType the element type
*/
static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
static BinaryWriter.ValueWriter createNotNullValueWriter(
DataType elementType, @Nullable BinaryRowFormat rowFormat) {
switch (elementType.getTypeRoot()) {
case CHAR:
int charLength = getLength(elementType);
Expand Down Expand Up @@ -152,7 +159,7 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
writer.writeTimestampLtz(pos, (TimestampLtz) value, timestampLtzPrecision);
case ARRAY:
final ArraySerializer arraySerializer =
new ArraySerializer(((ArrayType) elementType).getElementType());
new ArraySerializer(((ArrayType) elementType).getElementType(), rowFormat);
return (writer, pos, value) ->
writer.writeArray(pos, (InternalArray) value, arraySerializer);

Expand All @@ -161,9 +168,16 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
throw new UnsupportedOperationException(
"Map type is not supported yet. Will be added in Issue #1973.");
case ROW:
// TODO: Row type support will be added in Issue #1974
throw new UnsupportedOperationException(
"Row type is not supported yet. Will be added in Issue #1974.");
if (rowFormat == null) {
throw new IllegalArgumentException(
"Binary row format is required to write row.");
}
final RowType rowType = (RowType) elementType;
final RowSerializer rowSerializer =
new RowSerializer(
rowType.getFieldTypes().toArray(new DataType[0]), rowFormat);
return (writer, pos, value) ->
writer.writeRow(pos, (InternalRow) value, rowSerializer);
default:
String msg =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,7 @@ public interface DataGetters {

/** Returns the array value at the given position. */
InternalArray getArray(int pos);

/** Returns the row value at the given position. */
InternalRow getRow(int pos, int numFields);
}
Loading