Skip to content

Commit 2efac10

Browse files
committed
Spark test
1 parent fb55128 commit 2efac10

10 files changed

+158
-92
lines changed

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@
2323
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
2424
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
2525
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
26-
import org.apache.iceberg.data.DeleteFilter;
26+
import org.apache.iceberg.io.datafile.DeleteFilter;
2727
import org.apache.iceberg.parquet.VectorizedReader;
2828
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2929
import org.apache.iceberg.util.Pair;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,8 @@
2020

2121
import java.util.Arrays;
2222
import java.util.function.Predicate;
23-
import org.apache.iceberg.data.DeleteFilter;
2423
import org.apache.iceberg.deletes.PositionDeleteIndex;
24+
import org.apache.iceberg.io.datafile.DeleteFilter;
2525
import org.apache.iceberg.util.Pair;
2626
import org.apache.spark.sql.catalyst.InternalRow;
2727
import org.apache.spark.sql.vectorized.ColumnVector;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@
2525
import org.apache.comet.parquet.AbstractColumnReader;
2626
import org.apache.comet.parquet.BatchReader;
2727
import org.apache.iceberg.Schema;
28-
import org.apache.iceberg.data.DeleteFilter;
28+
import org.apache.iceberg.io.datafile.DeleteFilter;
2929
import org.apache.iceberg.parquet.VectorizedReader;
3030
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3131
import org.apache.iceberg.spark.SparkSchemaUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@
2424
import java.util.stream.IntStream;
2525
import org.apache.iceberg.MetadataColumns;
2626
import org.apache.iceberg.Schema;
27-
import org.apache.iceberg.data.DeleteFilter;
27+
import org.apache.iceberg.io.datafile.DeleteFilter;
2828
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
2929
import org.apache.iceberg.parquet.VectorizedReader;
3030
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@
2424
import org.apache.arrow.vector.NullCheckingForGet;
2525
import org.apache.iceberg.Schema;
2626
import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
27-
import org.apache.iceberg.data.DeleteFilter;
27+
import org.apache.iceberg.io.datafile.DeleteFilter;
2828
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
2929
import org.apache.iceberg.parquet.VectorizedReader;
3030
import org.apache.iceberg.spark.SparkUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

+102-74
Original file line number | Diff line number | Diff line change
@@ -19,25 +19,27 @@
1919
package org.apache.iceberg.spark.source;
2020

2121
import java.util.Map;
22-
import java.util.Set;
22+
import org.apache.iceberg.ContentScanTask;
2323
import org.apache.iceberg.FileFormat;
24-
import org.apache.iceberg.MetadataColumns;
2524
import org.apache.iceberg.ScanTask;
2625
import org.apache.iceberg.ScanTaskGroup;
2726
import org.apache.iceberg.Schema;
2827
import org.apache.iceberg.Table;
29-
import org.apache.iceberg.expressions.Expression;
3028
import org.apache.iceberg.io.CloseableIterable;
3129
import org.apache.iceberg.io.InputFile;
30+
import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
31+
import org.apache.iceberg.io.datafile.DeleteFilter;
32+
import org.apache.iceberg.io.datafile.ReaderBuilder;
33+
import org.apache.iceberg.io.datafile.ReaderService;
34+
import org.apache.iceberg.io.datafile.ServiceBase;
3235
import org.apache.iceberg.orc.ORC;
3336
import org.apache.iceberg.parquet.Parquet;
34-
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3537
import org.apache.iceberg.spark.OrcBatchReadConf;
3638
import org.apache.iceberg.spark.ParquetBatchReadConf;
3739
import org.apache.iceberg.spark.ParquetReaderType;
3840
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
3941
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
40-
import org.apache.iceberg.types.TypeUtil;
42+
import org.apache.spark.sql.catalyst.InternalRow;
4143
import org.apache.spark.sql.vectorized.ColumnarBatch;
4244

4345
abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -58,83 +60,109 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
5860
}
5961

6062
protected CloseableIterable<ColumnarBatch> newBatchIterable(
61-
InputFile inputFile,
62-
FileFormat format,
63-
long start,
64-
long length,
65-
Expression residual,
66-
Map<Integer, ?> idToConstant,
67-
SparkDeleteFilter deleteFilter) {
68-
switch (format) {
69-
case PARQUET:
70-
return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
63+
InputFile inputFile, ContentScanTask<?> task, Table table, SparkDeleteFilter deleteFilter) {
64+
ReaderBuilder<?> readerBuilder =
65+
DataFileServiceRegistry.read(
66+
task.file().format(),
67+
InternalRow.class.getName(),
68+
parquetConf != null ? parquetConf.readerType().name() : null,
69+
inputFile,
70+
task,
71+
expectedSchema(),
72+
table,
73+
deleteFilter)
74+
.split(task.start(), task.length())
75+
.filter(task.residual())
76+
.caseSensitive(caseSensitive())
77+
// Spark eagerly consumes the batches, so the underlying memory that was
78+
// allocated can be reused without worrying about subsequent reads
79+
// clobbering each other. This improves read performance, because
80+
// every batch read no longer has to pay the cost of allocating
81+
// memory.
82+
.reuseContainers()
83+
.withNameMapping(nameMapping());
84+
if (parquetConf != null) {
85+
readerBuilder = readerBuilder.recordsPerBatch(parquetConf.batchSize());
86+
} else if (orcConf != null) {
87+
readerBuilder = readerBuilder.recordsPerBatch(orcConf.batchSize());
88+
}
89+
90+
return readerBuilder.build();
91+
}
7192

72-
case ORC:
73-
return newOrcIterable(inputFile, start, length, residual, idToConstant);
93+
public static class IcebergParquetReaderService extends ServiceBase implements ReaderService {
94+
@SuppressWarnings("checkstyle:RedundantModifier")
95+
public IcebergParquetReaderService() {
96+
super(FileFormat.PARQUET, InternalRow.class.getName(), ParquetReaderType.ICEBERG.name());
97+
}
7498

75-
default:
76-
throw new UnsupportedOperationException(
77-
"Format: " + format + " not supported for batched reads");
99+
@Override
100+
public ReaderBuilder<?> builder(
101+
InputFile inputFile,
102+
ContentScanTask<?> task,
103+
Schema readSchema,
104+
Table table,
105+
DeleteFilter<?> deleteFilter) {
106+
// get required schema if there are deletes
107+
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
108+
return Parquet.read(inputFile)
109+
.project(requiredSchema)
110+
.createBatchedReaderFunc(
111+
fileSchema ->
112+
VectorizedSparkParquetReaders.buildReader(
113+
requiredSchema,
114+
fileSchema,
115+
constantsMap(task, readSchema, table),
116+
(DeleteFilter<InternalRow>) deleteFilter));
78117
}
79118
}
80119

81-
private CloseableIterable<ColumnarBatch> newParquetIterable(
82-
InputFile inputFile,
83-
long start,
84-
long length,
85-
Expression residual,
86-
Map<Integer, ?> idToConstant,
87-
SparkDeleteFilter deleteFilter) {
88-
// get required schema if there are deletes
89-
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
120+
public static class CometParquetReaderService extends ServiceBase implements ReaderService {
121+
@SuppressWarnings("checkstyle:RedundantModifier")
122+
public CometParquetReaderService() {
123+
super(FileFormat.PARQUET, InternalRow.class.getName(), ParquetReaderType.COMET.name());
124+
}
90125

91-
return Parquet.read(inputFile)
92-
.project(requiredSchema)
93-
.split(start, length)
94-
.createBatchedReaderFunc(
95-
fileSchema -> {
96-
if (parquetConf.readerType() == ParquetReaderType.COMET) {
97-
return VectorizedSparkParquetReaders.buildCometReader(
98-
requiredSchema, fileSchema, idToConstant, deleteFilter);
99-
} else {
100-
return VectorizedSparkParquetReaders.buildReader(
101-
requiredSchema, fileSchema, idToConstant, deleteFilter);
102-
}
103-
})
104-
.recordsPerBatch(parquetConf.batchSize())
105-
.filter(residual)
106-
.caseSensitive(caseSensitive())
107-
// Spark eagerly consumes the batches. So the underlying memory allocated could be reused
108-
// without worrying about subsequent reads clobbering over each other. This improves
109-
// read performance as every batch read doesn't have to pay the cost of allocating memory.
110-
.reuseContainers()
111-
.withNameMapping(nameMapping())
112-
.build();
126+
@Override
127+
public ReaderBuilder<?> builder(
128+
InputFile inputFile,
129+
ContentScanTask<?> task,
130+
Schema readSchema,
131+
Table table,
132+
DeleteFilter<?> deleteFilter) {
133+
// get required schema if there are deletes
134+
Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
135+
return Parquet.read(inputFile)
136+
.project(requiredSchema)
137+
.createBatchedReaderFunc(
138+
fileSchema ->
139+
VectorizedSparkParquetReaders.buildCometReader(
140+
requiredSchema,
141+
fileSchema,
142+
constantsMap(task, readSchema, table),
143+
(DeleteFilter<InternalRow>) deleteFilter));
144+
}
113145
}
114146

115-
private CloseableIterable<ColumnarBatch> newOrcIterable(
116-
InputFile inputFile,
117-
long start,
118-
long length,
119-
Expression residual,
120-
Map<Integer, ?> idToConstant) {
121-
Set<Integer> constantFieldIds = idToConstant.keySet();
122-
Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
123-
Sets.SetView<Integer> constantAndMetadataFieldIds =
124-
Sets.union(constantFieldIds, metadataFieldIds);
125-
Schema schemaWithoutConstantAndMetadataFields =
126-
TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
147+
public static class ORCReaderService extends ServiceBase implements ReaderService {
148+
@SuppressWarnings("checkstyle:RedundantModifier")
149+
public ORCReaderService() {
150+
super(FileFormat.ORC, InternalRow.class.getName());
151+
}
127152

128-
return ORC.read(inputFile)
129-
.project(schemaWithoutConstantAndMetadataFields)
130-
.split(start, length)
131-
.createBatchedReaderFunc(
132-
fileSchema ->
133-
VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
134-
.recordsPerBatch(orcConf.batchSize())
135-
.filter(residual)
136-
.caseSensitive(caseSensitive())
137-
.withNameMapping(nameMapping())
138-
.build();
153+
@Override
154+
public ReaderBuilder<?> builder(
155+
InputFile inputFile,
156+
ContentScanTask<?> task,
157+
Schema readSchema,
158+
Table table,
159+
DeleteFilter<?> deleteFilter) {
160+
Map<Integer, ?> idToConstant = constantsMap(task, readSchema, table);
161+
return ORC.read(inputFile)
162+
.project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant))
163+
.createBatchedReaderFunc(
164+
fileSchema ->
165+
VectorizedSparkOrcReaders.buildReader(readSchema, fileSchema, idToConstant));
166+
}
139167
}
140168
}

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java

+6-1
Original file line number | Diff line number | Diff line change
@@ -182,8 +182,13 @@ private Map<String, InputFile> inputFiles() {
182182
}
183183

184184
protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
185+
return constantsMap(task, readSchema, table);
186+
}
187+
188+
protected static Map<Integer, ?> constantsMap(
189+
ContentScanTask<?> task, Schema readSchema, Table tableToRead) {
185190
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
186-
StructType partitionType = Partitioning.partitionType(table);
191+
StructType partitionType = Partitioning.partitionType(tableToRead);
187192
return PartitionUtil.constantsMap(task, partitionType, SparkUtil::internalToSpark);
188193
} else {
189194
return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java

+1-12
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.iceberg.spark.source;
2020

21-
import java.util.Map;
2221
import java.util.stream.Stream;
2322
import org.apache.iceberg.ContentFile;
2423
import org.apache.iceberg.FileScanTask;
@@ -95,8 +94,6 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
9594
// update the current file for Spark's filename() function
9695
InputFileBlockHolder.set(filePath, task.start(), task.length());
9796

98-
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
99-
10097
InputFile inputFile = getInputFile(filePath);
10198
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
10299

@@ -105,14 +102,6 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
105102
? null
106103
: new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
107104

108-
return newBatchIterable(
109-
inputFile,
110-
task.file().format(),
111-
task.start(),
112-
task.length(),
113-
task.residual(),
114-
idToConstant,
115-
deleteFilter)
116-
.iterator();
105+
return newBatchIterable(inputFile, task, table(), deleteFilter).iterator();
117106
}
118107
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
org.apache.iceberg.spark.source.BaseBatchReader$IcebergParquetReaderService
21+
org.apache.iceberg.spark.source.BaseBatchReader$CometParquetReaderService
22+
org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
org.apache.iceberg.spark.source.BaseBatchReader$IcebergParquetReaderService
21+
org.apache.iceberg.spark.source.BaseBatchReader$CometParquetReaderService
22+
org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService

0 commit comments

Comments
 (0)