Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions iceberg/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@ and the directory that contains the corresponding support code.
| 1.6.x | Spark 3.5.0-3.5.3 | `iceberg-1-6-x` |
| 1.9.x | Spark 3.5.4-3.5.8 | `iceberg-1-9-x` |
| 1.10.x | Spark 3.5.4-3.5.8, 4.0.x | `iceberg-1-10-x` |
| 1.11.x | Spark 4.1.x | `iceberg-1-11-x` |

Iceberg GPU acceleration is currently supported on Spark 3.5.x and 4.0.x.
Iceberg GPU acceleration is currently supported on Spark 3.5.x, 4.0.x, and 4.1.x.

For Spark 3.5.4+, both `iceberg-1-9-x` and `iceberg-1-10-x` modules are compiled into the
build. The correct version-specific implementation is selected at runtime by probing the
`iceberg-spark-runtime` jar on the classpath. Version-specific code lives in distinct
sub-packages (`iceberg19x`, `iceberg110x`) to avoid class conflicts, and the common
`ShimUtils` dispatcher delegates to the appropriate implementation.
sub-packages (`iceberg19x`, `iceberg110x`, `iceberg111x`) to avoid class conflicts, and the
common `ShimUtils` dispatcher delegates to the appropriate implementation.

For Spark 4.0.x, only `iceberg-1-10-x` is compiled during the build.

For Spark 4.1.x, only `iceberg-1-11-x` is compiled during the build. Apache Iceberg
publishes the `iceberg-spark-runtime-4.1` artifact starting at version 1.11.0, so earlier
Iceberg releases cannot be used with Spark 4.1.

## Code Shared Between Modules

The `common` directory contains code that is shared across some or all of the Iceberg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,26 @@ public static Table table(Scan scan) {
}

public static String branch(Scan scan) {
// Iceberg 1.10.x and earlier: protected method SparkScan.branch(). Iceberg 1.11.x
// removed it entirely; return null for display purposes.
return invokeMethod(sparkScan(scan), String.class, "branch");
// Iceberg 1.10.x and earlier exposed a protected SparkScan.branch() method;
// 1.11.x removed it but the concrete scan classes still carry a private
// `branch` field (included in description()). Read the field directly so this
// works across all supported Iceberg versions. Returns null when no branch is
// set (or, defensively, when the field is absent).
Object target = sparkScan(scan);
Field f = findField(target.getClass(), "branch");
if (f == null) {
return null;
}
try {
f.setAccessible(true);
Object v = f.get(target);
return v == null ? null : v.toString();
} catch (IllegalAccessException | RuntimeException e) {
// RuntimeException also covers InaccessibleObjectException (Java 9+ module
// encapsulation) and SecurityException, so any access failure degrades to
// null per the contract above rather than escaping from this display-only path.
return null;
}
Comment on lines +84 to +93

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 setAccessible can throw uncaught InaccessibleObjectException

f.setAccessible(true) can throw InaccessibleObjectException (which extends RuntimeException, not IllegalAccessException) in Java 9+ environments with module encapsulation. Only IllegalAccessException is caught here, so a security or module access rejection would propagate as an uncaught exception. Since branch() is used only for display purposes (query description), the catch block should also cover RuntimeException (or InaccessibleObjectException) to maintain the existing "return null on any access failure" contract.

}

public static boolean caseSensitive(Scan scan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
ShimLoader.getShimVersion match {
case _: SparkShimVersion =>
VersionUtils.cmpSparkVersion(3, 5, 0) >= 0 &&
VersionUtils.cmpSparkVersion(4, 1, 0) < 0
VersionUtils.cmpSparkVersion(4, 2, 0) < 0
case _ => false
}
}
Expand All @@ -42,7 +42,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
"f40208ae6fb2f33e578c2637d3dea1db18739f31" -> "1.9.1",
"071d5606bc6199a0be9b3f274ec7fbf111d88821" -> "1.9.2",
"2114bf631e49af532d66e2ce148ee49dd1dd1f1f" -> "1.10.0",
"ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1"
"ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1",
"6976e020b894f6a6777704df2b8c4458cb291ae9" -> "1.11.0"
)

// e.g. iceberg-spark-runtime-3.5_2.12-1.10.0-*.jar -> "1.10.0"
Expand Down Expand Up @@ -76,7 +77,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
private val icebergVersionToShim: Map[String, String] = Map(
"1.6" -> "iceberg16x",
"1.9" -> "iceberg19x",
"1.10" -> "iceberg110x"
"1.10" -> "iceberg110x",
"1.11" -> "iceberg111x"
)

override def shimPackage: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,20 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.Filter

/**
* Iceberg 1.6.x copy-on-write scan: {@code SupportsRuntimeFiltering} with
* {@code filter(Array[Filter])}.
* Copy-on-write scan for the Iceberg versions that expose the V1 runtime-filter
* contract: {@code SupportsRuntimeFiltering} with {@code filter(Array[Filter])}.
* This covers Iceberg 1.6.x, 1.9.x, and 1.10.x. Iceberg 1.11.x switched to
* {@code SupportsRuntimeV2Filtering} with {@code filter(Array[Predicate])} and
* therefore ships its own per-version subclass.
*
* <p>Because this class depends only on the public {@code Scan} +
* {@code SupportsRuntimeFiltering} types (Iceberg internals are reached through
* the root-loadable {@link GpuSparkScanAccess} bridge in the base class), the V1
* path lives once in {@code iceberg/common} and is instantiated by every
* V1-version {@code ShimUtilsImpl} via {@link #create}, rather than being copied
* per module.
*/
class GpuSparkCopyOnWriteScan(
class GpuSparkCopyOnWriteV1Scan(
cpuScanArg: Scan,
rapidsConfArg: RapidsConf,
queryUsesInputFileArg: Boolean)
Expand All @@ -41,12 +51,12 @@ class GpuSparkCopyOnWriteScan(
override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)

override def withInputFile(): GpuScan =
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
new GpuSparkCopyOnWriteV1Scan(cpuScan, rapidsConf, true)
}

object GpuSparkCopyOnWriteScan {
/** Java-callable factory used by {@code ShimUtilsImpl.newCopyOnWriteScan}. */
object GpuSparkCopyOnWriteV1Scan {
/** Java-callable factory used by the 1.6.x / 1.9.x / 1.10.x {@code ShimUtilsImpl}. */
def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean)
: GpuSparkScan =
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile)
new GpuSparkCopyOnWriteV1Scan(cpuScan, rapidsConf, queryUsesInputFile)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteV1Scan;
import org.apache.iceberg.spark.source.GpuSparkScan;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
Expand Down Expand Up @@ -84,6 +84,6 @@ public GpuSparkScan newCopyOnWriteScan(
Scan cpuScan,
RapidsConf rapidsConf,
boolean queryUsesInputFile) {
return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
return GpuSparkCopyOnWriteV1Scan.create(cpuScan, rapidsConf, queryUsesInputFile);
}
}
96 changes: 96 additions & 0 deletions iceberg/iceberg-1-11-x/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2026 NVIDIA CORPORATION.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-parent_2.12</artifactId>
<version>26.08.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>rapids-4-spark-iceberg-1-11-x_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Iceberg</name>
<description>Apache Iceberg support for the RAPIDS Accelerator for Apache Spark</description>
<version>26.08.0-SNAPSHOT</version>

<properties>
<rapids.module>../iceberg/iceberg-1-11-x</rapids.module>
<rapids.compressed.artifact>false</rapids.compressed.artifact>
<rapids.shim.jar.phase>package</rapids.shim.jar.phase>
<maven.scaladoc.skip>true</maven.scaladoc.skip>
<!--
Spark 4.x catalyst class files (Table, WriteBuilder, etc.) carry
@Deprecated(since = "...") annotations. Reading them under a Java 8
target makes javac emit "Cannot find annotation method 'since()' in
type 'Deprecated'" against those class files, which the project's
-Werror promotes to an error. Exclude classfile warnings (and -options
for parity with release411) so this module's Java sources compile
under Spark 4.1 builds.
-->
<scala.javac.args>-Xlint:all,-serial,-path,-try,-processing,-options,-classfile|-Werror</scala.javac.args>
</properties>

<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-sql_${scala.binary.version}</artifactId>
<classifier>${spark.version.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-${iceberg.artifact.suffix}_${scala.binary.version}</artifactId>
<version>${iceberg.111x.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-common-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${spark.rapids.source.basedir}/iceberg/common/src/main/java</source>
<source>${spark.rapids.source.basedir}/iceberg/common/src/main/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.iceberg.iceberg111x;

import com.nvidia.spark.rapids.GpuMetric;
import com.nvidia.spark.rapids.RapidsConf;
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.*;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.io.SupportsStorageCredentials;
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
import org.apache.iceberg.spark.source.GpuSparkScan;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.sql.connector.read.Scan;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Iceberg 1.11.x shim: uses {@code SparkUtil::internalToSpark} and a cache-aware footer path. */
public class ShimUtilsImpl implements IcebergShimUtils {
@Override
public String locationOf(ContentFile<?> f) {
return f.location();
}

@Override
public Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema, Table table) {
if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
Types.StructType partitionType = Partitioning.partitionType(table);
return PartitionUtil.constantsMap(task,
partitionType,
SparkUtil::internalToSpark);
} else {
return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);
}
}

@Override
public Map<String, Map<String, String>> storageCredentialOverlays(FileIO fileIO) {
if (!(fileIO instanceof SupportsStorageCredentials)) {
return Collections.emptyMap();
}
Map<String, Map<String, String>> result = new HashMap<>();
for (StorageCredential sc : ((SupportsStorageCredentials) fileIO).credentials()) {
result.put(sc.prefix(), sc.config());
}
return result;
}

@Override
public ParquetFileReader openParquetReader(
IcebergInputFile inputFile,
Path filePath,
ParquetReadOptions options,
scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
}

@Override
public GpuSparkScan newCopyOnWriteScan(
Scan cpuScan,
RapidsConf rapidsConf,
boolean queryUsesInputFile) {
return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.iceberg.iceberg111x

import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile
import com.nvidia.spark.rapids.iceberg.parquet.converter.ToIcebergShaded
import com.nvidia.spark.rapids.parquet.{HMBInputFile, ParquetFooterUtils}
import org.apache.hadoop.fs.Path
import org.apache.iceberg.parquet.GpuParquetIO
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader

/**
* Iceberg 1.11.x shim: reads the footer via `FileCache` and injects it into `ParquetFileReader`
* through the 4-arg `(InputFile, ParquetMetadata, ParquetReadOptions, SeekableInputStream)`
* constructor that is available from iceberg 1.10.x onward. Lives in the versioned
* `iceberg111x` package so each minor-version shim can evolve independently without the
* shade-plugin class-name collision that the 1.10.x split introduced.
*/
object GpuParquetIOShim {
def openReader(
inputFile: IcebergInputFile,
filePath: Path,
options: ParquetReadOptions,
metrics: Map[String, GpuMetric]): ParquetFileReader = {
val metadata = withResource(ParquetFooterUtils.getFooterBuffer(
inputFile, metrics,
ParquetFooterUtils.readFooterBufferFromInputFile(inputFile, filePath))) { hmb =>
val shadedHmbFile = ToIcebergShaded.shade(new HMBInputFile(hmb))
withResource(shadedHmbFile.newStream()) { hmbStream =>
ParquetFileReader.readFooter(shadedHmbFile, options, hmbStream)
}
}
val realFile = GpuParquetIO.file(inputFile.getDelegate)
closeOnExcept(realFile.newStream()) { stream =>
new ParquetFileReader(realFile, metadata, options, stream)
}
}
}
Loading