Skip to content

Commit c091c85

Browse files
committed
fix
1 parent df6ea8a commit c091c85

8 files changed

Lines changed: 216 additions & 296 deletions

File tree

paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowBundleWriter.java

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,10 @@
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.data.columnar.ColumnVector;
2727
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
28-
import org.apache.paimon.deletionvectors.DeletionFileRecordIterator;
29-
import org.apache.paimon.deletionvectors.DeletionVectorJudger;
3028
import org.apache.paimon.format.BundleFormatWriter;
3129
import org.apache.paimon.fs.PositionOutputStream;
3230
import org.apache.paimon.io.BundleRecords;
33-
import org.apache.paimon.io.DeletionFileIteratorRecords;
34-
import org.apache.paimon.io.VectorizedIteratorRecords;
35-
import org.apache.paimon.reader.FileRecordIterator;
36-
import org.apache.paimon.reader.VectorizedRecordIterator;
37-
import org.apache.paimon.utils.IntArrayList;
31+
import org.apache.paimon.io.VectorizedBundleRecords;
3832

3933
import org.apache.arrow.c.ArrowArray;
4034
import org.apache.arrow.c.ArrowSchema;
@@ -83,23 +77,14 @@ public void addElement(InternalRow internalRow) {
8377
public void writeBundle(BundleRecords bundleRecords) throws IOException {
8478
if (bundleRecords instanceof ArrowBundleRecords) {
8579
add(((ArrowBundleRecords) bundleRecords).getVectorSchemaRoot());
86-
return;
87-
} else if (bundleRecords instanceof VectorizedIteratorRecords) {
88-
add(((VectorizedIteratorRecords) bundleRecords).vectorizedIterator().batch(), null);
89-
return;
90-
} else if (bundleRecords instanceof DeletionFileIteratorRecords) {
91-
DeletionFileRecordIterator iterator =
92-
((DeletionFileIteratorRecords) bundleRecords).deletionFileIterator();
93-
FileRecordIterator<InternalRow> recordIterator = iterator.iterator();
94-
if (recordIterator instanceof VectorizedRecordIterator) {
95-
add(((VectorizedRecordIterator) recordIterator).batch(), iterator.deletionVector());
96-
return;
80+
} else if (bundleRecords instanceof VectorizedBundleRecords) {
81+
VectorizedBundleRecords records = (VectorizedBundleRecords) bundleRecords;
82+
add(records.batch(), records.selected());
83+
} else {
84+
for (InternalRow row : bundleRecords) {
85+
addElement(row);
9786
}
9887
}
99-
100-
for (InternalRow row : bundleRecords) {
101-
addElement(row);
102-
}
10388
}
10489

10590
public void add(VectorSchemaRoot vsr) {
@@ -121,40 +106,19 @@ public void add(VectorSchemaRoot vsr) {
121106
}
122107
}
123108

124-
public void add(
125-
VectorizedColumnBatch batch, @Nullable DeletionVectorJudger deletionVectorJudger) {
109+
public void add(VectorizedColumnBatch batch, @Nullable int[] selected) {
126110
if (!arrowFormatWriter.empty()) {
127111
flush();
128112
}
129113

130114
int batchSize = arrowFormatWriter.formatWriter().getBatchSize();
131115
ColumnVector[] columns = batch.columns;
132-
133-
int[] pickedInColumn = null;
134-
int totalNumRows;
135-
136-
if (deletionVectorJudger != null) {
137-
int originNumRows = batch.getNumRows();
138-
IntArrayList picked = new IntArrayList(originNumRows);
139-
for (int i = 0; i < originNumRows; i++) {
140-
if (!deletionVectorJudger.isDeleted(i)) {
141-
picked.add(i);
142-
}
143-
}
144-
if (picked.size() == originNumRows) {
145-
totalNumRows = originNumRows;
146-
} else {
147-
pickedInColumn = picked.toArray();
148-
totalNumRows = pickedInColumn.length;
149-
}
150-
} else {
151-
totalNumRows = batch.getNumRows();
152-
}
116+
int totalNumRows = selected != null ? selected.length : batch.getNumRows();
153117

154118
int startIndex = 0;
155119
while (startIndex < totalNumRows) {
156120
int batchRows = Math.min(batchSize, totalNumRows - startIndex);
157-
arrowFormatWriter.write(columns, pickedInColumn, startIndex, batchRows);
121+
arrowFormatWriter.write(columns, selected, startIndex, batchRows);
158122
startIndex += batchRows;
159123
if (startIndex < totalNumRows) {
160124
flush();

paimon-arrow/src/test/java/org/apache/paimon/arrow/writer/ArrowBundleWriterTest.java

Lines changed: 12 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.paimon.data.columnar.heap.HeapLongVector;
2828
import org.apache.paimon.data.columnar.heap.HeapMapVector;
2929
import org.apache.paimon.data.columnar.heap.HeapRowVector;
30-
import org.apache.paimon.deletionvectors.DeletionVectorJudger;
3130
import org.apache.paimon.fs.PositionOutputStream;
3231
import org.apache.paimon.types.DataTypes;
3332
import org.apache.paimon.types.RowType;
@@ -40,9 +39,7 @@
4039
import java.io.IOException;
4140
import java.util.ArrayList;
4241
import java.util.Arrays;
43-
import java.util.HashSet;
4442
import java.util.List;
45-
import java.util.Set;
4643

4744
import static org.assertj.core.api.Assertions.assertThat;
4845

@@ -105,23 +102,8 @@ public void testAddBatchWithDeletionVector() throws IOException {
105102
new VectorizedColumnBatch(new ColumnVector[] {intVector, longVector});
106103
batch.setNumRows(4);
107104

108-
Set<Long> deleted = new HashSet<>();
109-
deleted.add(1L);
110-
deleted.add(3L);
111-
DeletionVectorJudger judger =
112-
new DeletionVectorJudger() {
113-
@Override
114-
public boolean isDeleted(long position) {
115-
return deleted.contains(position);
116-
}
117-
118-
@Override
119-
public long getCardinality() {
120-
return deleted.size();
121-
}
122-
};
123-
124-
writer.add(batch, judger);
105+
// pick rows 0 and 2 (skip 1 and 3)
106+
writer.add(batch, new int[] {0, 2});
125107
writer.close();
126108

127109
assertThat(nativeWriter.snapshots).hasSize(1);
@@ -151,20 +133,8 @@ public void testAddBatchWithAllDeleted() throws IOException {
151133
VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {intVector});
152134
batch.setNumRows(3);
153135

154-
DeletionVectorJudger judger =
155-
new DeletionVectorJudger() {
156-
@Override
157-
public boolean isDeleted(long position) {
158-
return true;
159-
}
160-
161-
@Override
162-
public long getCardinality() {
163-
return 3;
164-
}
165-
};
166-
167-
writer.add(batch, judger);
136+
// all deleted -> empty picked array
137+
writer.add(batch, new int[] {});
168138
writer.close();
169139

170140
assertThat(nativeWriter.snapshots).isEmpty();
@@ -190,20 +160,8 @@ public void testAddBatchWithNoneDeleted() throws IOException {
190160
VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {intVector});
191161
batch.setNumRows(3);
192162

193-
DeletionVectorJudger judger =
194-
new DeletionVectorJudger() {
195-
@Override
196-
public boolean isDeleted(long position) {
197-
return false;
198-
}
199-
200-
@Override
201-
public long getCardinality() {
202-
return 0;
203-
}
204-
};
205-
206-
writer.add(batch, judger);
163+
// none deleted -> pick all rows
164+
writer.add(batch, new int[] {0, 1, 2});
207165
writer.close();
208166

209167
assertThat(nativeWriter.snapshots).hasSize(1);
@@ -249,8 +207,7 @@ public void testAddBatchLargerThanWriterBatchSize() throws IOException {
249207
@Test
250208
public void testAddBatchLargerThanWriterBatchSizeWithDeletion() throws IOException {
251209
RowType rowType = RowType.of(DataTypes.INT());
252-
// batchSize = 2, input has 6 rows, delete rows 1, 3, 5 -> remaining [0,2,4] -> values
253-
// [1,3,5]
210+
// batchSize = 2, input has 6 rows, pick rows 0,2,4 -> values [1,3,5]
254211
ArrowFormatCWriter cWriter = new ArrowFormatCWriter(rowType, 2, true);
255212
ArrowFormatWriter formatWriter = cWriter.formatWriter();
256213
VectorSchemaRoot vsr = formatWriter.getVectorSchemaRoot();
@@ -268,20 +225,7 @@ public void testAddBatchLargerThanWriterBatchSizeWithDeletion() throws IOExcepti
268225
VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[] {intVector});
269226
batch.setNumRows(6);
270227

271-
DeletionVectorJudger judger =
272-
new DeletionVectorJudger() {
273-
@Override
274-
public boolean isDeleted(long position) {
275-
return position % 2 == 1;
276-
}
277-
278-
@Override
279-
public long getCardinality() {
280-
return 3;
281-
}
282-
};
283-
284-
writer.add(batch, judger);
228+
writer.add(batch, new int[] {0, 2, 4});
285229
writer.close();
286230

287231
// 3 remaining rows, batchSize=2 -> 2 batches: 2 + 1
@@ -384,38 +328,25 @@ public void testMultipleSmallBatchesWithDeletion() throws IOException {
384328

385329
ArrowBundleWriter writer = new ArrowBundleWriter(outputStream, cWriter, nativeWriter);
386330

387-
DeletionVectorJudger judger =
388-
new DeletionVectorJudger() {
389-
@Override
390-
public boolean isDeleted(long position) {
391-
return position % 2 == 1;
392-
}
393-
394-
@Override
395-
public long getCardinality() {
396-
return 2;
397-
}
398-
};
399-
400-
// batch 1: [10, 20, 30, 40], delete index 1,3 -> remaining [10, 30]
331+
// batch 1: [10, 20, 30, 40], pick index 0,2 -> remaining [10, 30]
401332
HeapIntVector iv1 = new HeapIntVector(4);
402333
iv1.setInt(0, 10);
403334
iv1.setInt(1, 20);
404335
iv1.setInt(2, 30);
405336
iv1.setInt(3, 40);
406337
VectorizedColumnBatch b1 = new VectorizedColumnBatch(new ColumnVector[] {iv1});
407338
b1.setNumRows(4);
408-
writer.add(b1, judger);
339+
writer.add(b1, new int[] {0, 2});
409340

410-
// batch 2: [50, 60, 70, 80], delete index 1,3 -> remaining [50, 70]
341+
// batch 2: [50, 60, 70, 80], pick index 0,2 -> remaining [50, 70]
411342
HeapIntVector iv2 = new HeapIntVector(4);
412343
iv2.setInt(0, 50);
413344
iv2.setInt(1, 60);
414345
iv2.setInt(2, 70);
415346
iv2.setInt(3, 80);
416347
VectorizedColumnBatch b2 = new VectorizedColumnBatch(new ColumnVector[] {iv2});
417348
b2.setNumRows(4);
418-
writer.add(b2, judger);
349+
writer.add(b2, new int[] {0, 2});
419350

420351
writer.close();
421352

paimon-common/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorJudger.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,4 @@ public interface DeletionVectorJudger {
2727
* @return true if the row is marked as deleted, false otherwise.
2828
*/
2929
boolean isDeleted(long position);
30-
31-
/** @return the number of distinct integers added to the DeletionVector. */
32-
long getCardinality();
3330
}

paimon-common/src/main/java/org/apache/paimon/io/DeletionFileIteratorRecords.java

Lines changed: 0 additions & 52 deletions
This file was deleted.
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.io;
20+
21+
import org.apache.paimon.data.InternalRow;
22+
import org.apache.paimon.data.columnar.ColumnarRow;
23+
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
24+
25+
import javax.annotation.Nullable;
26+
27+
import java.util.Iterator;
28+
import java.util.NoSuchElementException;
29+
30+
/** {@link BundleRecords} for {@link VectorizedColumnBatch} with optional picked indices. */
31+
public class VectorizedBundleRecords implements BundleRecords {
32+
33+
private final VectorizedColumnBatch batch;
34+
@Nullable private final int[] selected;
35+
36+
public VectorizedBundleRecords(VectorizedColumnBatch batch, @Nullable int[] selected) {
37+
this.batch = batch;
38+
this.selected = selected;
39+
}
40+
41+
public VectorizedColumnBatch batch() {
42+
return batch;
43+
}
44+
45+
@Nullable
46+
public int[] selected() {
47+
return selected;
48+
}
49+
50+
@Override
51+
public long rowCount() {
52+
return selected != null ? selected.length : batch.getNumRows();
53+
}
54+
55+
@Override
56+
public Iterator<InternalRow> iterator() {
57+
ColumnarRow row = new ColumnarRow(batch);
58+
int numRows = selected != null ? selected.length : batch.getNumRows();
59+
return new Iterator<InternalRow>() {
60+
private int index = 0;
61+
62+
@Override
63+
public boolean hasNext() {
64+
return index < numRows;
65+
}
66+
67+
@Override
68+
public InternalRow next() {
69+
if (!hasNext()) {
70+
throw new NoSuchElementException();
71+
}
72+
row.setRowId(selected != null ? selected[index++] : index++);
73+
return row;
74+
}
75+
};
76+
}
77+
}

0 commit comments

Comments
 (0)