Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public void doWrite(int rowIndex, DataGetters row, int ordinal, boolean handleSa
listVector.startNewValue(rowIndex);
for (int arrIndex = 0; arrIndex < array.size(); arrIndex++) {
int fieldIndex = offset + arrIndex;
elementWriter.write(fieldIndex, array, arrIndex, handleSafe);
// Always use safe writes for array elements because the element index (offset +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should mention in the class-level Javadoc that the handleSafe field is effectively ignored when writing array elements, since element writes are always performed in safe mode.

// arrIndex) can exceed INITIAL_CAPACITY even when the row count doesn't. The parent's
// handleSafe is based on row count, but array element indices grow based on the total
// number of elements across all arrays, which can be much larger.
elementWriter.write(fieldIndex, array, arrIndex, true);
}
offset += array.size();
listVector.endValue(rowIndex, array.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,70 @@ void testWriterExceedMaxSizeInBytes() {
"The arrow batch size is full and it shouldn't accept writing new rows, it's a bug.");
}
}

/**
 * Regression test for array columns whose cumulative element count grows past
 * INITIAL_CAPACITY (1024) while the row count remains below it. Previously,
 * ArrowArrayWriter reused the parent's handleSafe flag — derived from the row
 * count — when writing individual elements, so element indices beyond the
 * vector's initial capacity triggered an IndexOutOfBoundsException.
 */
@Test
void testArrayWriterWithManyElements() throws IOException {
    // Row schema: an int id plus an int-array column.
    RowType rowType =
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.INT())));

    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
            VectorSchemaRoot root =
                    VectorSchemaRoot.create(ArrowUtils.toArrowSchema(rowType), allocator);
            ArrowWriterPool writerPool = new ArrowWriterPool(allocator);
            ArrowWriter writer =
                    writerPool.getOrCreateWriter(
                            1L, 1, Integer.MAX_VALUE, rowType, NO_COMPRESSION)) {

        // 200 rows x 10 elements = 2000 total elements, which exceeds
        // INITIAL_CAPACITY (1024). The row count (200) stays under 1024, so
        // prior to the fix handleSafe would have been false for element writes.
        int rowCount = 200;
        int elementsPerArray = 10;
        for (int row = 0; row < rowCount; row++) {
            Integer[] values = new Integer[elementsPerArray];
            for (int pos = 0; pos < elementsPerArray; pos++) {
                values[pos] = row * elementsPerArray + pos;
            }
            writer.writeRow(GenericRow.of(row, GenericArray.of(values)));
        }

        // Serialization must complete without an IndexOutOfBoundsException.
        AbstractPagedOutputView outputView =
                new ManagedPagedOutputView(new TestingMemorySegmentPool(64 * 1024));
        int serializedSize =
                writer.serializeToOutputView(
                        outputView, arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE));
        assertThat(serializedSize).isGreaterThan(0);

        // Round-trip the serialized batch and check every value survives intact.
        int copyBufferSize = Math.max(serializedSize, writer.estimatedSizeInBytes());
        MemorySegment heapSegment = MemorySegment.allocateHeapMemory(copyBufferSize);
        MemorySegment sourceSegment = outputView.getCurrentSegment();
        sourceSegment.copyTo(
                arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), heapSegment, 0, serializedSize);

        ArrowReader reader =
                ArrowUtils.createArrowReader(
                        heapSegment, 0, serializedSize, root, allocator, rowType);
        assertThat(reader.getRowCount()).isEqualTo(rowCount);

        for (int row = 0; row < rowCount; row++) {
            ColumnarRow readRow = reader.read(row);
            readRow.setRowId(row);
            assertThat(readRow.getInt(0)).isEqualTo(row);
            assertThat(readRow.getArray(1).size()).isEqualTo(elementsPerArray);
            for (int pos = 0; pos < elementsPerArray; pos++) {
                assertThat(readRow.getArray(1).getInt(pos))
                        .isEqualTo(row * elementsPerArray + pos);
            }
        }
    }
}
}