Skip to content

Commit 0d29df6

Browse files
committed
use the same row format in nested row (like, row<array<row>>)
1 parent 9380fcf commit 0d29df6

31 files changed

+675
-430
lines changed

fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.memory.MemorySegment;
22+
import org.apache.fluss.row.array.PrimitiveBinaryArray;
2223
import org.apache.fluss.types.DataType;
2324

2425
import java.lang.reflect.Array;
@@ -48,7 +49,7 @@
4849
* @since 0.9
4950
*/
5051
@PublicEvolving
51-
public final class BinaryArray extends BinarySection
52+
public abstract class BinaryArray extends BinarySection
5253
implements InternalArray, MemoryAwareGetters, DataSetters {
5354

5455
private static final long serialVersionUID = 1L;
@@ -111,9 +112,7 @@ public static int calculateFixLengthPartSize(DataType type) {
111112
/** The position to start storing array elements. */
112113
private transient int elementOffset;
113114

114-
public BinaryArray() {}
115-
116-
private void assertIndexIsValid(int ordinal) {
115+
protected void assertIndexIsValid(int ordinal) {
117116
assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
118117
assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
119118
}
@@ -265,16 +264,14 @@ public TimestampLtz getTimestampLtz(int pos, int precision) {
265264
@Override
266265
public InternalArray getArray(int pos) {
267266
assertIndexIsValid(pos);
268-
return BinarySegmentUtils.readBinaryArray(segments, offset, getLong(pos));
267+
return BinarySegmentUtils.readBinaryArray(
268+
segments, offset, getLong(pos), createNestedArrayInstance());
269269
}
270270

271-
// TODO: getMap() will be added in Issue #1973
271+
/** Creates a nested {@link BinaryArray} with the nested data type information. */
272+
protected abstract BinaryArray createNestedArrayInstance();
272273

273-
@Override
274-
public InternalRow getRow(int pos, int numFields) {
275-
assertIndexIsValid(pos);
276-
return BinarySegmentUtils.readBinaryRow(segments, offset, numFields, getLong(pos));
277-
}
274+
// TODO: getMap() will be added in Issue #1973
278275

279276
@Override
280277
public boolean getBoolean(int pos) {
@@ -550,21 +547,26 @@ public <T> T[] toObjectArray(DataType elementType) {
550547
return values;
551548
}
552549

553-
public BinaryArray copy() {
554-
return copy(new BinaryArray());
555-
}
556-
557-
public BinaryArray copy(BinaryArray reuse) {
558-
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
559-
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
560-
return reuse;
561-
}
562-
563550
@Override
564551
public int hashCode() {
565552
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
566553
}
567554

555+
@Override
556+
public boolean equals(Object o) {
557+
if (this == o) {
558+
return true;
559+
}
560+
// override equals and only checks the other object is instance of BinaryArray
561+
if (!(o instanceof BinaryArray)) {
562+
return false;
563+
}
564+
final BinarySection that = (BinarySection) o;
565+
return sizeInBytes == that.sizeInBytes
566+
&& BinarySegmentUtils.equals(
567+
segments, offset, that.segments, that.offset, sizeInBytes);
568+
}
569+
568570
// ------------------------------------------------------------------------------------------
569571
// Construction Utilities
570572
// ------------------------------------------------------------------------------------------
@@ -616,13 +618,13 @@ private static BinaryArray fromPrimitiveArray(
616618
UNSAFE.copyMemory(
617619
arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
618620

619-
BinaryArray result = new BinaryArray();
621+
BinaryArray result = new PrimitiveBinaryArray();
620622
result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize);
621623
return result;
622624
}
623625

624626
public static BinaryArray fromLongArray(Long[] arr) {
625-
BinaryArray array = new BinaryArray();
627+
BinaryArray array = new PrimitiveBinaryArray();
626628
BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.length, 8);
627629
for (int i = 0; i < arr.length; i++) {
628630
Long v = arr[i];
@@ -641,7 +643,7 @@ public static BinaryArray fromLongArray(InternalArray arr) {
641643
return (BinaryArray) arr;
642644
}
643645

644-
BinaryArray array = new BinaryArray();
646+
BinaryArray array = new PrimitiveBinaryArray();
645647
BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.size(), 8);
646648
for (int i = 0; i < arr.size(); i++) {
647649
if (arr.isNullAt(i)) {

fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.fluss.row;
1919

2020
import org.apache.fluss.memory.MemorySegment;
21+
import org.apache.fluss.row.aligned.AlignedRow;
22+
import org.apache.fluss.row.compacted.CompactedRow;
23+
import org.apache.fluss.row.indexed.IndexedRow;
2124

2225
/**
2326
* A binary format {@link InternalRow} that is backed on {@link MemorySegment} and supports all
@@ -57,4 +60,20 @@ public interface BinaryRow extends InternalRow, MemoryAwareGetters {
5760
* @param sizeInBytes The size of the row.
5861
*/
5962
void pointTo(MemorySegment[] segments, int offset, int sizeInBytes);
63+
64+
/**
65+
* The binary row format types, it indicates the generated {@link BinaryRow} type by the {@link
66+
* BinaryWriter}.
67+
*/
68+
enum BinaryRowFormat {
69+
70+
/** Compacted binary row format, see {@link CompactedRow}. */
71+
COMPACTED,
72+
73+
/** Aligned binary row format, see {@link AlignedRow}. */
74+
ALIGNED,
75+
76+
/** Indexed binary row format, see {@link IndexedRow}. */
77+
INDEXED
78+
}
6079
}

fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.memory.MemorySegment;
2222
import org.apache.fluss.memory.OutputView;
23+
import org.apache.fluss.row.aligned.AlignedRow;
2324
import org.apache.fluss.row.compacted.CompactedRow;
2425
import org.apache.fluss.row.indexed.IndexedRow;
2526
import org.apache.fluss.types.DataType;
@@ -989,14 +990,16 @@ public static Decimal readDecimal(
989990
return Decimal.fromUnscaledBytes(bytes, precision, scale);
990991
}
991992

992-
/** Gets an instance of {@link InternalArray} from underlying {@link MemorySegment}. */
993+
/**
994+
* Read the array data into the reused {@link BinaryArray} instance from underlying {@link
995+
* MemorySegment}.
996+
*/
993997
public static BinaryArray readBinaryArray(
994-
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
998+
MemorySegment[] segments, int baseOffset, long offsetAndSize, BinaryArray reusedArray) {
995999
final int size = ((int) offsetAndSize);
9961000
int offset = (int) (offsetAndSize >> 32);
997-
BinaryArray array = new BinaryArray();
998-
array.pointTo(segments, offset + baseOffset, size);
999-
return array;
1001+
reusedArray.pointTo(segments, offset + baseOffset, size);
1002+
return reusedArray;
10001003
}
10011004

10021005
/** Read map data from segments. */
@@ -1013,48 +1016,33 @@ public static InternalMap readMap(MemorySegment[] segments, int offset, long num
10131016
"Map type is not supported yet. Will be added in Issue #1973.");
10141017
}
10151018

1016-
/** Gets an instance of {@link BinaryRow} from underlying {@link MemorySegment}. */
1017-
public static BinaryRow readBinaryRow(
1018-
MemorySegment[] segments, int baseOffset, int numFields, long offsetAndSize) {
1019+
/** Read aligned row from segments. */
1020+
public static InternalRow readAlignedRow(
1021+
MemorySegment[] segments, int baseOffset, long offsetAndSize, int numFields) {
10191022
final int size = ((int) offsetAndSize);
10201023
int offset = (int) (offsetAndSize >> 32);
1021-
org.apache.fluss.row.aligned.AlignedRow row =
1022-
new org.apache.fluss.row.aligned.AlignedRow(numFields);
1023-
row.pointTo(segments, offset + baseOffset, size);
1024+
AlignedRow row = new AlignedRow(numFields);
1025+
row.pointTo(segments, baseOffset + offset, size);
10241026
return row;
10251027
}
10261028

1027-
/** Read indexed row data from segments. */
1029+
/** Read indexed row from segments. */
10281030
public static InternalRow readIndexedRow(
1029-
MemorySegment[] segments,
1030-
int offset,
1031-
int numBytes,
1032-
int numFields,
1033-
DataType[] fieldTypes) {
1034-
return readIndexedRowData(segments, offset, numBytes, numFields, fieldTypes);
1035-
}
1036-
1037-
/** Read IndexedRow data from segments. */
1038-
public static IndexedRow readIndexedRowData(
1039-
MemorySegment[] segments,
1040-
int offset,
1041-
int numBytes,
1042-
int numFields,
1043-
DataType[] fieldTypes) {
1031+
MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) {
1032+
final int size = ((int) offsetAndSize);
1033+
int offset = (int) (offsetAndSize >> 32);
10441034
IndexedRow row = new IndexedRow(fieldTypes);
1045-
row.pointTo(segments, offset, numBytes);
1035+
row.pointTo(segments, baseOffset + offset, size);
10461036
return row;
10471037
}
10481038

1049-
/** Read compacted row data from segments. */
1039+
/** Read compacted row from segments. */
10501040
public static InternalRow readCompactedRow(
1051-
MemorySegment[] segments,
1052-
int offset,
1053-
int numBytes,
1054-
int numFields,
1055-
DataType[] fieldTypes) {
1041+
MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) {
1042+
final int size = ((int) offsetAndSize);
1043+
int offset = (int) (offsetAndSize >> 32);
10561044
CompactedRow row = new CompactedRow(fieldTypes);
1057-
row.pointTo(segments, offset, numBytes);
1045+
row.pointTo(segments, baseOffset + offset, size);
10581046
return row;
10591047
}
10601048

fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818
package org.apache.fluss.row;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
2122
import org.apache.fluss.row.serializer.ArraySerializer;
2223
import org.apache.fluss.row.serializer.RowSerializer;
2324
import org.apache.fluss.types.ArrayType;
2425
import org.apache.fluss.types.DataType;
2526
import org.apache.fluss.types.RowType;
2627

28+
import javax.annotation.Nullable;
29+
2730
import java.io.Serializable;
2831

2932
import static org.apache.fluss.types.DataTypeChecks.getLength;
@@ -87,9 +90,12 @@ public interface BinaryWriter {
8790
* Creates an accessor for setting the elements of a binary writer during runtime.
8891
*
8992
* @param elementType the element type
93+
* @param rowFormat the binary row format, it is required when the element type has nested row
94+
* type, otherwise, {@link IllegalArgumentException} will be thrown.
9095
*/
91-
static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
92-
BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType);
96+
static BinaryWriter.ValueWriter createValueWriter(
97+
DataType elementType, BinaryRowFormat rowFormat) {
98+
BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType, rowFormat);
9399
if (!elementType.isNullable()) {
94100
return valueWriter;
95101
}
@@ -108,7 +114,8 @@ static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
108114
*
109115
* @param elementType the element type
110116
*/
111-
static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
117+
private static BinaryWriter.ValueWriter createNotNullValueWriter(
118+
DataType elementType, @Nullable BinaryRowFormat rowFormat) {
112119
switch (elementType.getTypeRoot()) {
113120
case CHAR:
114121
int charLength = getLength(elementType);
@@ -152,7 +159,7 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
152159
writer.writeTimestampLtz(pos, (TimestampLtz) value, timestampLtzPrecision);
153160
case ARRAY:
154161
final ArraySerializer arraySerializer =
155-
new ArraySerializer(((ArrayType) elementType).getElementType());
162+
new ArraySerializer(((ArrayType) elementType).getElementType(), rowFormat);
156163
return (writer, pos, value) ->
157164
writer.writeArray(pos, (InternalArray) value, arraySerializer);
158165

@@ -161,9 +168,14 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
161168
throw new UnsupportedOperationException(
162169
"Map type is not supported yet. Will be added in Issue #1973.");
163170
case ROW:
171+
if (rowFormat == null) {
172+
throw new IllegalArgumentException(
173+
"Binary row format is required to write row.");
174+
}
164175
final RowType rowType = (RowType) elementType;
165176
final RowSerializer rowSerializer =
166-
new RowSerializer(rowType.getFieldTypes().toArray(new DataType[0]));
177+
new RowSerializer(
178+
rowType.getFieldTypes().toArray(new DataType[0]), rowFormat);
167179
return (writer, pos, value) ->
168180
writer.writeRow(pos, (InternalRow) value, rowSerializer);
169181
default:

fluss-common/src/main/java/org/apache/fluss/row/aligned/AlignedRow.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.fluss.row.NullAwareGetters;
3131
import org.apache.fluss.row.TimestampLtz;
3232
import org.apache.fluss.row.TimestampNtz;
33+
import org.apache.fluss.row.array.AlignedArray;
3334
import org.apache.fluss.types.DataType;
3435
import org.apache.fluss.types.DecimalType;
3536
import org.apache.fluss.types.LocalZonedTimestampType;
@@ -390,7 +391,8 @@ public InternalArray getArray(int pos) {
390391
assertIndexIsValid(pos);
391392
int fieldOffset = getFieldOffset(pos);
392393
final long offsetAndSize = segments[0].getLong(fieldOffset);
393-
return BinarySegmentUtils.readBinaryArray(segments, offset, offsetAndSize);
394+
return BinarySegmentUtils.readBinaryArray(
395+
segments, offset, offsetAndSize, new AlignedArray());
394396
}
395397

396398
// TODO: getMap() will be added in Issue #1973
@@ -400,7 +402,7 @@ public InternalRow getRow(int pos, int numFields) {
400402
assertIndexIsValid(pos);
401403
int fieldOffset = getFieldOffset(pos);
402404
final long offsetAndSize = segments[0].getLong(fieldOffset);
403-
return BinarySegmentUtils.readBinaryRow(segments, offset, numFields, offsetAndSize);
405+
return BinarySegmentUtils.readAlignedRow(segments, offset, offsetAndSize, numFields);
404406
}
405407

406408
/** The bit is 1 when the field is null. Default is 0. */
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.fluss.row.array;
21+
22+
import org.apache.fluss.row.BinaryArray;
23+
import org.apache.fluss.row.BinarySegmentUtils;
24+
import org.apache.fluss.row.InternalRow;
25+
26+
/**
27+
* A {@link BinaryArray} that uses {@link org.apache.fluss.row.aligned.AlignedRow} as the binary
28+
* format for arrays of nested row type.
29+
*/
30+
public class AlignedArray extends BinaryArray {
31+
private static final long serialVersionUID = 1L;
32+
33+
@Override
34+
public InternalRow getRow(int pos, int numFields) {
35+
return BinarySegmentUtils.readAlignedRow(segments, offset, getLong(pos), numFields);
36+
}
37+
38+
@Override
39+
protected BinaryArray createNestedArrayInstance() {
40+
return new AlignedArray();
41+
}
42+
}

0 commit comments

Comments
 (0)