
Commit 2d8e0f3

Spark test
1 parent 8d1ec06 commit 2d8e0f3

10 files changed: +164, -88 lines

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java (+1, -1)

@@ -23,7 +23,7 @@
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.Pair;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchUtil.java (+1, -1)

@@ -20,8 +20,8 @@
 
 import java.util.Arrays;
 import java.util.function.Predicate;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java (+1, -1)

@@ -25,7 +25,7 @@
 import org.apache.comet.parquet.AbstractColumnReader;
 import org.apache.comet.parquet.BatchReader;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java (+1, -1)

@@ -24,7 +24,7 @@
 import java.util.stream.IntStream;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java (+1, -1)

@@ -24,7 +24,7 @@
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
-import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.datafile.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.spark.SparkUtil;

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java (+104, -72)

@@ -19,25 +19,28 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
-import java.util.Set;
+import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.datafile.DataFileServiceRegistry;
+import org.apache.iceberg.io.datafile.DeleteFilter;
+import org.apache.iceberg.io.datafile.ReaderBuilder;
+import org.apache.iceberg.io.datafile.ReaderService;
+import org.apache.iceberg.io.datafile.ServiceBase;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
-import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {

@@ -59,82 +62,111 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
 
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
       InputFile inputFile,
-      FileFormat format,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant,
+      ContentScanTask<?> task,
+      Types.StructType unifiedPartitionType,
       SparkDeleteFilter deleteFilter) {
-    switch (format) {
-      case PARQUET:
-        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
+    ReaderBuilder<?> readerBuilder =
+        DataFileServiceRegistry.read(
+                task.file().format(),
+                InternalRow.class.getName(),
+                parquetConf != null ? parquetConf.readerType().name() : null,
+                inputFile,
+                task,
+                expectedSchema(),
+                unifiedPartitionType,
+                deleteFilter)
+            .split(task.start(), task.length())
+            .filter(task.residual())
+            .caseSensitive(caseSensitive())
+            // Spark eagerly consumes the batches. So the underlying memory allocated could be
+            // reused
+            // without worrying about subsequent reads clobbering over each other. This improves
+            // read performance as every batch read doesn't have to pay the cost of allocating
+            // memory.
+            .reuseContainers()
+            .withNameMapping(nameMapping());
+    if (parquetConf != null) {
+      readerBuilder = readerBuilder.recordsPerBatch(parquetConf.batchSize());
+    } else if (orcConf != null) {
+      readerBuilder = readerBuilder.recordsPerBatch(orcConf.batchSize());
+    }
+
+    return readerBuilder.build();
+  }
 
-      case ORC:
-        return newOrcIterable(inputFile, start, length, residual, idToConstant);
+  public static class IcebergParquetReaderService extends ServiceBase implements ReaderService {
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public IcebergParquetReaderService() {
+      super(FileFormat.PARQUET, InternalRow.class.getName(), ParquetReaderType.ICEBERG.name());
+    }
 
-      default:
-        throw new UnsupportedOperationException(
-            "Format: " + format + " not supported for batched reads");
+    @Override
+    public ReaderBuilder<?> builder(
+        InputFile inputFile,
+        ContentScanTask<?> task,
+        Schema readSchema,
+        Types.StructType unifiedPartitionType,
+        DeleteFilter<?> deleteFilter) {
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
+      return Parquet.read(inputFile)
+          .project(requiredSchema)
+          .createBatchedReaderFunc(
+              fileSchema ->
+                  VectorizedSparkParquetReaders.buildReader(
+                      requiredSchema,
+                      fileSchema,
+                      constantsMap(task, readSchema, unifiedPartitionType),
+                      (DeleteFilter<InternalRow>) deleteFilter));
     }
   }
 
-  private CloseableIterable<ColumnarBatch> newParquetIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant,
-      SparkDeleteFilter deleteFilter) {
-    // get required schema if there are deletes
-    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
+  public static class CometParquetReaderService extends ServiceBase implements ReaderService {
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public CometParquetReaderService() {
+      super(FileFormat.PARQUET, InternalRow.class.getName(), ParquetReaderType.COMET.name());
+    }
 
-    return Parquet.read(inputFile)
-        .project(requiredSchema)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema -> {
-              if (parquetConf.readerType() == ParquetReaderType.COMET) {
-                return VectorizedSparkParquetReaders.buildCometReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter);
-              } else {
-                return VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter);
-              }
-            })
-        .recordsPerBatch(parquetConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        // Spark eagerly consumes the batches. So the underlying memory allocated could be reused
-        // without worrying about subsequent reads clobbering over each other. This improves
-        // read performance as every batch read doesn't have to pay the cost of allocating memory.
-        .reuseContainers()
-        .withNameMapping(nameMapping())
-        .build();
+    @Override
+    public ReaderBuilder<?> builder(
+        InputFile inputFile,
+        ContentScanTask<?> task,
+        Schema readSchema,
+        Types.StructType unifiedPartitionType,
+        DeleteFilter<?> deleteFilter) {
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
+      return Parquet.read(inputFile)
+          .project(requiredSchema)
+          .createBatchedReaderFunc(
+              fileSchema ->
+                  VectorizedSparkParquetReaders.buildCometReader(
+                      requiredSchema,
+                      fileSchema,
+                      constantsMap(task, readSchema, unifiedPartitionType),
+                      (DeleteFilter<InternalRow>) deleteFilter));
+    }
   }
 
-  private CloseableIterable<ColumnarBatch> newOrcIterable(
-      InputFile inputFile,
-      long start,
-      long length,
-      Expression residual,
-      Map<Integer, ?> idToConstant) {
-    Set<Integer> constantFieldIds = idToConstant.keySet();
-    Set<Integer> metadataFieldIds = MetadataColumns.metadataFieldIds();
-    Sets.SetView<Integer> constantAndMetadataFieldIds =
-        Sets.union(constantFieldIds, metadataFieldIds);
-    Schema schemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(expectedSchema(), constantAndMetadataFieldIds);
+  public static class ORCReaderService extends ServiceBase implements ReaderService {
+    @SuppressWarnings("checkstyle:RedundantModifier")
+    public ORCReaderService() {
+      super(FileFormat.ORC, InternalRow.class.getName());
+    }
 
-    return ORC.read(inputFile)
-        .project(schemaWithoutConstantAndMetadataFields)
-        .split(start, length)
-        .createBatchedReaderFunc(
-            fileSchema ->
-                VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant))
-        .recordsPerBatch(orcConf.batchSize())
-        .filter(residual)
-        .caseSensitive(caseSensitive())
-        .withNameMapping(nameMapping())
-        .build();
+    @Override
+    public ReaderBuilder<?> builder(
+        InputFile inputFile,
+        ContentScanTask<?> task,
+        Schema readSchema,
+        Types.StructType unifiedPartitionType,
+        DeleteFilter<?> deleteFilter) {
+      Map<Integer, ?> idToConstant = constantsMap(task, readSchema, unifiedPartitionType);
+      return ORC.read(inputFile)
+          .project(ORC.schemaWithoutConstantAndMetadataFields(readSchema, idToConstant))
+          .createBatchedReaderFunc(
              fileSchema ->
                  VectorizedSparkOrcReaders.buildReader(readSchema, fileSchema, idToConstant));
+    }
   }
 }
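
Note: the format switch in newBatchIterable is replaced by a DataFileServiceRegistry lookup keyed on the file format, the produced row type (InternalRow), and an optional reader-type name (ICEBERG or COMET for Parquet); each concrete reader is contributed as a ReaderService nested in BaseBatchReader. As a rough illustration only, another reader variant could plug into the same contract by mirroring that pattern. MyParquetReaderService and the "my-reader" key below are hypothetical names, not part of this commit, and the class is assumed to sit next to BaseBatchReader so it can reuse the same imports and the constantsMap helper.

  // Hypothetical plug-in, mirroring the ReaderService/ServiceBase pattern shown above.
  // It would also need to be listed in the ServiceLoader registration file added below.
  public static class MyParquetReaderService extends ServiceBase implements ReaderService {
    public MyParquetReaderService() {
      // key: file format + produced row type + reader-type discriminator ("my-reader" is made up)
      super(FileFormat.PARQUET, InternalRow.class.getName(), "my-reader");
    }

    @Override
    public ReaderBuilder<?> builder(
        InputFile inputFile,
        ContentScanTask<?> task,
        Schema readSchema,
        Types.StructType unifiedPartitionType,
        DeleteFilter<?> deleteFilter) {
      // project the delete-filter schema when deletes are present, as the built-in services do
      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : readSchema;
      return Parquet.read(inputFile)
          .project(requiredSchema)
          .createBatchedReaderFunc(
              fileSchema ->
                  VectorizedSparkParquetReaders.buildReader(
                      requiredSchema,
                      fileSchema,
                      constantsMap(task, readSchema, unifiedPartitionType),
                      (DeleteFilter<InternalRow>) deleteFilter));
    }
  }

The caller side stays the same either way: DataFileServiceRegistry.read picks the matching service, and the returned ReaderBuilder is configured with split, filter, caseSensitive, reuseContainers, withNameMapping, and recordsPerBatch before build().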

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java (+9)

@@ -190,6 +190,15 @@ private Map<String, InputFile> inputFiles() {
     }
   }
 
+  protected static Map<Integer, ?> constantsMap(
+      ContentScanTask<?> task, Schema readSchema, StructType unifiedPartitionType) {
+    if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+      return PartitionUtil.constantsMap(task, unifiedPartitionType, SparkUtil::internalToSpark);
+    } else {
+      return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);
+    }
+  }
+
   protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
     private final InternalRowWrapper asStructLike;
 
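
The new static constantsMap overload is what the reader services above call to materialize partition values and other metadata constants for a task. When the read schema includes the _partition metadata column, constants are bound against the table-wide unified partition type so rows from tasks with different partition specs line up. A short illustrative call, assuming a FileScanTask and the table handle that are available inside the readers:

  // Illustrative only: this mirrors how BatchDataReader now feeds the unified partition type
  // through newBatchIterable; Partitioning.partitionType builds it across all of the table's specs.
  Map<Integer, ?> idToConstant =
      constantsMap(task, expectedSchema(), Partitioning.partitionType(table()));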

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java (+2, -11)

@@ -18,10 +18,10 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;

@@ -95,8 +95,6 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(filePath, task.start(), task.length());
 
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-
     InputFile inputFile = getInputFile(filePath);
     Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
 

@@ -105,14 +103,7 @@ protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
             ? null
             : new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
 
-    return newBatchIterable(
-            inputFile,
-            task.file().format(),
-            task.start(),
-            task.length(),
-            task.residual(),
-            idToConstant,
-            deleteFilter)
+    return newBatchIterable(inputFile, task, Partitioning.partitionType(table()), deleteFilter)
         .iterator();
   }
 }
New file (+22)

@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iceberg.spark.source.BaseBatchReader$IcebergParquetReaderService
+org.apache.iceberg.spark.source.BaseBatchReader$CometParquetReaderService
+org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService
New file (+22)

@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.iceberg.spark.source.BaseBatchReader$IcebergParquetReaderService
+org.apache.iceberg.spark.source.BaseBatchReader$CometParquetReaderService
+org.apache.iceberg.spark.source.BaseBatchReader$ORCReaderService
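
Each of the two new registration files lists one fully qualified ReaderService class per line, the provider-configuration layout consumed by java.util.ServiceLoader. A minimal discovery sketch follows, under the assumption that DataFileServiceRegistry loads providers this way; the registry's internals are not part of this commit.

  import java.util.ServiceLoader;
  import org.apache.iceberg.io.datafile.ReaderService;

  // Enumerate whichever ReaderService implementations are registered on the classpath.
  ServiceLoader<ReaderService> services = ServiceLoader.load(ReaderService.class);
  services.forEach(s -> System.out.println("reader service: " + s.getClass().getName()));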
