Skip to content

Commit e73e61f

Browse files
author
binary-signal
committed
safe writes for array
Signed-off-by: binary-signal <[email protected]>
1 parent 49ce39d commit e73e61f

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

fluss-common/src/main/java/org/apache/fluss/row/arrow/writers/ArrowArrayWriter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean handleSa
4141
listVector.startNewValue(rowIndex);
4242
for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) {
4343
int fieldIndex = offset + arrIndex;
44-
elementWriter.write(fieldIndex, array, arrIndex, handleSafe);
44+
// Always use safe writes for array elements because the element index (offset +
45+
// arrIndex) can exceed INITIAL_CAPACITY even when the row count doesn't. The parent's
46+
// handleSafe is based on row count, but array element indices grow based on the total
47+
// number of elements across all arrays, which can be much larger.
48+
elementWriter.write(fieldIndex, array, arrIndex, true);
4549
}
4650
offset += array.size();
4751
listVector.endValue(rowIndex, array.size());

fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,4 +259,70 @@ void testWriterExceedMaxSizeInBytes() {
259259
"The arrow batch size is full and it shouldn't accept writing new rows, it's a bug.");
260260
}
261261
}
262+
263+
/**
264+
* Tests that array columns work correctly when the total number of array elements exceeds
265+
* INITIAL_CAPACITY (1024) while the row count stays below it. This reproduces a bug where
266+
* ArrowArrayWriter used the parent's handleSafe flag (based on row count) for element writes,
267+
* causing IndexOutOfBoundsException when element indices exceeded the vector's initial
268+
* capacity.
269+
*/
270+
@Test
271+
void testArrayWriterWithManyElements() throws IOException {
272+
// Schema with array column
273+
RowType rowType =
274+
DataTypes.ROW(
275+
DataTypes.FIELD("id", DataTypes.INT()),
276+
DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.INT())));
277+
278+
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
279+
VectorSchemaRoot root =
280+
VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
281+
ArrowWriterPool provider = new ArrowWriterPool(allocator);
282+
ArrowWriter writer =
283+
provider.getOrCreateWriter(
284+
1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) {
285+
286+
// Write 200 rows, each with a 10-element array.
287+
// Total elements = 2000, exceeding INITIAL_CAPACITY (1024).
288+
// But row count (200) < 1024, so handleSafe would be false without the fix.
289+
int numRows = 200;
290+
int arraySize = 10;
291+
for (int i = 0; i < numRows; i++) {
292+
Integer[] elements = new Integer[arraySize];
293+
for (int j = 0; j < arraySize; j++) {
294+
elements[j] = i * arraySize + j;
295+
}
296+
writer.writeRow(GenericRow.of(i, GenericArray.of(elements)));
297+
}
298+
299+
// Verify serialization works without IndexOutOfBoundsException
300+
AbstractPagedOutputView pagedOutputView =
301+
new ManagedPagedOutputView(new TestingMemorySegmentPool(64 * 1024));
302+
int size =
303+
writer.serializeToOutputView(
304+
pagedOutputView, arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
305+
assertThat(size).isGreaterThan(0);
306+
307+
// Verify the data can be read back correctly
308+
int heapMemorySize = Math.max(size, writer.estimatedSizeInBytes());
309+
MemorySegment segment = MemorySegment.allocateHeapMemory(heapMemorySize);
310+
MemorySegment firstSegment = pagedOutputView.getCurrentSegment();
311+
firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, size);
312+
313+
ArrowReader reader =
314+
ArrowUtils.createArrowReader(segment, 0, size, root, allocator, rowType);
315+
assertThat(reader.getRowCount()).isEqualTo(numRows);
316+
317+
for (int i = 0; i < numRows; i++) {
318+
ColumnarRow row = reader.read(i);
319+
row.setRowId(i);
320+
assertThat(row.getInt(0)).isEqualTo(i);
321+
assertThat(row.getArray(1).size()).isEqualTo(arraySize);
322+
for (int j = 0; j < arraySize; j++) {
323+
assertThat(row.getArray(1).getInt(j)).isEqualTo(i * arraySize + j);
324+
}
325+
}
326+
}
327+
}
262328
}

0 commit comments

Comments
 (0)