Skip to content

Commit 1c842e5

Browse files
committed
Spark test
1 parent 614fccb commit 1c842e5

10 files changed

+170
-85
lines changed

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
2424
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
2525
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
26-
import org.apache.iceberg.data.DeleteFilter;
26+
import org.apache.iceberg.io.datafile.DeleteFilter;
2727
import org.apache.iceberg.parquet.VectorizedReader;
2828
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2929
import org.apache.iceberg.util.Pair;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import java.util.Arrays;
2222
import java.util.function.Predicate;
23-
import org.apache.iceberg.data.DeleteFilter;
2423
import org.apache.iceberg.deletes.PositionDeleteIndex;
24+
import org.apache.iceberg.io.datafile.DeleteFilter;
2525
import org.apache.iceberg.util.Pair;
2626
import org.apache.spark.sql.catalyst.InternalRow;
2727
import org.apache.spark.sql.vectorized.ColumnVector;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.comet.parquet.AbstractColumnReader;
2626
import org.apache.comet.parquet.BatchReader;
2727
import org.apache.iceberg.Schema;
28-
import org.apache.iceberg.data.DeleteFilter;
28+
import org.apache.iceberg.io.datafile.DeleteFilter;
2929
import org.apache.iceberg.parquet.VectorizedReader;
3030
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3131
import org.apache.iceberg.spark.SparkSchemaUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.stream.IntStream;
2525
import org.apache.iceberg.MetadataColumns;
2626
import org.apache.iceberg.Schema;
27-
import org.apache.iceberg.data.DeleteFilter;
27+
import org.apache.iceberg.io.datafile.DeleteFilter;
2828
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
2929
import org.apache.iceberg.parquet.VectorizedReader;
3030
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.arrow.vector.NullCheckingForGet;
2525
import org.apache.iceberg.Schema;
2626
import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
27-
import org.apache.iceberg.data.DeleteFilter;
27+
import org.apache.iceberg.io.datafile.DeleteFilter;
2828
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
2929
import org.apache.iceberg.parquet.VectorizedReader;
3030
import org.apache.iceberg.spark.SparkUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

+106-72
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,27 @@
1919
package org.apache.iceberg.spark.source;
2020

2121
import java.util.Map;
22-
import java.util.Set;
22+
import org.apache.iceberg.ContentScanTask;
2323
import org.apache.iceberg.FileFormat;
24-
import org.apache.iceberg.MetadataColumns;
2524
import org.apache.iceberg.ScanTask;
2625
import org.apache.iceberg.ScanTaskGroup;
2726
import org.apache.iceberg.Schema;
2827
import org.apache.iceberg.Table;
29-
import org.apache.iceberg.expressions.Expression;
3028
import org.apache.iceberg.io.CloseableIterable;
3129
import org.apache.iceberg.io.InputFile;
30+
import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
31+
import org.apache.iceberg.io.datafile.DeleteFilter;
32+
import org.apache.iceberg.io.datafile.ReaderBuilder;
33+
import org.apache.iceberg.io.datafile.ServiceBase;
3234
import org.apache.iceberg.orc.ORC;
3335
import org.apache.iceberg.parquet.Parquet;
34-
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3536
import org.apache.iceberg.spark.OrcBatchReadConf;
3637
import org.apache.iceberg.spark.ParquetBatchReadConf;
3738
import org.apache.iceberg.spark.ParquetReaderType;
3839
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
3940
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
40-
import org.apache.iceberg.types.TypeUtil;
41+
import org.apache.iceberg.types.Types;
42+
import org.apache.spark.sql.catalyst.InternalRow;
4143
import org.apache.spark.sql.vectorized.ColumnarBatch;
4244

4345
abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -59,82 +61,114 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
5961

6062
protected CloseableIterable<ColumnarBatch> newBatchIterable(
6163
InputFile inputFile,
62-
FileFormat format,
63-
long start,
64-
long length,
65-
Expression residual,
66-
Map<Integer, ?> idToConstant,
64+
ContentScanTask<?> task,
65+
Types.StructType unifiedPartitionType,
6766
SparkDeleteFilter deleteFilter) {
68-
switch (format) {
69-
case PARQUET:
70-
return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
67+
ReaderBuilder<?> readerBuilder =
68+
DataFileServiceRegistry.read(
69+
task.file().format(),
70+
InternalRow.class.getName(),
71+
parquetConf != null ? parquetConf.readerType().name() : null,
72+
inputFile,
73+
task,
74+
expectedSchema(),
75+
unifiedPartitionType,
76+
deleteFilter)
77+
.split(task.start(), task.length())
78+
.filter(task.residual())
79+
.caseSensitive(caseSensitive())
80+
// Spark eagerly consumes the batches, so the memory allocated for one batch
81+
// can be reused for the next without subsequent reads clobbering each
82+
// other. This improves read performance, as every batch read doesn't
83+
// have to pay the cost of
84+
// allocating memory.
85+
.reuseContainers()
86+
.withNameMapping(nameMapping());
87+
if (parquetConf != null) {
88+
readerBuilder = readerBuilder.recordsPerBatch(parquetConf.batchSize());
89+
} else if (orcConf != null) {
90+
readerBuilder = readerBuilder.recordsPerBatch(orcConf.batchSize());
91+
}
92+
93+
return readerBuilder.build();
94+
}
7195

72-
case ORC:
73-
return newOrcIterable(inputFile, start, length, residual, idToConstant);
96+
public static class IcebergParquetReaderService extends ServiceBase
97+
implements org.apache.iceberg.io.datafile.ReaderService {
98+
@SuppressWarnings("checkstyle:RedundantModifier")
99+
public IcebergParquetReaderService() {
100+
super(FileFormat.PARQUET, InternalRow.class.getName(), ParquetReaderType.ICEBERG.name());
101+
}
74102

75-
default:
76-
throw new UnsupportedOperationException(
77-
"Format: " + format + " not supported for batched reads");
103+
@Override
104+
public ReaderBuilder<?> builder(
105+
InputFile inputFile,
106+
ContentScanTask<?> task,
107+
Schema readSchema,
108+
Types.StructType unifiedPartitionType,
109+
DeleteFilter<?> deleteFilter) {
110+
// get required schema if there are deletes
111+
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
112+
return Parquet.read(inputFile)
113+
.project(requiredSchema)
114+
.createBatchedReaderFunc(
115+
fileSchema ->
116+
VectorizedSparkParquetReaders.buildReader(
117+
requiredSchema,
118+
fileSchema,
119+
constantsMap(task, readSchema, unifiedPartitionType),
120+
(DeleteFilter<InternalRow>) deleteFilter));
78121
}
79122
}
80123

81-
private CloseableIterable<ColumnarBatch> newParquetIterable(
82-
InputFile inputFile,
83-
long start,
84-
long length,
85-
Expression residual,
86-
Map<Integer, ?> idToConstant,
87-
SparkDeleteFilter deleteFilter) {
88-
// get required schema if there are deletes
89-
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
124+
public static class CometParquetReaderService extends ServiceBase
125+
implements org.apache.iceberg.io.datafile.ReaderService {
126+
@SuppressWarnings("checkstyle:RedundantModifier")
127+
public CometParquetReaderService() {
128+
super(FileFormat.PARQUET, InternalRow.class.getName(), ParquetReaderType.COMET.name());
129+
}
90130

91-
return Parquet.read(inputFile)
92-
.project(requiredSchema)
93-
.split(start, length)
94-
.createBatchedReaderFunc(
95-
fileSchema -> {
96-
if (parquetConf.readerType() == ParquetReaderType.COMET) {
97-
return VectorizedSparkParquetReaders.buildCometReader(
98-
requiredSchema, fileSchema, idToConstant, deleteFilter);
99-
} else {
100-
return VectorizedSparkParquetReaders.buildReader(
101-
requiredSchema, fileSchema, idToConstant, deleteFilter);
102-
}
103-
})
104-
.recordsPerBatch(parquetConf.batchSize())
105-
.filter(residual)
106-
.caseSensitive(caseSensitive())
107-
// Spark eagerly consumes the batches. So the underlying memory allocated could be reused
108-
// without worrying about subsequent reads clobbering over each other. This improves
109-
// read performance as every batch read doesn't have to pay the cost of allocating memory.
110-
.reuseContainers()
111-
.withNameMapping(nameMapping())
112-
.build();
131+
@Override
132+
public ReaderBuilder<?> builder(
133+
InputFile inputFile,
134+
ContentScanTask<?> task,
135+
Schema readSchema,
136+
Types.StructType unifiedPartitionType,
137+
DeleteFilter<?> deleteFilter) {
138+
// get required schema if there are deletes
139+
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
140+
return Parquet.read(inputFile)
141+
.project(requiredSchema)
142+
.createBatchedReaderFunc(
143+
fileSchema ->
144+
VectorizedSparkParquetReaders.buildCometReader(
145+
requiredSchema,
146+
fileSchema,
147+
constantsMap(task, readSchema, unifiedPartitionType),
148+
(DeleteFilter<InternalRow>) deleteFilter));
149+
}
113150
}
114151

115-
private CloseableIterable<ColumnarBatch> newOrcIterable(
116-
InputFile inputFile,
117-
long start,
118-
long length,
119-
Expression residual,
120-
Map<Integer, ?> idToConstant) {
121-
Set<Integer> constantFieldIds = idToConstant.keySet();
122-
Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
123-
Sets.SetView<Integer> constantAndMetadataFieldIds =
124-
Sets.union(constantFieldIds, metadataFieldIds);
125-
Schema schemaWithoutConstantAndMetadataFields =
126-
TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
152+
public static class ORCReaderService extends ServiceBase
153+
implements org.apache.iceberg.io.datafile.ReaderService {
154+
@SuppressWarnings("checkstyle:RedundantModifier")
155+
public ORCReaderService() {
156+
super(FileFormat.ORC, InternalRow.class.getName());
157+
}
127158

128-
return ORC.read(inputFile)
129-
.project(schemaWithoutConstantAndMetadataFields)
130-
.split(start, length)
131-
.createBatchedReaderFunc(
132-
fileSchema ->
133-
VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
134-
.recordsPerBatch(orcConf.batchSize())
135-
.filter(residual)
136-
.caseSensitive(caseSensitive())
137-
.withNameMapping(nameMapping())
138-
.build();
159+
@Override
160+
public ReaderBuilder<?> builder(
161+
InputFile inputFile,
162+
ContentScanTask<?> task,
163+
Schema readSchema,
164+
Types.StructType unifiedPartitionType,
165+
DeleteFilter<?> deleteFilter) {
166+
Map<Integer, ?> idToConstant = constantsMap(task, readSchema, unifiedPartitionType);
167+
return ORC.read(inputFile)
168+
.project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant))
169+
.createBatchedReaderFunc(
170+
fileSchema ->
171+
VectorizedSparkOrcReaders.buildReader(readSchema, fileSchema, idToConstant));
172+
}
139173
}
140174
}

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java

+9
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,15 @@ private Map<String, InputFile> inputFiles() {
190190
}
191191
}
192192

193+
protected static Map<Integer, ?> constantsMap(
194+
ContentScanTask<?> task, Schema readSchema, StructType unifiedPartitionType) {
195+
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
196+
return PartitionUtil.constantsMap(task, unifiedPartitionType, SparkUtil::internalToSpark);
197+
} else {
198+
return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);
199+
}
200+
}
201+
193202
protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
194203
private final InternalRowWrapper asStructLike;
195204

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.stream.Stream;
2323
import org.apache.iceberg.ContentFile;
2424
import org.apache.iceberg.FileScanTask;
25+
import org.apache.iceberg.Partitioning;
2526
import org.apache.iceberg.ScanTaskGroup;
2627
import org.apache.iceberg.Schema;
2728
import org.apache.iceberg.Table;
@@ -105,14 +106,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
105106
? null
106107
: new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
107108

108-
return newBatchIterable(
109-
inputFile,
110-
task.file().format(),
111-
task.start(),
112-
task.length(),
113-
task.residual(),
114-
idToConstant,
115-
deleteFilter)
109+
return newBatchIterable(inputFile, task, Partitioning.partitionType(table()), deleteFilter)
116110
.iterator();
117111
}
118112
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# /*
3+
# * Licensed to the Apache Software Foundation (ASF) under one
4+
# * or more contributor license agreements. See the NOTICE file
5+
# * distributed with this work for additional information
6+
# * regarding copyright ownership. The ASF licenses this file
7+
# * to you under the Apache License, Version 2.0 (the
8+
# * "License"); you may not use this file except in compliance
9+
# * with the License. You may obtain a copy of the License at
10+
# *
11+
# * http://www.apache.org/licenses/LICENSE-2.0
12+
# *
13+
# * Unless required by applicable law or agreed to in writing,
14+
# * software distributed under the License is distributed on an
15+
# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# * KIND, either express or implied. See the License for the
17+
# * specific language governing permissions and limitations
18+
# * under the License.
19+
# */
20+
#
21+
22+
org.apache.iceberg.spark.source.BaseBatchReader$IcebergParquetReaderService
23+
org.apache.iceberg.spark.source.BaseBatchReader$CometParquetReaderService
24+
org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# /*
3+
# * Licensed to the Apache Software Foundation (ASF) under one
4+
# * or more contributor license agreements. See the NOTICE file
5+
# * distributed with this work for additional information
6+
# * regarding copyright ownership. The ASF licenses this file
7+
# * to you under the Apache License, Version 2.0 (the
8+
# * "License"); you may not use this file except in compliance
9+
# * with the License. You may obtain a copy of the License at
10+
# *
11+
# * http://www.apache.org/licenses/LICENSE-2.0
12+
# *
13+
# * Unless required by applicable law or agreed to in writing,
14+
# * software distributed under the License is distributed on an
15+
# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# * KIND, either express or implied. See the License for the
17+
# * specific language governing permissions and limitations
18+
# * under the License.
19+
# */
20+
#
21+
22+
org.apache.iceberg.spark.source.BaseBatchReader$IcebergParquetReaderService
23+
org.apache.iceberg.spark.source.BaseBatchReader$CometParquetReaderService
24+
org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService

0 commit comments

Comments
 (0)