Skip to content

Commit df6ea8a

Browse files
committed
[core] Enhance ArrowBundleWriter to more batch scenarios
1 parent 04c8c52 commit df6ea8a

9 files changed

Lines changed: 987 additions & 6 deletions

File tree

paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.arrow.ArrowUtils;
2222
import org.apache.paimon.data.InternalRow;
23+
import org.apache.paimon.data.columnar.ColumnVector;
2324
import org.apache.paimon.types.RowType;
2425

2526
import org.apache.arrow.c.ArrowArray;
@@ -70,10 +71,19 @@ public ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) {
7071
schema = ArrowSchema.allocateNew(allocator);
7172
}
7273

74+
public ArrowFormatWriter formatWriter() {
75+
return realWriter;
76+
}
77+
7378
public boolean write(InternalRow currentRow) {
7479
return realWriter.write(currentRow);
7580
}
7681

82+
public void write(
83+
ColumnVector[] columns, @Nullable int[] pickedInColumn, int startIndex, int batchRows) {
84+
realWriter.write(columns, pickedInColumn, startIndex, batchRows);
85+
}
86+
7787
public ArrowCStruct toCStruct() {
7888
VectorSchemaRoot vectorSchemaRoot = realWriter.getVectorSchemaRoot();
7989
return ArrowUtils.serializeToCStruct(

paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
2525
import org.apache.paimon.arrow.writer.ArrowFieldWriters;
2626
import org.apache.paimon.data.InternalRow;
27+
import org.apache.paimon.data.columnar.ColumnVector;
2728
import org.apache.paimon.types.DataField;
2829
import org.apache.paimon.types.DataType;
2930
import org.apache.paimon.types.RowType;
3031
import org.apache.paimon.types.VariantType;
32+
import org.apache.paimon.utils.Preconditions;
3133

3234
import org.apache.arrow.memory.BufferAllocator;
3335
import org.apache.arrow.memory.RootAllocator;
@@ -187,6 +189,15 @@ public boolean write(InternalRow currentRow) {
187189
return true;
188190
}
189191

192+
public void write(
193+
ColumnVector[] columns, @Nullable int[] pickedInColumn, int startIndex, int batchRows) {
194+
Preconditions.checkState(rowId == 0, "rowId must be 0 before writing columns.");
195+
for (int i = 0; i < columns.length; i++) {
196+
fieldWriters[i].write(columns[i], pickedInColumn, startIndex, batchRows);
197+
}
198+
rowId = batchRows;
199+
}
200+
190201
public long memoryUsed() {
191202
vectorSchemaRoot.setRowCount(rowId);
192203
long memoryUsed = 0;
@@ -213,6 +224,14 @@ public void close() {
213224
allocator.close();
214225
}
215226

227+
public int getBatchSize() {
228+
return batchSize;
229+
}
230+
231+
public ArrowFieldWriter[] getFieldWriters() {
232+
return fieldWriters;
233+
}
234+
216235
public VectorSchemaRoot getVectorSchemaRoot() {
217236
return vectorSchemaRoot;
218237
}

paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowBundleWriter.java

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,18 @@
2323
import org.apache.paimon.arrow.vector.ArrowCStruct;
2424
import org.apache.paimon.arrow.vector.ArrowFormatCWriter;
2525
import org.apache.paimon.data.InternalRow;
26+
import org.apache.paimon.data.columnar.ColumnVector;
27+
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
28+
import org.apache.paimon.deletionvectors.DeletionFileRecordIterator;
29+
import org.apache.paimon.deletionvectors.DeletionVectorJudger;
2630
import org.apache.paimon.format.BundleFormatWriter;
2731
import org.apache.paimon.fs.PositionOutputStream;
2832
import org.apache.paimon.io.BundleRecords;
33+
import org.apache.paimon.io.DeletionFileIteratorRecords;
34+
import org.apache.paimon.io.VectorizedIteratorRecords;
35+
import org.apache.paimon.reader.FileRecordIterator;
36+
import org.apache.paimon.reader.VectorizedRecordIterator;
37+
import org.apache.paimon.utils.IntArrayList;
2938

3039
import org.apache.arrow.c.ArrowArray;
3140
import org.apache.arrow.c.ArrowSchema;
@@ -34,6 +43,8 @@
3443
import org.slf4j.Logger;
3544
import org.slf4j.LoggerFactory;
3645

46+
import javax.annotation.Nullable;
47+
3748
import java.io.IOException;
3849

3950
/** Arrow bundle writer. */
@@ -72,11 +83,23 @@ public void addElement(InternalRow internalRow) {
7283
public void writeBundle(BundleRecords bundleRecords) throws IOException {
7384
if (bundleRecords instanceof ArrowBundleRecords) {
7485
add(((ArrowBundleRecords) bundleRecords).getVectorSchemaRoot());
75-
} else {
76-
for (InternalRow row : bundleRecords) {
77-
addElement(row);
86+
return;
87+
} else if (bundleRecords instanceof VectorizedIteratorRecords) {
88+
add(((VectorizedIteratorRecords) bundleRecords).vectorizedIterator().batch(), null);
89+
return;
90+
} else if (bundleRecords instanceof DeletionFileIteratorRecords) {
91+
DeletionFileRecordIterator iterator =
92+
((DeletionFileIteratorRecords) bundleRecords).deletionFileIterator();
93+
FileRecordIterator<InternalRow> recordIterator = iterator.iterator();
94+
if (recordIterator instanceof VectorizedRecordIterator) {
95+
add(((VectorizedRecordIterator) recordIterator).batch(), iterator.deletionVector());
96+
return;
7897
}
7998
}
99+
100+
for (InternalRow row : bundleRecords) {
101+
addElement(row);
102+
}
80103
}
81104

82105
public void add(VectorSchemaRoot vsr) {
@@ -98,6 +121,47 @@ public void add(VectorSchemaRoot vsr) {
98121
}
99122
}
100123

124+
public void add(
125+
VectorizedColumnBatch batch, @Nullable DeletionVectorJudger deletionVectorJudger) {
126+
if (!arrowFormatWriter.empty()) {
127+
flush();
128+
}
129+
130+
int batchSize = arrowFormatWriter.formatWriter().getBatchSize();
131+
ColumnVector[] columns = batch.columns;
132+
133+
int[] pickedInColumn = null;
134+
int totalNumRows;
135+
136+
if (deletionVectorJudger != null) {
137+
int originNumRows = batch.getNumRows();
138+
IntArrayList picked = new IntArrayList(originNumRows);
139+
for (int i = 0; i < originNumRows; i++) {
140+
if (!deletionVectorJudger.isDeleted(i)) {
141+
picked.add(i);
142+
}
143+
}
144+
if (picked.size() == originNumRows) {
145+
totalNumRows = originNumRows;
146+
} else {
147+
pickedInColumn = picked.toArray();
148+
totalNumRows = pickedInColumn.length;
149+
}
150+
} else {
151+
totalNumRows = batch.getNumRows();
152+
}
153+
154+
int startIndex = 0;
155+
while (startIndex < totalNumRows) {
156+
int batchRows = Math.min(batchSize, totalNumRows - startIndex);
157+
arrowFormatWriter.write(columns, pickedInColumn, startIndex, batchRows);
158+
startIndex += batchRows;
159+
if (startIndex < totalNumRows) {
160+
flush();
161+
}
162+
}
163+
}
164+
101165
@Override
102166
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
103167
return suggestedCheck && (underlyingStream.getPos() > targetSize);

paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriters.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ protected void doWrite(
105105
for (int i = 0; i < batchRows; i++) {
106106
int row = getRowNumber(startIndex, i, pickedInColumn);
107107
if (columnVector.isNullAt(row)) {
108-
varCharVector.setNull(row);
108+
varCharVector.setNull(i);
109109
} else {
110110
byte[] value = ((BytesColumnVector) columnVector).getBytes(row).getBytes();
111111
varCharVector.setSafe(i, value);
@@ -154,7 +154,7 @@ protected void doWrite(
154154
for (int i = 0; i < batchRows; i++) {
155155
int row = getRowNumber(startIndex, i, pickedInColumn);
156156
if (columnVector.isNullAt(row)) {
157-
bitVector.setNull(row);
157+
bitVector.setNull(i);
158158
} else {
159159
int value = ((BooleanColumnVector) columnVector).getBoolean(row) ? 1 : 0;
160160
bitVector.setSafe(i, value);
@@ -473,7 +473,7 @@ protected void doWrite(
473473
if (columnVector.isNullAt(row)) {
474474
timeMilliVector.setNull(i);
475475
} else {
476-
int value = ((IntColumnVector) columnVector).getInt(i);
476+
int value = ((IntColumnVector) columnVector).getInt(row);
477477
timeMilliVector.setSafe(i, value);
478478
}
479479
}

0 commit comments

Comments
 (0)