Skip to content

Commit f50e06b

Browse files
XuQianJin-Stars authored and wuchong committed
[common] Introduce nested ROW type for ARROW, COMPACTED and INDEXED formats (#2079)
1 parent 9e6f6a4 commit f50e06b

File tree

64 files changed

+2348
-394
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2348
-394
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,21 +227,32 @@ void testComplexTypeFetch() throws Exception {
227227
new Object[] {
228228
1,
229229
new String[] {"a", "b"},
230-
new Object[] {new int[] {1, 2}, new int[] {3, 4}}
230+
new Object[] {new int[] {1, 2}, new int[] {3, 4}},
231+
new Object[] {10, new Object[] {20, "nested"}, "row1"}
231232
},
232233
new Object[] {
233-
2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}}
234+
2,
235+
new String[] {"c", null},
236+
new Object[] {null, new int[] {3, 4}},
237+
new Object[] {30, new Object[] {40, "test"}, "row2"}
234238
},
235239
new Object[] {
236240
3,
237241
new String[] {"e", "f"},
238-
new Object[] {new int[] {5, 6, 7}, new int[] {8}}
242+
new Object[] {new int[] {5, 6, 7}, new int[] {8}},
243+
new Object[] {50, new Object[] {60, "value"}, "row3"}
239244
});
240245
Schema schema =
241246
Schema.newBuilder()
242247
.column("a", DataTypes.INT())
243248
.column("b", DataTypes.ARRAY(DataTypes.STRING()))
244249
.column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
250+
.column(
251+
"d",
252+
DataTypes.ROW(
253+
DataTypes.INT(),
254+
DataTypes.ROW(DataTypes.INT(), DataTypes.STRING()),
255+
DataTypes.STRING()))
245256
.build();
246257
TableInfo tableInfo =
247258
TableInfo.of(
@@ -299,6 +310,17 @@ void testComplexTypeFetch() throws Exception {
299310
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1]));
300311
assertThat(row.getArray(2).toString())
301312
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2]));
313+
InternalRow nestedRow = row.getRow(3, 3);
314+
assertThat(nestedRow).isNotNull();
315+
assertThat(nestedRow.getInt(0)).isEqualTo(((Object[]) complexData.get(i)[3])[0]);
316+
InternalRow deeplyNestedRow = nestedRow.getRow(1, 2);
317+
assertThat(deeplyNestedRow).isNotNull();
318+
assertThat(deeplyNestedRow.getInt(0))
319+
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[0]);
320+
assertThat(deeplyNestedRow.getString(1).toString())
321+
.isEqualTo(((Object[]) ((Object[]) complexData.get(i)[3])[1])[1]);
322+
assertThat(nestedRow.getString(2).toString())
323+
.isEqualTo(((Object[]) complexData.get(i)[3])[2]);
302324
}
303325
}
304326

fluss-common/src/main/java/org/apache/fluss/row/AbstractBinaryWriter.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.memory.MemorySegment;
2121
import org.apache.fluss.row.serializer.ArraySerializer;
22+
import org.apache.fluss.row.serializer.RowSerializer;
2223

2324
import java.nio.charset.StandardCharsets;
2425
import java.util.Arrays;
@@ -88,20 +89,20 @@ public void writeArray(int pos, InternalArray input, ArraySerializer serializer)
8889
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
8990
}
9091

91-
// TODO: Map and Row write methods will be added in Issue #1973 and #1974
92+
// TODO: Map and Row write methods will be added in Issue #1973
9293
// @Override
9394
// public void writeMap(int pos, InternalMap input, InternalMapSerializer serializer) {
9495
// BinaryMap binary = serializer.toBinaryMap(input);
9596
// writeSegmentsToVarLenPart(
9697
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
9798
// }
98-
//
99-
// @Override
100-
// public void writeRow(int pos, InternalRow value, InternalRowSerializer serializer) {
101-
// BinaryRow binary = serializer.toBinaryRow(value);
102-
// writeSegmentsToVarLenPart(
103-
// pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
104-
// }
99+
100+
@Override
101+
public void writeRow(int pos, InternalRow value, RowSerializer serializer) {
102+
BinaryRow binary = serializer.toBinaryRow(value);
103+
writeSegmentsToVarLenPart(
104+
pos, binary.getSegments(), binary.getOffset(), binary.getSizeInBytes());
105+
}
105106

106107
@Override
107108
public void writeChar(int pos, BinaryString value, int length) {

fluss-common/src/main/java/org/apache/fluss/row/BinaryArray.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.memory.MemorySegment;
22+
import org.apache.fluss.row.array.PrimitiveBinaryArray;
2223
import org.apache.fluss.types.DataType;
2324

2425
import java.lang.reflect.Array;
@@ -48,7 +49,7 @@
4849
* @since 0.9
4950
*/
5051
@PublicEvolving
51-
public final class BinaryArray extends BinarySection
52+
public abstract class BinaryArray extends BinarySection
5253
implements InternalArray, MemoryAwareGetters, DataSetters {
5354

5455
private static final long serialVersionUID = 1L;
@@ -111,9 +112,7 @@ public static int calculateFixLengthPartSize(DataType type) {
111112
/** The position to start storing array elements. */
112113
private transient int elementOffset;
113114

114-
public BinaryArray() {}
115-
116-
private void assertIndexIsValid(int ordinal) {
115+
protected void assertIndexIsValid(int ordinal) {
117116
assert ordinal >= 0 : "ordinal (" + ordinal + ") should >= 0";
118117
assert ordinal < size : "ordinal (" + ordinal + ") should < " + size;
119118
}
@@ -265,11 +264,14 @@ public TimestampLtz getTimestampLtz(int pos, int precision) {
265264
@Override
266265
public InternalArray getArray(int pos) {
267266
assertIndexIsValid(pos);
268-
return BinarySegmentUtils.readBinaryArray(segments, offset, getLong(pos));
267+
return BinarySegmentUtils.readBinaryArray(
268+
segments, offset, getLong(pos), createNestedArrayInstance());
269269
}
270270

271+
/** Creates a nested {@link BinaryArray} with the nested data type information. */
272+
protected abstract BinaryArray createNestedArrayInstance();
273+
271274
// TODO: getMap() will be added in Issue #1973
272-
// TODO: getRow() will be added in Issue #1974
273275

274276
@Override
275277
public boolean getBoolean(int pos) {
@@ -545,21 +547,26 @@ public <T> T[] toObjectArray(DataType elementType) {
545547
return values;
546548
}
547549

548-
public BinaryArray copy() {
549-
return copy(new BinaryArray());
550-
}
551-
552-
public BinaryArray copy(BinaryArray reuse) {
553-
byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
554-
reuse.pointTo(MemorySegment.wrap(bytes), 0, sizeInBytes);
555-
return reuse;
556-
}
557-
558550
@Override
559551
public int hashCode() {
560552
return BinarySegmentUtils.hash(segments, offset, sizeInBytes);
561553
}
562554

555+
@Override
556+
public boolean equals(Object o) {
557+
if (this == o) {
558+
return true;
559+
}
560+
// Overrides equals so that it only checks that the other object is an instance of BinaryArray
561+
if (!(o instanceof BinaryArray)) {
562+
return false;
563+
}
564+
final BinarySection that = (BinarySection) o;
565+
return sizeInBytes == that.sizeInBytes
566+
&& BinarySegmentUtils.equals(
567+
segments, offset, that.segments, that.offset, sizeInBytes);
568+
}
569+
563570
// ------------------------------------------------------------------------------------------
564571
// Construction Utilities
565572
// ------------------------------------------------------------------------------------------
@@ -611,13 +618,13 @@ private static BinaryArray fromPrimitiveArray(
611618
UNSAFE.copyMemory(
612619
arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
613620

614-
BinaryArray result = new BinaryArray();
621+
BinaryArray result = new PrimitiveBinaryArray();
615622
result.pointTo(MemorySegment.wrap(data), 0, (int) totalSize);
616623
return result;
617624
}
618625

619626
public static BinaryArray fromLongArray(Long[] arr) {
620-
BinaryArray array = new BinaryArray();
627+
BinaryArray array = new PrimitiveBinaryArray();
621628
BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.length, 8);
622629
for (int i = 0; i < arr.length; i++) {
623630
Long v = arr[i];
@@ -636,7 +643,7 @@ public static BinaryArray fromLongArray(InternalArray arr) {
636643
return (BinaryArray) arr;
637644
}
638645

639-
BinaryArray array = new BinaryArray();
646+
BinaryArray array = new PrimitiveBinaryArray();
640647
BinaryArrayWriter writer = new BinaryArrayWriter(array, arr.size(), 8);
641648
for (int i = 0; i < arr.size(); i++) {
642649
if (arr.isNullAt(i)) {

fluss-common/src/main/java/org/apache/fluss/row/BinaryRow.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.fluss.row;
1919

2020
import org.apache.fluss.memory.MemorySegment;
21+
import org.apache.fluss.row.aligned.AlignedRow;
22+
import org.apache.fluss.row.compacted.CompactedRow;
23+
import org.apache.fluss.row.indexed.IndexedRow;
2124

2225
/**
2326
* A binary format {@link InternalRow} that is backed on {@link MemorySegment} and supports all
@@ -57,4 +60,20 @@ public interface BinaryRow extends InternalRow, MemoryAwareGetters {
5760
* @param sizeInBytes The size of the row.
5861
*/
5962
void pointTo(MemorySegment[] segments, int offset, int sizeInBytes);
63+
64+
/**
65+
* The binary row format types, which indicate the type of {@link BinaryRow} generated by the {@link
66+
* BinaryWriter}.
67+
*/
68+
enum BinaryRowFormat {
69+
70+
/** Compacted binary row format, see {@link CompactedRow}. */
71+
COMPACTED,
72+
73+
/** Aligned binary row format, see {@link AlignedRow}. */
74+
ALIGNED,
75+
76+
/** Indexed binary row format, see {@link IndexedRow}. */
77+
INDEXED
78+
}
6079
}

fluss-common/src/main/java/org/apache/fluss/row/BinarySegmentUtils.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.memory.MemorySegment;
2222
import org.apache.fluss.memory.OutputView;
23+
import org.apache.fluss.row.aligned.AlignedRow;
2324
import org.apache.fluss.row.compacted.CompactedRow;
2425
import org.apache.fluss.row.indexed.IndexedRow;
2526
import org.apache.fluss.types.DataType;
@@ -989,14 +990,16 @@ public static Decimal readDecimal(
989990
return Decimal.fromUnscaledBytes(bytes, precision, scale);
990991
}
991992

992-
/** Gets an instance of {@link InternalArray} from underlying {@link MemorySegment}. */
993+
/**
994+
* Read the array data into the reused {@link BinaryArray} instance from underlying {@link
995+
* MemorySegment}.
996+
*/
993997
public static BinaryArray readBinaryArray(
994-
MemorySegment[] segments, int baseOffset, long offsetAndSize) {
998+
MemorySegment[] segments, int baseOffset, long offsetAndSize, BinaryArray reusedArray) {
995999
final int size = ((int) offsetAndSize);
9961000
int offset = (int) (offsetAndSize >> 32);
997-
BinaryArray array = new BinaryArray();
998-
array.pointTo(segments, offset + baseOffset, size);
999-
return array;
1001+
reusedArray.pointTo(segments, offset + baseOffset, size);
1002+
return reusedArray;
10001003
}
10011004

10021005
/** Read map data from segments. */
@@ -1013,27 +1016,33 @@ public static InternalMap readMap(MemorySegment[] segments, int offset, long num
10131016
"Map type is not supported yet. Will be added in Issue #1973.");
10141017
}
10151018

1016-
/** Read indexed row data from segments. */
1019+
/** Read aligned row from segments. */
1020+
public static InternalRow readAlignedRow(
1021+
MemorySegment[] segments, int baseOffset, long offsetAndSize, int numFields) {
1022+
final int size = ((int) offsetAndSize);
1023+
int offset = (int) (offsetAndSize >> 32);
1024+
AlignedRow row = new AlignedRow(numFields);
1025+
row.pointTo(segments, baseOffset + offset, size);
1026+
return row;
1027+
}
1028+
1029+
/** Read indexed row from segments. */
10171030
public static InternalRow readIndexedRow(
1018-
MemorySegment[] segments,
1019-
int offset,
1020-
int numBytes,
1021-
int numFields,
1022-
DataType[] fieldTypes) {
1031+
MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) {
1032+
final int size = ((int) offsetAndSize);
1033+
int offset = (int) (offsetAndSize >> 32);
10231034
IndexedRow row = new IndexedRow(fieldTypes);
1024-
row.pointTo(segments, offset, numBytes);
1035+
row.pointTo(segments, baseOffset + offset, size);
10251036
return row;
10261037
}
10271038

1028-
/** Read compacted row data from segments. */
1039+
/** Read compacted row from segments. */
10291040
public static InternalRow readCompactedRow(
1030-
MemorySegment[] segments,
1031-
int offset,
1032-
int numBytes,
1033-
int numFields,
1034-
DataType[] fieldTypes) {
1041+
MemorySegment[] segments, int baseOffset, long offsetAndSize, DataType[] fieldTypes) {
1042+
final int size = ((int) offsetAndSize);
1043+
int offset = (int) (offsetAndSize >> 32);
10351044
CompactedRow row = new CompactedRow(fieldTypes);
1036-
row.pointTo(segments, offset, numBytes);
1045+
row.pointTo(segments, baseOffset + offset, size);
10371046
return row;
10381047
}
10391048

fluss-common/src/main/java/org/apache/fluss/row/BinaryWriter.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818
package org.apache.fluss.row;
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
21+
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
2122
import org.apache.fluss.row.serializer.ArraySerializer;
23+
import org.apache.fluss.row.serializer.RowSerializer;
2224
import org.apache.fluss.types.ArrayType;
2325
import org.apache.fluss.types.DataType;
26+
import org.apache.fluss.types.RowType;
27+
28+
import javax.annotation.Nullable;
2429

2530
import java.io.Serializable;
2631

@@ -73,9 +78,7 @@ public interface BinaryWriter {
7378

7479
void writeArray(int pos, InternalArray value, ArraySerializer serializer);
7580

76-
// TODO: Map and Row write methods will be added in Issue #1973 and #1974
77-
// void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
78-
// void writeRow(int pos, InternalRow value, InternalRowSerializer serializer);
81+
void writeRow(int pos, InternalRow value, RowSerializer serializer);
7982

8083
/** Finally, complete write to set real size to binary. */
8184
void complete();
@@ -87,9 +90,12 @@ public interface BinaryWriter {
8790
* Creates an accessor for setting the elements of a binary writer during runtime.
8891
*
8992
* @param elementType the element type
93+
* @param rowFormat the binary row format; it is required when the element type has a nested row
94+
* type, otherwise an {@link IllegalArgumentException} will be thrown.
9095
*/
91-
static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
92-
BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType);
96+
static BinaryWriter.ValueWriter createValueWriter(
97+
DataType elementType, BinaryRowFormat rowFormat) {
98+
BinaryWriter.ValueWriter valueWriter = createNotNullValueWriter(elementType, rowFormat);
9399
if (!elementType.isNullable()) {
94100
return valueWriter;
95101
}
@@ -108,7 +114,8 @@ static BinaryWriter.ValueWriter createValueWriter(DataType elementType) {
108114
*
109115
* @param elementType the element type
110116
*/
111-
static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
117+
static BinaryWriter.ValueWriter createNotNullValueWriter(
118+
DataType elementType, @Nullable BinaryRowFormat rowFormat) {
112119
switch (elementType.getTypeRoot()) {
113120
case CHAR:
114121
int charLength = getLength(elementType);
@@ -152,7 +159,7 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
152159
writer.writeTimestampLtz(pos, (TimestampLtz) value, timestampLtzPrecision);
153160
case ARRAY:
154161
final ArraySerializer arraySerializer =
155-
new ArraySerializer(((ArrayType) elementType).getElementType());
162+
new ArraySerializer(((ArrayType) elementType).getElementType(), rowFormat);
156163
return (writer, pos, value) ->
157164
writer.writeArray(pos, (InternalArray) value, arraySerializer);
158165

@@ -161,9 +168,16 @@ static BinaryWriter.ValueWriter createNotNullValueWriter(DataType elementType) {
161168
throw new UnsupportedOperationException(
162169
"Map type is not supported yet. Will be added in Issue #1973.");
163170
case ROW:
164-
// TODO: Row type support will be added in Issue #1974
165-
throw new UnsupportedOperationException(
166-
"Row type is not supported yet. Will be added in Issue #1974.");
171+
if (rowFormat == null) {
172+
throw new IllegalArgumentException(
173+
"Binary row format is required to write row.");
174+
}
175+
final RowType rowType = (RowType) elementType;
176+
final RowSerializer rowSerializer =
177+
new RowSerializer(
178+
rowType.getFieldTypes().toArray(new DataType[0]), rowFormat);
179+
return (writer, pos, value) ->
180+
writer.writeRow(pos, (InternalRow) value, rowSerializer);
167181
default:
168182
String msg =
169183
String.format(

fluss-common/src/main/java/org/apache/fluss/row/DataGetters.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,7 @@ public interface DataGetters {
9494

9595
/** Returns the array value at the given position. */
9696
InternalArray getArray(int pos);
97+
98+
/** Returns the row value at the given position. */
99+
InternalRow getRow(int pos, int numFields);
97100
}

0 commit comments

Comments
 (0)