Commit 808b4fb

res-life (Chong Gao) and claude authored
Iceberg 1.11 support for Spark 411, part (2/3): add iceberg-1-11-x module (#14882)
Part (2/3) of #14853. Part (1/3) (#14881) is merged; this PR is now based directly on `main`. Part (3/3) (#14883) stacks on this one.

### Description

Apache Iceberg published `iceberg-spark-runtime-4.1_2.13` starting at version 1.11.0 (apache/iceberg#14155 added Spark 4.1 support, released in [1.11.0](https://github.com/apache/iceberg/releases/tag/apache-iceberg-1.11.0)). `iceberg-1-10-x` is not an option on Spark 4.1 because Iceberg never released a 4.1 runtime before 1.11.

Adds a new Maven submodule `iceberg/iceberg-1-11-x` and switches the `release411` profile from `iceberg/iceberg-stub` to use it.

Module skeleton (mirrors `iceberg-1-10-x` with the `iceberg111x` sub-package and a `spark411` shim source dir):

- `iceberg/iceberg-1-11-x/pom.xml` + scala2.13 mirror
- `iceberg111x/IcebergProviderImpl`, `ShimUtilsImpl`, `GpuParquetIOShim`
- `org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan`: Iceberg 1.11 copy-on-write scan, `SupportsRuntimeV2Filtering` with `filter(Predicate[])`
- `spark411/.../GpuInternalRow` overrides the new `SpecializedGetters` methods Spark 4.1 added (`getGeometry`, `getGeography`) alongside the existing `getVariant`

Wiring:

- parent pom: add `spark41x.iceberg.artifact.suffix=4.1` and `iceberg.111x.version=1.11.0`; swap `release411` from `iceberg-stub` to `iceberg-1-11-x`
- `scala2.13` mirror regenerated via `build/make-scala-version-build-files.sh`
- `IcebergProbeImpl`: lift the `< 4.1.0` Spark cap to `< 4.2.0`; add the 1.11.0 commit-id mapping and `"1.11" -> "iceberg111x"` shim sub-package
- `iceberg/README.md`: document the new row in the Iceberg/Spark support matrix

Enable iceberg integration tests on Spark 4.1.x:

- `spark_session.py`: add `is_spark_41x()` and include 4.1.x in `is_iceberg_supported_spark()`
- `iceberg/__init__.py`: update the skip reason to mention 4.1.x
- `iceberg_test.py::test_iceberg_read_timetravel`: Iceberg 1.11 removed the `.option("snapshot-id", ...)` read API and directs users at Spark's built-in `versionAsOf` (works on both 1.10 and 1.11)

Also folds in two non-blocking review nits from the merged part (1/3) (#14881):

- **Dedup the V1 copy-on-write scan**: the identical 1.6.x/1.9.x/1.10.x `GpuSparkCopyOnWriteScan` classes (post-#14866 they depend only on public `Scan` + `SupportsRuntimeFiltering`) are replaced by a single common `GpuSparkCopyOnWriteV1Scan` in `iceberg/common`, instantiated by all three V1 `ShimUtilsImpl`s. Only the 1.11 V2 path keeps a version-specific class.
- **`GpuSparkScanAccess.branch()`** reads the private `branch` field directly instead of the removed-in-1.11 `SparkScan.branch()` method, so it resolves across 1.6.x–1.11.x (was returning `null` on 1.11).

### Checklists

Documentation

- [x] Updated for new or modified user-facing features or behaviors (`iceberg/README.md` adds the Iceberg 1.11.x / Spark 4.1.x row to the support matrix.)
- [ ] No user-facing change

Testing

- [x] Added or modified tests to cover new code paths (`spark_session.py` + `iceberg/__init__.py` re-enable the iceberg suite on Spark 4.1.x; `test_iceberg_read_timetravel` switched to `versionAsOf`.)
- [ ] Covered by existing tests
- [ ] Not required

Performance

- [ ] Tests ran and results are added in the PR description
- [ ] Issue filed with a link in the PR description
- [x] Not required

---------

Signed-off-by: Chong Gao <res_life@163.com>
Co-authored-by: Chong Gao <res_life@163.com>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent e57b9b2 commit 808b4fb

22 files changed

Lines changed: 500 additions & 97 deletions

iceberg/README.md

Lines changed: 8 additions & 3 deletions
@@ -14,17 +14,22 @@ and the directory that contains the corresponding support code.
 | 1.6.x | Spark 3.5.0-3.5.3 | `iceberg-1-6-x` |
 | 1.9.x | Spark 3.5.4-3.5.8 | `iceberg-1-9-x` |
 | 1.10.x | Spark 3.5.4-3.5.8, 4.0.x | `iceberg-1-10-x` |
+| 1.11.x | Spark 4.1.x | `iceberg-1-11-x` |
 
-Iceberg GPU acceleration is currently supported on Spark 3.5.x and 4.0.x.
+Iceberg GPU acceleration is currently supported on Spark 3.5.x, 4.0.x, and 4.1.x.
 
 For Spark 3.5.4+, both `iceberg-1-9-x` and `iceberg-1-10-x` modules are compiled into the
 build. The correct version-specific implementation is selected at runtime by probing the
 `iceberg-spark-runtime` jar on the classpath. Version-specific code lives in distinct
-sub-packages (`iceberg19x`, `iceberg110x`) to avoid class conflicts, and the common
-`ShimUtils` dispatcher delegates to the appropriate implementation.
+sub-packages (`iceberg19x`, `iceberg110x`, `iceberg111x`) to avoid class conflicts, and the
+common `ShimUtils` dispatcher delegates to the appropriate implementation.
 
 For Spark 4.0.x, only `iceberg-1-10-x` is compiled during the build.
 
+For Spark 4.1.x, only `iceberg-1-11-x` is compiled during the build. Apache Iceberg
+publishes the `iceberg-spark-runtime-4.1` artifact starting at version 1.11.0, so earlier
+Iceberg releases cannot be used with Spark 4.1.
+
 ## Code Shared Between Modules
 
 The `common` directory contains code that is shared across some or all of the Iceberg

iceberg/common/src/main/java/org/apache/iceberg/spark/source/GpuSparkScanAccess.java

Lines changed: 20 additions & 3 deletions
@@ -71,9 +71,26 @@ public static Table table(Scan scan) {
   }
 
   public static String branch(Scan scan) {
-    // Iceberg 1.10.x and earlier: protected method SparkScan.branch(). Iceberg 1.11.x
-    // removed it entirely; return null for display purposes.
-    return invokeMethod(sparkScan(scan), String.class, "branch");
+    // Iceberg 1.10.x and earlier exposed a protected SparkScan.branch() method;
+    // 1.11.x removed it but the concrete scan classes still carry a private
+    // `branch` field (included in description()). Read the field directly so this
+    // works across all supported Iceberg versions. Returns null when no branch is
+    // set (or, defensively, when the field is absent).
+    Object target = sparkScan(scan);
+    Field f = findField(target.getClass(), "branch");
+    if (f == null) {
+      return null;
+    }
+    try {
+      f.setAccessible(true);
+      Object v = f.get(target);
+      return v == null ? null : v.toString();
+    } catch (IllegalAccessException | RuntimeException e) {
+      // RuntimeException also covers InaccessibleObjectException (Java 9+ module
+      // encapsulation) and SecurityException, so any access failure degrades to
+      // null per the contract above rather than escaping from this display-only path.
+      return null;
+    }
   }
 
   public static boolean caseSensitive(Scan scan) {
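
The `branch()` change above relies on a `findField` helper whose body is not part of this diff. A minimal sketch of how such a reflective read could work, assuming `findField` walks the superclass chain; the class `BranchFieldReader` and the nested scan types are hypothetical stand-ins, not code from the plugin or from Iceberg:

```java
import java.lang.reflect.Field;

/** Illustrative only: not the plugin's GpuSparkScanAccess. */
final class BranchFieldReader {

  /**
   * Hypothetical stand-in for the findField helper referenced in the diff: walks the
   * class hierarchy and returns the first declared field with the given name, or null.
   */
  static Field findField(Class<?> type, String name) {
    for (Class<?> c = type; c != null; c = c.getSuperclass()) {
      try {
        return c.getDeclaredField(name);
      } catch (NoSuchFieldException e) {
        // Not declared on this class; try the superclass.
      }
    }
    return null;
  }

  /** Reads a (possibly private) field as a String; any lookup or access failure yields null. */
  static String readFieldAsString(Object target, String name) {
    Field f = findField(target.getClass(), name);
    if (f == null) {
      return null;
    }
    try {
      f.setAccessible(true);
      Object v = f.get(target);
      return v == null ? null : v.toString();
    } catch (IllegalAccessException | RuntimeException e) {
      return null;
    }
  }

  // Hypothetical scan classes: the private field lives on a superclass.
  static class BaseScan {
    private final String branch;
    BaseScan(String branch) { this.branch = branch; }
  }

  static class BatchScan extends BaseScan {
    BatchScan(String branch) { super(branch); }
  }

  static class ScanWithoutBranch { }

  public static void main(String[] args) {
    System.out.println(readFieldAsString(new BatchScan("audit"), "branch"));
    System.out.println(readFieldAsString(new BatchScan(null), "branch"));
    System.out.println(readFieldAsString(new ScanWithoutBranch(), "branch"));
  }
}
```

Reading the field rather than calling a method means the accessor no longer depends on an API that one Iceberg version removed; the cost is that a renamed field silently degrades to `null`, which the diff accepts because the value is display-only.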

iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProbeImpl.scala

Lines changed: 5 additions & 3 deletions
@@ -27,7 +27,7 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
     ShimLoader.getShimVersion match {
       case _: SparkShimVersion =>
         VersionUtils.cmpSparkVersion(3, 5, 0) >= 0 &&
-          VersionUtils.cmpSparkVersion(4, 1, 0) < 0
+          VersionUtils.cmpSparkVersion(4, 2, 0) < 0
       case _ => false
     }
   }
@@ -42,7 +42,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
     "f40208ae6fb2f33e578c2637d3dea1db18739f31" -> "1.9.1",
     "071d5606bc6199a0be9b3f274ec7fbf111d88821" -> "1.9.2",
     "2114bf631e49af532d66e2ce148ee49dd1dd1f1f" -> "1.10.0",
-    "ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1"
+    "ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1",
+    "6976e020b894f6a6777704df2b8c4458cb291ae9" -> "1.11.0"
   )
 
   // e.g. iceberg-spark-runtime-3.5_2.12-1.10.0-*.jar -> "1.10.0"
@@ -76,7 +77,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
   private val icebergVersionToShim: Map[String, String] = Map(
     "1.6" -> "iceberg16x",
     "1.9" -> "iceberg19x",
-    "1.10" -> "iceberg110x"
+    "1.10" -> "iceberg110x",
+    "1.11" -> "iceberg111x"
   )
 
   override def shimPackage: String = {
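
The probe hunks above only show the lookup tables, so the following is a rough sketch of the jar-name-to-shim resolution they imply: extract the Iceberg version from an `iceberg-spark-runtime` jar name, then map its `major.minor` prefix to a shim sub-package. The class `IcebergShimSelector`, its method names, and the regular expression are assumptions made for illustration; the real `IcebergProbeImpl` also maps commit ids to versions, which is omitted here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: not the plugin's IcebergProbeImpl. */
final class IcebergShimSelector {

  // Assumed jar naming: iceberg-spark-runtime-<spark>_<scala>-<iceberg>[suffix].jar
  private static final Pattern RUNTIME_JAR = Pattern.compile(
      "iceberg-spark-runtime-\\d+\\.\\d+_\\d+\\.\\d+-(\\d+\\.\\d+\\.\\d+).*\\.jar");

  // Same major.minor -> sub-package pairs as the icebergVersionToShim map in the diff.
  private static final Map<String, String> VERSION_TO_SHIM = new LinkedHashMap<>();
  static {
    VERSION_TO_SHIM.put("1.6", "iceberg16x");
    VERSION_TO_SHIM.put("1.9", "iceberg19x");
    VERSION_TO_SHIM.put("1.10", "iceberg110x");
    VERSION_TO_SHIM.put("1.11", "iceberg111x");
  }

  /** Returns the Iceberg version embedded in the jar file name, or null if it does not match. */
  static String versionFromJarName(String jarName) {
    Matcher m = RUNTIME_JAR.matcher(jarName);
    return m.matches() ? m.group(1) : null;
  }

  /** Maps a full version such as "1.11.0" to its shim sub-package, or null if unsupported. */
  static String shimFor(String icebergVersion) {
    String[] parts = icebergVersion.split("\\.");
    if (parts.length < 2) {
      return null;
    }
    return VERSION_TO_SHIM.get(parts[0] + "." + parts[1]);
  }

  public static void main(String[] args) {
    String version = versionFromJarName("iceberg-spark-runtime-4.1_2.13-1.11.0.jar");
    System.out.println(version + " -> " + shimFor(version));
  }
}
```

Keying the map on `major.minor` rather than the full version is what lets a patch release such as 1.10.1 reuse the `iceberg110x` shim without a new entry.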

iceberg/iceberg-1-6-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala renamed to iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteV1Scan.scala

Lines changed: 17 additions & 7 deletions
@@ -23,10 +23,20 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
 import org.apache.spark.sql.sources.Filter
 
 /**
- * Iceberg 1.6.x copy-on-write scan: {@code SupportsRuntimeFiltering} with
- * {@code filter(Array[Filter])}.
+ * Copy-on-write scan for the Iceberg versions that expose the V1 runtime-filter
+ * contract: {@code SupportsRuntimeFiltering} with {@code filter(Array[Filter])}.
+ * This covers Iceberg 1.6.x, 1.9.x, and 1.10.x. Iceberg 1.11.x switched to
+ * {@code SupportsRuntimeV2Filtering} with {@code filter(Array[Predicate])} and
+ * therefore ships its own per-version subclass.
+ *
+ * <p>Because this class depends only on the public {@code Scan} +
+ * {@code SupportsRuntimeFiltering} types (Iceberg internals are reached through
+ * the root-loadable {@link GpuSparkScanAccess} bridge in the base class), the V1
+ * path lives once in {@code iceberg/common} and is instantiated by every
+ * V1-version {@code ShimUtilsImpl} via {@link #create}, rather than being copied
+ * per module.
  */
-class GpuSparkCopyOnWriteScan(
+class GpuSparkCopyOnWriteV1Scan(
     cpuScanArg: Scan,
     rapidsConfArg: RapidsConf,
     queryUsesInputFileArg: Boolean)
@@ -41,12 +51,12 @@ class GpuSparkCopyOnWriteScan(
   override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)
 
   override def withInputFile(): GpuScan =
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
+    new GpuSparkCopyOnWriteV1Scan(cpuScan, rapidsConf, true)
 }
 
-object GpuSparkCopyOnWriteScan {
-  /** Java-callable factory used by {@code ShimUtilsImpl.newCopyOnWriteScan}. */
+object GpuSparkCopyOnWriteV1Scan {
+  /** Java-callable factory used by the 1.6.x / 1.9.x / 1.10.x {@code ShimUtilsImpl}. */
   def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean)
       : GpuSparkScan =
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile)
+    new GpuSparkCopyOnWriteV1Scan(cpuScan, rapidsConf, queryUsesInputFile)
 }
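
The dedup described in the Scaladoc above can be pictured as several per-version shims sharing one factory for the V1 contract while the 1.11 shim supplies its own V2 class. The sketch below uses hypothetical stand-in interfaces (`V1Filtering`, `V2Filtering`, `GpuScan`, `ShimUtils`) instead of the real Spark and plugin types, so it shows only the shape of the dispatch, not the actual signatures:

```java
/** Illustrative only: every type here is a stand-in, not a Spark or plugin class. */
final class CopyOnWriteScanDedup {

  // Stand-ins for the two runtime-filter contracts named in the diff.
  interface V1Filtering { void filter(String[] filters); }
  interface V2Filtering { void filter(Object[] predicates); }
  interface GpuScan { String describe(); }

  /** Shared V1 implementation: defined once, created by every V1-era shim. */
  static final class V1Scan implements GpuScan, V1Filtering {
    private int pushed;
    @Override public void filter(String[] filters) { pushed += filters.length; }
    @Override public String describe() { return "v1 scan, filters pushed: " + pushed; }
    static GpuScan create() { return new V1Scan(); }
  }

  /** Version-specific V2 implementation, kept only in the newest shim. */
  static final class V2Scan implements GpuScan, V2Filtering {
    private int pushed;
    @Override public void filter(Object[] predicates) { pushed += predicates.length; }
    @Override public String describe() { return "v2 scan, predicates pushed: " + pushed; }
    static GpuScan create() { return new V2Scan(); }
  }

  /** Stand-in for the per-version ShimUtilsImpl factory method. */
  interface ShimUtils { GpuScan newCopyOnWriteScan(); }

  static ShimUtils shimFor(String shimPackage) {
    switch (shimPackage) {
      case "iceberg16x":
      case "iceberg19x":
      case "iceberg110x":
        return V1Scan::create;   // three shims, one shared class
      case "iceberg111x":
        return V2Scan::create;   // the only version-specific class left
      default:
        throw new IllegalArgumentException("unsupported shim: " + shimPackage);
    }
  }

  public static void main(String[] args) {
    System.out.println(shimFor("iceberg19x").newCopyOnWriteScan().describe());
    System.out.println(shimFor("iceberg111x").newCopyOnWriteScan().describe());
  }
}
```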

iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java

Lines changed: 2 additions & 2 deletions
@@ -28,7 +28,7 @@
 import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
 import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteV1Scan;
 import org.apache.iceberg.spark.source.GpuSparkScan;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
@@ -84,6 +84,6 @@ public GpuSparkScan newCopyOnWriteScan(
       Scan cpuScan,
       RapidsConf rapidsConf,
       boolean queryUsesInputFile) {
-    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+    return GpuSparkCopyOnWriteV1Scan.create(cpuScan, rapidsConf, queryUsesInputFile);
   }
 }

iceberg/iceberg-1-11-x/pom.xml

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright (c) 2026 NVIDIA CORPORATION.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.nvidia</groupId>
+        <artifactId>rapids-4-spark-parent_2.12</artifactId>
+        <version>26.08.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>rapids-4-spark-iceberg-1-11-x_2.12</artifactId>
+    <name>RAPIDS Accelerator for Apache Iceberg</name>
+    <description>Apache Iceberg support for the RAPIDS Accelerator for Apache Spark</description>
+    <version>26.08.0-SNAPSHOT</version>
+
+    <properties>
+        <rapids.module>../iceberg/iceberg-1-11-x</rapids.module>
+        <rapids.compressed.artifact>false</rapids.compressed.artifact>
+        <rapids.shim.jar.phase>package</rapids.shim.jar.phase>
+        <maven.scaladoc.skip>true</maven.scaladoc.skip>
+        <!--
+            Spark 4.x catalyst class files (Table, WriteBuilder, etc.) carry
+            @Deprecated(since = "...") annotations. Reading them under a Java 8
+            target makes javac emit "Cannot find annotation method 'since()' in
+            type 'Deprecated'" against those class files, which the project's
+            -Werror promotes to an error. Exclude classfile warnings (and -options
+            for parity with release411) so this module's Java sources compile
+            under Spark 4.1 builds.
+        -->
+        <scala.javac.args>-Xlint:all,-serial,-path,-try,-processing,-options,-classfile|-Werror</scala.javac.args>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.nvidia</groupId>
+            <artifactId>rapids-4-spark-sql_${scala.binary.version}</artifactId>
+            <classifier>${spark.version.classifier}</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-spark-runtime-${iceberg.artifact.suffix}_${scala.binary.version}</artifactId>
+            <version>${iceberg.111x.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-common-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${spark.rapids.source.basedir}/iceberg/common/src/main/java</source>
+                                <source>${spark.rapids.source.basedir}/iceberg/common/src/main/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x;
+
+import com.nvidia.spark.rapids.GpuMetric;
+import com.nvidia.spark.rapids.RapidsConf;
+import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
+import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.*;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.io.SupportsStorageCredentials;
+import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
+import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkScan;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.sql.connector.read.Scan;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Iceberg 1.11.x shim: uses {@code SparkUtil::internalToSpark} and a cache-aware footer path. */
+public class ShimUtilsImpl implements IcebergShimUtils {
+  @Override
+  public String locationOf(ContentFile<?> f) {
+    return f.location();
+  }
+
+  @Override
+  public Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema, Table table) {
+    if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      return PartitionUtil.constantsMap(task,
+          partitionType,
+          SparkUtil::internalToSpark);
+    } else {
+      return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, String>> storageCredentialOverlays(FileIO fileIO) {
+    if (!(fileIO instanceof SupportsStorageCredentials)) {
+      return Collections.emptyMap();
+    }
+    Map<String, Map<String, String>> result = new HashMap<>();
+    for (StorageCredential sc : ((SupportsStorageCredentials) fileIO).credentials()) {
+      result.put(sc.prefix(), sc.config());
+    }
+    return result;
+  }
+
+  @Override
+  public ParquetFileReader openParquetReader(
+      IcebergInputFile inputFile,
+      Path filePath,
+      ParquetReadOptions options,
+      scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
+    return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
+  }
+
+  @Override
+  public GpuSparkScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile) {
+    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+  }
+}
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x
+
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.GpuMetric
+import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile
+import com.nvidia.spark.rapids.iceberg.parquet.converter.ToIcebergShaded
+import com.nvidia.spark.rapids.parquet.{HMBInputFile, ParquetFooterUtils}
+import org.apache.hadoop.fs.Path
+import org.apache.iceberg.parquet.GpuParquetIO
+import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions
+import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader
+
+/**
+ * Iceberg 1.11.x shim: reads the footer via `FileCache` and injects it into `ParquetFileReader`
+ * through the 4-arg `(InputFile, ParquetMetadata, ParquetReadOptions, SeekableInputStream)`
+ * constructor that is available from iceberg 1.10.x onward. Lives in the versioned
+ * `iceberg111x` package so each minor-version shim can evolve independently without the
+ * shade-plugin class-name collision that the 1.10.x split introduced.
+ */
+object GpuParquetIOShim {
+  def openReader(
+      inputFile: IcebergInputFile,
+      filePath: Path,
+      options: ParquetReadOptions,
+      metrics: Map[String, GpuMetric]): ParquetFileReader = {
+    val metadata = withResource(ParquetFooterUtils.getFooterBuffer(
+      inputFile, metrics,
+      ParquetFooterUtils.readFooterBufferFromInputFile(inputFile, filePath))) { hmb =>
+      val shadedHmbFile = ToIcebergShaded.shade(new HMBInputFile(hmb))
+      withResource(shadedHmbFile.newStream()) { hmbStream =>
+        ParquetFileReader.readFooter(shadedHmbFile, options, hmbStream)
+      }
+    }
+    val realFile = GpuParquetIO.file(inputFile.getDelegate)
+    closeOnExcept(realFile.newStream()) { stream =>
+      new ParquetFileReader(realFile, metadata, options, stream)
+    }
+  }
+}
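
`openReader` above uses two different cleanup idioms: `withResource` always closes its resource once the body finishes (the footer buffer and its stream), while `closeOnExcept` closes only if the body throws, because on success the new `ParquetFileReader` takes ownership of the stream. A minimal Java sketch of that distinction, assuming semantics equivalent to the plugin's `Arm` helpers; `ResourceHelpers`, `Res`, and `Tracked` are hypothetical names introduced for illustration:

```java
import java.util.function.Function;

/** Illustrative only: a Java rendering of the two cleanup idioms, not the plugin's Arm. */
final class ResourceHelpers {

  /** AutoCloseable whose close() declares no checked exception, to keep the sketch short. */
  interface Res extends AutoCloseable {
    @Override void close();
  }

  /** Always closes the resource, whether the body returns or throws. */
  static <R extends Res, T> T withResource(R resource, Function<R, T> body) {
    try (R r = resource) {
      return body.apply(r);
    }
  }

  /** Closes the resource only when the body throws; on success the caller owns it. */
  static <R extends Res, T> T closeOnExcept(R resource, Function<R, T> body) {
    try {
      return body.apply(resource);
    } catch (Throwable t) {
      try {
        resource.close();
      } catch (Throwable fromClose) {
        t.addSuppressed(fromClose);
      }
      throw t;
    }
  }

  /** Test double that records whether close() was called. */
  static final class Tracked implements Res {
    boolean closed;
    @Override public void close() { closed = true; }
  }

  public static void main(String[] args) {
    Tracked footer = new Tracked();
    Integer size = withResource(footer, r -> 42);
    System.out.println(size + ", footer closed: " + footer.closed);

    Tracked stream = new Tracked();
    Tracked owned = closeOnExcept(stream, r -> r);
    System.out.println("stream closed after success: " + owned.closed);
  }
}
```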
