Skip to content

Commit f1a75ca

Browse files
committed
[common] Deep copy ColumnarArray in CompletedFetch#fetchRecords() to fix Arrow IndexOutOfBoundsException (apache#2040)
This fixes exception: Caused by: java.lang.IndexOutOfBoundsException: index: 0, length: 1 (expected: range(0, 0)) at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
1 parent eb8736b commit f1a75ca

File tree

6 files changed

+240
-27
lines changed

6 files changed

+240
-27
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.fluss.record.MemoryLogRecords;
3232
import org.apache.fluss.record.ProjectionPushdownCache;
3333
import org.apache.fluss.record.TestingSchemaGetter;
34+
import org.apache.fluss.row.GenericArray;
3435
import org.apache.fluss.row.InternalRow;
3536
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
3637
import org.apache.fluss.types.DataTypes;
@@ -219,6 +220,88 @@ void testProjection(LogFormat logFormat, byte magic) throws Exception {
219220
}
220221
}
221222

223+
@Test
224+
void testComplexTypeFetch() throws Exception {
225+
List<Object[]> complexData =
226+
Arrays.asList(
227+
new Object[] {
228+
1,
229+
new String[] {"a", "b"},
230+
new Object[] {new int[] {1, 2}, new int[] {3, 4}}
231+
},
232+
new Object[] {
233+
2, new String[] {"c", null}, new Object[] {null, new int[] {3, 4}}
234+
},
235+
new Object[] {
236+
3,
237+
new String[] {"e", "f"},
238+
new Object[] {new int[] {5, 6, 7}, new int[] {8}}
239+
});
240+
Schema schema =
241+
Schema.newBuilder()
242+
.column("a", DataTypes.INT())
243+
.column("b", DataTypes.ARRAY(DataTypes.STRING()))
244+
.column("c", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
245+
.build();
246+
TableInfo tableInfo =
247+
TableInfo.of(
248+
DATA2_TABLE_PATH,
249+
DATA2_TABLE_ID,
250+
DEFAULT_SCHEMA_ID,
251+
TableDescriptor.builder()
252+
.schema(schema)
253+
.distributedBy(3)
254+
.logFormat(LogFormat.ARROW)
255+
.build(),
256+
System.currentTimeMillis(),
257+
System.currentTimeMillis());
258+
long fetchOffset = 0L;
259+
int bucketId = 0;
260+
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
261+
FetchLogResultForBucket resultForBucket =
262+
new FetchLogResultForBucket(
263+
tb,
264+
createRecordsWithoutBaseLogOffset(
265+
schema.getRowType(),
266+
DEFAULT_SCHEMA_ID,
267+
0L,
268+
1000L,
269+
LOG_MAGIC_VALUE_V0,
270+
complexData,
271+
LogFormat.ARROW),
272+
3L);
273+
DefaultCompletedFetch defaultCompletedFetch =
274+
new DefaultCompletedFetch(
275+
tb,
276+
resultForBucket,
277+
LogRecordReadContext.createReadContext(
278+
tableInfo,
279+
false,
280+
null,
281+
new TestingSchemaGetter(
282+
tableInfo.getSchemaId(), tableInfo.getSchema())),
283+
logScannerStatus,
284+
true,
285+
fetchOffset);
286+
List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(3);
287+
// close the read context to release arrow root resource,
288+
// this is important to test complex types
289+
defaultCompletedFetch.readContext.close();
290+
assertThat(scanRecords.size()).isEqualTo(3);
291+
for (int i = 0; i < scanRecords.size(); i++) {
292+
ScanRecord record = scanRecords.get(i);
293+
assertThat(record.logOffset()).isEqualTo(i);
294+
InternalRow row = record.getRow();
295+
assertThat(row.getInt(0)).isEqualTo(complexData.get(i)[0]);
296+
assertThat(row.getArray(1)).isInstanceOf(GenericArray.class);
297+
GenericArray array = (GenericArray) row.getArray(1);
298+
assertThat(array.toString())
299+
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[1]));
300+
assertThat(row.getArray(2).toString())
301+
.isEqualTo(Arrays.deepToString((Object[]) complexData.get(i)[2]));
302+
}
303+
}
304+
222305
private DefaultCompletedFetch makeCompletedFetch(
223306
TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) {
224307
return makeCompletedFetch(tableBucket, resultForBucket, offset, null);

fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,9 @@ private static FieldGetter[] buildProjectedFieldGetters(RowType rowType, int[] s
265265
List<DataType> dataTypeList = rowType.getChildren();
266266
FieldGetter[] fieldGetters = new FieldGetter[selectedFields.length];
267267
for (int i = 0; i < fieldGetters.length; i++) {
268+
// build deep field getter to support nested types
268269
fieldGetters[i] =
269-
InternalRow.createFieldGetter(
270+
InternalRow.createDeepFieldGetter(
270271
dataTypeList.get(selectedFields[i]), selectedFields[i]);
271272
}
272273
return fieldGetters;

fluss-common/src/main/java/org/apache/fluss/row/GenericArray.java

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -139,27 +139,6 @@ public boolean isNullAt(int pos) {
139139
return !isPrimitiveArray && ((Object[]) array)[pos] == null;
140140
}
141141

142-
@Override
143-
public boolean equals(Object o) {
144-
if (this == o) {
145-
return true;
146-
}
147-
if (o == null || getClass() != o.getClass()) {
148-
return false;
149-
}
150-
GenericArray that = (GenericArray) o;
151-
return size == that.size
152-
&& isPrimitiveArray == that.isPrimitiveArray
153-
&& Objects.deepEquals(array, that.array);
154-
}
155-
156-
@Override
157-
public int hashCode() {
158-
int result = Objects.hash(size, isPrimitiveArray);
159-
result = 31 * result + Arrays.deepHashCode(new Object[] {array});
160-
return result;
161-
}
162-
163142
// ------------------------------------------------------------------------------------------
164143
// Read-only accessor methods
165144
// ------------------------------------------------------------------------------------------
@@ -328,6 +307,32 @@ public double[] toDoubleArray() {
328307
return ArrayUtils.toPrimitiveDouble((Object[]) array);
329308
}
330309

310+
@Override
311+
public boolean equals(Object o) {
312+
if (this == o) {
313+
return true;
314+
}
315+
if (o == null || getClass() != o.getClass()) {
316+
return false;
317+
}
318+
GenericArray that = (GenericArray) o;
319+
return size == that.size
320+
&& isPrimitiveArray == that.isPrimitiveArray
321+
&& Objects.deepEquals(array, that.array);
322+
}
323+
324+
@Override
325+
public int hashCode() {
326+
int result = Objects.hash(size, isPrimitiveArray);
327+
result = 31 * result + Arrays.deepHashCode(new Object[] {array});
328+
return result;
329+
}
330+
331+
@Override
332+
public String toString() {
333+
return Arrays.toString((Object[]) array);
334+
}
335+
331336
// ----------------------------------------------------------------------------------------
332337
// Utilities
333338
// ----------------------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/row/InternalArray.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.fluss.row;
2020

2121
import org.apache.fluss.annotation.PublicEvolving;
22+
import org.apache.fluss.row.columnar.ColumnarRow;
23+
import org.apache.fluss.row.columnar.VectorizedColumnBatch;
2224
import org.apache.fluss.types.ArrayType;
2325
import org.apache.fluss.types.DataType;
2426

@@ -152,6 +154,55 @@ static ElementGetter createElementGetter(DataType fieldType) {
152154
};
153155
}
154156

157+
/**
158+
* Creates a deep accessor for getting elements in an internal array data structure at the given
159+
* position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested
160+
* array/map/row types.
161+
*
162+
* <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which
163+
* avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and
164+
* BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link
165+
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
166+
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
167+
*/
168+
static ElementGetter createDeepElementGetter(DataType fieldType) {
169+
final ElementGetter elementGetter;
170+
switch (fieldType.getTypeRoot()) {
171+
case ARRAY:
172+
DataType nestedType = ((ArrayType) fieldType).getElementType();
173+
ElementGetter nestedGetter = createDeepElementGetter(nestedType);
174+
elementGetter =
175+
(array, pos) -> {
176+
InternalArray inner = array.getArray(pos);
177+
Object[] objs = new Object[inner.size()];
178+
for (int i = 0; i < inner.size(); i++) {
179+
objs[i] = nestedGetter.getElementOrNull(inner, i);
180+
}
181+
return new GenericArray(objs);
182+
};
183+
break;
184+
case MAP:
185+
case ROW:
186+
String msg =
187+
String.format(
188+
"type %s not support in %s",
189+
fieldType.getTypeRoot().toString(), InternalArray.class.getName());
190+
throw new IllegalArgumentException(msg);
191+
default:
192+
// for primitive types, we can directly return the element getter
193+
elementGetter = createElementGetter(fieldType);
194+
}
195+
if (!fieldType.isNullable()) {
196+
return elementGetter;
197+
}
198+
return (array, pos) -> {
199+
if (array.isNullAt(pos)) {
200+
return null;
201+
}
202+
return elementGetter.getElementOrNull(array, pos);
203+
};
204+
}
205+
155206
/** Accessor for getting the elements of an array during runtime. */
156207
interface ElementGetter extends Serializable {
157208
@Nullable

fluss-common/src/main/java/org/apache/fluss/row/InternalRow.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919

2020
import org.apache.fluss.annotation.PublicEvolving;
2121
import org.apache.fluss.record.ChangeType;
22+
import org.apache.fluss.row.columnar.ColumnarRow;
23+
import org.apache.fluss.row.columnar.VectorizedColumnBatch;
24+
import org.apache.fluss.types.ArrayType;
2225
import org.apache.fluss.types.DataType;
2326
import org.apache.fluss.types.RowType;
2427

2528
import javax.annotation.Nullable;
2629

2730
import java.io.Serializable;
2831

32+
import static org.apache.fluss.row.InternalArray.createDeepElementGetter;
2933
import static org.apache.fluss.types.DataTypeChecks.getLength;
3034
import static org.apache.fluss.types.DataTypeChecks.getPrecision;
3135
import static org.apache.fluss.types.DataTypeChecks.getScale;
@@ -228,6 +232,57 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
228232
};
229233
}
230234

235+
/**
236+
* Creates a deep accessor for getting elements in an internal array data structure at the given
237+
* position. It returns new objects (GenericArray/GenericMap/GenericMap) for nested
238+
* array/map/row types.
239+
*
240+
* <p>NOTE: Currently, it is only used for deep copying {@link ColumnarRow} for Arrow which
241+
* avoid the arrow buffer is released before accessing elements. It doesn't deep copy STRING and
242+
* BYTES types, because {@link ColumnarRow} already deep copies the bytes, see {@link
243+
* VectorizedColumnBatch#getString(int, int)}. This can be removed once we supports object reuse
244+
* for Arrow {@link ColumnarRow}, see {@code CompletedFetch#toScanRecord(LogRecord)}.
245+
*/
246+
static FieldGetter createDeepFieldGetter(DataType fieldType, int fieldPos) {
247+
final FieldGetter fieldGetter;
248+
switch (fieldType.getTypeRoot()) {
249+
case ARRAY:
250+
DataType elementType = ((ArrayType) fieldType).getElementType();
251+
InternalArray.ElementGetter nestedGetter = createDeepElementGetter(elementType);
252+
fieldGetter =
253+
row -> {
254+
InternalArray array = row.getArray(fieldPos);
255+
Object[] objs = new Object[array.size()];
256+
for (int i = 0; i < array.size(); i++) {
257+
objs[i] = nestedGetter.getElementOrNull(array, i);
258+
}
259+
return new GenericArray(objs);
260+
};
261+
break;
262+
case MAP:
263+
case ROW:
264+
String msg =
265+
String.format(
266+
"type %s not support in %s",
267+
fieldType.getTypeRoot().toString(), InternalArray.class.getName());
268+
throw new IllegalArgumentException(msg);
269+
default:
270+
// for primitive types, use the normal field getter
271+
fieldGetter = createFieldGetter(fieldType, fieldPos);
272+
break;
273+
}
274+
275+
if (!fieldType.isNullable()) {
276+
return fieldGetter;
277+
}
278+
return row -> {
279+
if (row.isNullAt(fieldPos)) {
280+
return null;
281+
}
282+
return fieldGetter.getFieldOrNull(row);
283+
};
284+
}
285+
231286
/** Accessor for getting the field of a row during runtime. */
232287
interface FieldGetter extends Serializable {
233288
@Nullable

fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.fluss.record.MemoryLogRecordsIndexedBuilder;
4545
import org.apache.fluss.remote.RemoteLogSegment;
4646
import org.apache.fluss.row.BinaryString;
47+
import org.apache.fluss.row.GenericArray;
4748
import org.apache.fluss.row.GenericRow;
4849
import org.apache.fluss.row.InternalRow;
4950
import org.apache.fluss.row.arrow.ArrowWriter;
@@ -97,15 +98,32 @@ public class DataTestUtils {
9798
public static GenericRow row(Object... objects) {
9899
GenericRow row = new GenericRow(objects.length);
99100
for (int i = 0; i < objects.length; i++) {
100-
if (objects[i] instanceof String) {
101-
row.setField(i, BinaryString.fromString((String) objects[i]));
102-
} else {
103-
row.setField(i, objects[i]);
104-
}
101+
Object value = toInternalObject(objects[i]);
102+
row.setField(i, value);
105103
}
106104
return row;
107105
}
108106

107+
private static Object toInternalObject(Object obj) {
108+
if (obj == null) {
109+
return null;
110+
}
111+
if (obj instanceof String) {
112+
return BinaryString.fromString((String) obj);
113+
} else if (obj instanceof Object[]) {
114+
Object[] array = (Object[]) obj;
115+
Object[] internalArray = new Object[array.length];
116+
for (int j = 0; j < array.length; j++) {
117+
internalArray[j] = toInternalObject(array[j]);
118+
}
119+
return new GenericArray(internalArray);
120+
} else if (obj instanceof int[]) {
121+
return new GenericArray((int[]) obj);
122+
} else {
123+
return obj;
124+
}
125+
}
126+
109127
public static CompactedRow compactedRow(RowType rowType, Object[] objects) {
110128
return genCompacted(rowType, objects);
111129
}

0 commit comments

Comments
 (0)