Commit 50ed143

Move batch size to config
1 parent: 0fca6e2

5 files changed: +16 -14

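In short, the commit replaces the type-specific recordsPerBatch(int) builder method with a string configuration property, ReadBuilder.RECORDS_PER_BATCH_KEY, which each format's builder parses back to an int at build time. A minimal, self-contained sketch of that pattern: the key and the deprecated method name come from the diff below, while the class name, the 5000 default, and the property storage are illustrative scaffolding, not Iceberg code.

import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for the pattern this commit introduces: the typed
// recordsPerBatch(int) setter is deprecated in favor of a string property
// keyed by RECORDS_PER_BATCH_KEY and parsed back to an int at build time.
class SketchReadBuilder {
  static final String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";
  private static final int DEFAULT_BATCH_SIZE = 5000; // placeholder default

  private final Map<String, String> properties = new HashMap<>();

  SketchReadBuilder set(String key, String value) {
    properties.put(key, value);
    return this;
  }

  @Deprecated
  SketchReadBuilder recordsPerBatch(int numRowsPerBatch) {
    // The deprecated setter now just delegates to the config property.
    return set(RECORDS_PER_BATCH_KEY, String.valueOf(numRowsPerBatch));
  }

  int resolvedBatchSize() {
    // The property wins when present; the builder default is only a fallback.
    return properties.containsKey(RECORDS_PER_BATCH_KEY)
        ? Integer.parseInt(properties.get(RECORDS_PER_BATCH_KEY))
        : DEFAULT_BATCH_SIZE;
  }

  public static void main(String[] args) {
    int size = new SketchReadBuilder()
        .set(RECORDS_PER_BATCH_KEY, String.valueOf(4096))
        .resolvedBatchSize();
    System.out.println(size); // prints 4096
  }
}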

arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java

+2 -3

@@ -123,8 +123,7 @@ public class ArrowReader extends CloseableGroup {
   private final EncryptionManager encryption;
   private final int batchSize;
   private final boolean reuseContainers;
-
-  public static final String ARROW_OBJECT_MODEL = "arrow";
+  private static final String ARROW_OBJECT_MODEL = "arrow";

   public static void register() {
     ObjectModelRegistry.registerReader(

@@ -344,7 +343,7 @@ CloseableIterator<ColumnarBatch> open(FileScanTask task) {
         ObjectModelRegistry.readBuilder(FileFormat.PARQUET, ARROW_OBJECT_MODEL, location)
             .project(expectedSchema)
             .split(task.start(), task.length())
-            .recordsPerBatch(batchSize)
+            .set(ReadBuilder.RECORDS_PER_BATCH_KEY, String.valueOf(batchSize))
            .filter(task.residual(), filterCaseSensitive);

     if (reuseContainers) {
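Beyond swapping the builder call, ARROW_OBJECT_MODEL also narrows from public to private here, so code outside ArrowReader can no longer reference the constant. The batch size still comes from the batchSize field; it is simply stringified with String.valueOf and passed through the generic set(...) call instead of the dedicated method.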

core/src/main/java/org/apache/iceberg/io/ReadBuilder.java

+3 -5

@@ -30,6 +30,9 @@
  * @param <R> type of the reader
  */
 public interface ReadBuilder<R extends ReadBuilder<R>> {
+  /** The key for the batch size in case of vectorized reads. */
+  String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";
+
   /**
    * Restricts the read to the given range: [start, start + length).
    *

@@ -90,11 +93,6 @@ default R reuseContainers(boolean newReuseContainers) {
     return (R) this;
   }

-  /** Sets the batch size for vectorized readers. */
-  default R recordsPerBatch(int numRowsPerBatch) {
-    throw new UnsupportedOperationException("Not supported");
-  }
-
   /**
    * Accessors for constant field values. Used for calculating values in the result which are coming
    * from metadata, and not coming from the data files themselves. The keys of the map are the
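Because RECORDS_PER_BATCH_KEY is declared directly on the interface, it is implicitly public static final, which is why call sites elsewhere in this commit can write ReadBuilder.RECORDS_PER_BATCH_KEY without a concrete builder in scope. A self-contained sketch of that Java idiom; Reads, DemoReads, and the set signature are assumptions for illustration, matching the shape the diff's call sites imply:

// Interface fields in Java are implicitly public static final.
interface Reads<R extends Reads<R>> {
  String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";

  R set(String key, String value); // generic property setter, as the call sites assume
}

class DemoReads implements Reads<DemoReads> {
  @Override
  public DemoReads set(String key, String value) {
    System.out.println(key + " = " + value); // a real builder would store the pair
    return this;
  }

  public static void main(String[] args) {
    // Constant resolved through the interface, exactly like
    // ReadBuilder.RECORDS_PER_BATCH_KEY in the other files of this commit.
    new DemoReads().set(Reads.RECORDS_PER_BATCH_KEY, String.valueOf(2048));
  }
}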

orc/src/main/java/org/apache/iceberg/orc/ORC.java

+3 -2

@@ -927,9 +927,10 @@ public ReadBuilder constantFieldAccessors(Map<Integer, ?> newConstantFieldAccess
       return this;
     }

-    @Override
+    @Deprecated
     public ReadBuilder recordsPerBatch(int numRecordsPerBatch) {
       this.recordsPerBatch = numRecordsPerBatch;
+      set(RECORDS_PER_BATCH_KEY, String.valueOf(numRecordsPerBatch));
       return this;
     }

@@ -969,7 +970,7 @@ public <D> CloseableIterable<D> build() {
           filterCaseSensitive,
           filter,
           batchReader,
-          recordsPerBatch);
+          conf.getInt(RECORDS_PER_BATCH_KEY, recordsPerBatch));
     }
   }
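The ORC builder appears to stage the property in a Hadoop Configuration: the deprecated setter writes it through set(...), and build() reads it back with conf.getInt(RECORDS_PER_BATCH_KEY, recordsPerBatch), so a configured value overrides the field and the field acts only as the fallback. A round-trip sketch using stock Hadoop APIs; that conf is an org.apache.hadoop.conf.Configuration is inferred from the getInt call, and the numbers are placeholders:

import org.apache.hadoop.conf.Configuration;

public class OrcBatchSizeRoundTrip {
  public static void main(String[] args) {
    String key = "iceberg.records-per-batch";
    int fieldFallback = 5000; // stands in for the builder's recordsPerBatch field

    Configuration conf = new Configuration();
    conf.set(key, String.valueOf(1024)); // what the deprecated setter now stores

    // Mirrors conf.getInt(RECORDS_PER_BATCH_KEY, recordsPerBatch) in build():
    // the configured value wins; the field is used only when the key is absent.
    System.out.println(conf.getInt(key, fieldFallback)); // prints 1024
    System.out.println(new Configuration().getInt(key, fieldFallback)); // prints 5000
  }
}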

parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java

+5 -2

@@ -1429,9 +1429,10 @@ public B reuseContainers(boolean newReuseContainers) {
       return (B) this;
     }

-    @Override
+    @Deprecated
     public B recordsPerBatch(int numRowsPerBatch) {
       this.maxRecordsPerBatch = numRowsPerBatch;
+      set(RECORDS_PER_BATCH_KEY, String.valueOf(numRowsPerBatch));
       return (B) this;
     }

@@ -1593,7 +1594,9 @@ private <D> CloseableIterable<D> buildFunctionBasedReader(ParquetReadOptions opt
           filter,
           reuseContainers,
           filterCaseSensitive,
-          maxRecordsPerBatch);
+          properties.containsKey(RECORDS_PER_BATCH_KEY)
+              ? Integer.parseInt(properties.get(RECORDS_PER_BATCH_KEY))
+              : maxRecordsPerBatch);
     } else {
       Function<MessageType, ParquetValueReader<?>> readBuilder =
           readerFuncWithSchema != null
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

+3 -2

@@ -78,12 +78,13 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
     if (parquetConf != null) {
       readBuilder =
           readBuilder
-              .recordsPerBatch(parquetConf.batchSize())
+              .set(ReadBuilder.RECORDS_PER_BATCH_KEY, String.valueOf(parquetConf.batchSize()))
               .set(
                   VectorizedSparkParquetReaders.PARQUET_READER_TYPE,
                   parquetConf.readerType().name());
     } else if (orcConf != null) {
-      readBuilder = readBuilder.recordsPerBatch(orcConf.batchSize());
+      readBuilder =
+          readBuilder.set(ReadBuilder.RECORDS_PER_BATCH_KEY, String.valueOf(orcConf.batchSize()));
     }

     if (readBuilder instanceof Parquet.SupportsDeleteFilter<?>) {
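With this change the Parquet and ORC branches configure batch size through the identical format-agnostic call, differing only in the extra reader-type property on the Parquet side. A hypothetical factoring of that shared wiring, not part of this commit; Builder stands in for the real ReadBuilder implementations, whose set(String, String) is assumed from the call sites above to return the builder:

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the now-shared wiring (not in this commit).
class SparkBatchSizeWiring {
  static final String RECORDS_PER_BATCH_KEY = "iceberg.records-per-batch";

  static class Builder {
    final Map<String, String> props = new HashMap<>();

    Builder set(String key, String value) {
      props.put(key, value);
      return this;
    }
  }

  // One helper now covers both the parquetConf and orcConf branches.
  static Builder withBatchSize(Builder readBuilder, int batchSize) {
    return readBuilder.set(RECORDS_PER_BATCH_KEY, String.valueOf(batchSize));
  }

  public static void main(String[] args) {
    System.out.println(withBatchSize(new Builder(), 4096).props);
    // prints {iceberg.records-per-batch=4096}
  }
}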
