From 911e7cc8dffd5766dbf5ef9129ac10349818c86b Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Tue, 26 May 2026 09:06:39 +0800
Subject: [PATCH 1/4] Iceberg 1.11 support for Spark 411, part (2/3): add iceberg-1-11-x module

Adds the iceberg-1-11-x module and wires it into the release411 profile so
Spark 4.1.x uses Iceberg 1.11 (Iceberg only publishes
iceberg-spark-runtime-4.1 from 1.11.0 on; iceberg-1-10-x is not an option on
4.1).

Module:
- iceberg/iceberg-1-11-x (pom + scala2.13 mirror), iceberg111x sub-package
  (IcebergProviderImpl, ShimUtilsImpl, GpuParquetIOShim), the 1.11 V2
  copy-on-write scan, and a spark411 GpuInternalRow overriding the new
  SpecializedGetters methods (getGeometry/getGeography) added in Spark 4.1.
- Parent pom: spark41x.iceberg.artifact.suffix=4.1,
  iceberg.111x.version=1.11.0; release411 swapped from iceberg-stub to
  iceberg-1-11-x.
- IcebergProbeImpl: lift the Spark cap to < 4.2.0; add the 1.11.0 commit-id
  mapping and the "1.11" -> "iceberg111x" shim sub-package.
- README + integration_tests wiring (is_spark_41x(), versionAsOf timetravel).

Folds in two non-blocking review nits from the merged part (1/3) (#14881):
- Dedup the V1 copy-on-write scan into a single common
  GpuSparkCopyOnWriteV1Scan in iceberg/common, instantiated by all three V1
  ShimUtilsImpls. Only the 1.11 V2 path keeps a version-specific class.
- GpuSparkScanAccess.branch() reads the private `branch` field directly
  instead of the removed-in-1.11 SparkScan.branch() method (was returning
  null on 1.11).

Review follow-ups on this PR:
- iceberg_version_detection_test.py now gates on iceberg_unsupported_mark
  (3.5/4.0/4.1) instead of is_spark_35x, and jenkins/spark-premerge-build.sh
  runs the version-detection test for Spark 4.1 with Iceberg 1.11.0, so the
  new 1.11.0 commit-id mapping in IcebergProbeImpl gets automated coverage.
- New iceberg111x/IcebergProviderImpl.scala uses the current-year-only
  copyright header per the new-file convention.

Signed-off-by: Chong Gao
---
 iceberg/README.md | 11 ++-
 .../spark/source/GpuSparkScanAccess.java | 20 +++-
 .../rapids/iceberg/IcebergProbeImpl.scala | 8 +-
 .../source/GpuSparkCopyOnWriteV1Scan.scala} | 24 +++--
 .../iceberg/iceberg110x/ShimUtilsImpl.java | 4 +-
 iceberg/iceberg-1-11-x/pom.xml | 96 +++++++++++++++++++
 .../iceberg/iceberg111x/ShimUtilsImpl.java | 89 +++++++++++++++++
 .../iceberg111x/GpuParquetIOShim.scala | 55 +++++++++++
 .../iceberg111x/IcebergProviderImpl.scala | 21 ++++
 .../source/GpuSparkCopyOnWriteScan.scala | 16 ++--
 .../spark/rapids/iceberg/GpuInternalRow.java | 46 +++++++++
 .../iceberg/iceberg16x/ShimUtilsImpl.java | 4 +-
 .../iceberg/iceberg19x/ShimUtilsImpl.java | 4 +-
 .../source/GpuSparkCopyOnWriteScan.scala | 52 ----------
 .../src/main/python/iceberg/__init__.py | 2 +-
 .../src/main/python/iceberg/iceberg_test.py | 2 +-
 .../iceberg/iceberg_version_detection_test.py | 9 +-
 .../src/main/python/spark_session.py | 5 +-
 jenkins/spark-premerge-build.sh | 7 +-
 pom.xml | 9 +-
 scala2.13/iceberg/iceberg-1-11-x/pom.xml | 96 +++++++++++++++++++
 scala2.13/pom.xml | 9 +-
 22 files changed, 493 insertions(+), 96 deletions(-)
 rename iceberg/{iceberg-1-6-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala => common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteV1Scan.scala} (59%)
 create mode 100644 iceberg/iceberg-1-11-x/pom.xml
 create mode 100644 iceberg/iceberg-1-11-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg111x/ShimUtilsImpl.java
 create mode 100644 iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/GpuParquetIOShim.scala
 create mode 100644 iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/IcebergProviderImpl.scala
 rename iceberg/{iceberg-1-10-x => iceberg-1-11-x}/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala (77%)
 create mode 100644 iceberg/iceberg-1-11-x/src/main/spark411/java/com/nvidia/spark/rapids/iceberg/GpuInternalRow.java
 delete mode 100644 iceberg/iceberg-1-9-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
 create mode 100644 scala2.13/iceberg/iceberg-1-11-x/pom.xml

diff --git a/iceberg/README.md b/iceberg/README.md
index 0b4d5e9eec5..293eaee3e5b 100644
--- a/iceberg/README.md
+++ b/iceberg/README.md
@@ -14,17 +14,22 @@ and the directory that contains the corresponding support code.
 | 1.6.x | Spark 3.5.0-3.5.3 | `iceberg-1-6-x` |
 | 1.9.x | Spark 3.5.4-3.5.8 | `iceberg-1-9-x` |
 | 1.10.x | Spark 3.5.4-3.5.8, 4.0.x | `iceberg-1-10-x` |
+| 1.11.x | Spark 4.1.x | `iceberg-1-11-x` |

-Iceberg GPU acceleration is currently supported on Spark 3.5.x and 4.0.x.
+Iceberg GPU acceleration is currently supported on Spark 3.5.x, 4.0.x, and 4.1.x.

 For Spark 3.5.4+, both `iceberg-1-9-x` and `iceberg-1-10-x` modules are compiled into the
 build. The correct version-specific implementation is selected at runtime by probing the
 `iceberg-spark-runtime` jar on the classpath. Version-specific code lives in distinct
-sub-packages (`iceberg19x`, `iceberg110x`) to avoid class conflicts, and the common
-`ShimUtils` dispatcher delegates to the appropriate implementation.
+sub-packages (`iceberg19x`, `iceberg110x`, `iceberg111x`) to avoid class conflicts, and the
+common `ShimUtils` dispatcher delegates to the appropriate implementation.

 For Spark 4.0.x, only `iceberg-1-10-x` is compiled during the build.

+For Spark 4.1.x, only `iceberg-1-11-x` is compiled during the build. Apache Iceberg
+publishes the `iceberg-spark-runtime-4.1` artifact starting at version 1.11.0, so earlier
+Iceberg releases cannot be used with Spark 4.1.
+
 ## Code Shared Between Modules

 The `common` directory contains code that is shared across some or all of the Iceberg
diff --git a/iceberg/common/src/main/java/org/apache/iceberg/spark/source/GpuSparkScanAccess.java b/iceberg/common/src/main/java/org/apache/iceberg/spark/source/GpuSparkScanAccess.java
index 29664f3401f..56467eb155d 100644
--- a/iceberg/common/src/main/java/org/apache/iceberg/spark/source/GpuSparkScanAccess.java
+++ b/iceberg/common/src/main/java/org/apache/iceberg/spark/source/GpuSparkScanAccess.java
@@ -71,9 +71,23 @@ public static Table table(Scan scan) {
   }

   public static String branch(Scan scan) {
-    // Iceberg 1.10.x and earlier: protected method SparkScan.branch(). Iceberg 1.11.x
-    // removed it entirely; return null for display purposes.
-    return invokeMethod(sparkScan(scan), String.class, "branch");
+    // Iceberg 1.10.x and earlier exposed a protected SparkScan.branch() method;
+    // 1.11.x removed it but the concrete scan classes still carry a private
+    // `branch` field (included in description()). Read the field directly so this
+    // works across all supported Iceberg versions. Returns null when no branch is
+    // set (or, defensively, when the field is absent).
+    Object target = sparkScan(scan);
+    Field f = findField(target.getClass(), "branch");
+    if (f == null) {
+      return null;
+    }
+    try {
+      f.setAccessible(true);
+      Object v = f.get(target);
+      return v == null ? null : v.toString();
+    } catch (IllegalAccessException e) {
+      return null;
+    }
   }

   public static boolean caseSensitive(Scan scan) {
diff --git a/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProbeImpl.scala b/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProbeImpl.scala
index a18b169fa5e..7e91b6b08f7 100644
--- a/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProbeImpl.scala
+++ b/iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProbeImpl.scala
@@ -27,7 +27,7 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
     ShimLoader.getShimVersion match {
       case _: SparkShimVersion =>
         VersionUtils.cmpSparkVersion(3, 5, 0) >= 0 &&
-          VersionUtils.cmpSparkVersion(4, 1, 0) < 0
+          VersionUtils.cmpSparkVersion(4, 2, 0) < 0
       case _ => false
     }
   }
@@ -42,7 +42,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
     "f40208ae6fb2f33e578c2637d3dea1db18739f31" -> "1.9.1",
     "071d5606bc6199a0be9b3f274ec7fbf111d88821" -> "1.9.2",
     "2114bf631e49af532d66e2ce148ee49dd1dd1f1f" -> "1.10.0",
-    "ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1"
+    "ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1",
+    "6976e020b894f6a6777704df2b8c4458cb291ae9" -> "1.11.0"
   )

   // e.g. iceberg-spark-runtime-3.5_2.12-1.10.0-*.jar -> "1.10.0"
@@ -76,7 +77,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
   private val icebergVersionToShim: Map[String, String] = Map(
     "1.6" -> "iceberg16x",
     "1.9" -> "iceberg19x",
-    "1.10" -> "iceberg110x"
+    "1.10" -> "iceberg110x",
+    "1.11" -> "iceberg111x"
   )

   override def shimPackage: String = {
diff --git a/iceberg/iceberg-1-6-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala b/iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteV1Scan.scala
similarity index 59%
rename from iceberg/iceberg-1-6-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
rename to iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteV1Scan.scala
index 426269ec1ca..c36ccb747f6 100644
--- a/iceberg/iceberg-1-6-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
+++ b/iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteV1Scan.scala
@@ -23,10 +23,20 @@ import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
 import org.apache.spark.sql.sources.Filter

 /**
- * Iceberg 1.6.x copy-on-write scan: {@code SupportsRuntimeFiltering} with
- * {@code filter(Array[Filter])}.
+ * Copy-on-write scan for the Iceberg versions that expose the V1 runtime-filter
+ * contract: {@code SupportsRuntimeFiltering} with {@code filter(Array[Filter])}.
+ * This covers Iceberg 1.6.x, 1.9.x, and 1.10.x. Iceberg 1.11.x switched to
+ * {@code SupportsRuntimeV2Filtering} with {@code filter(Array[Predicate])} and
+ * therefore ships its own per-version subclass.
+ *
+ * Because this class depends only on the public {@code Scan} +
+ * {@code SupportsRuntimeFiltering} types (Iceberg internals are reached through
+ * the root-loadable {@link GpuSparkScanAccess} bridge in the base class), the V1
+ * path lives once in {@code iceberg/common} and is instantiated by every
+ * V1-version {@code ShimUtilsImpl} via {@link #create}, rather than being copied
+ * per module.
  */
-class GpuSparkCopyOnWriteScan(
+class GpuSparkCopyOnWriteV1Scan(
     cpuScanArg: Scan,
     rapidsConfArg: RapidsConf,
     queryUsesInputFileArg: Boolean)
@@ -41,12 +51,12 @@ class GpuSparkCopyOnWriteScan(
   override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)

   override def withInputFile(): GpuScan =
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
+    new GpuSparkCopyOnWriteV1Scan(cpuScan, rapidsConf, true)
 }

-object GpuSparkCopyOnWriteScan {
-  /** Java-callable factory used by {@code ShimUtilsImpl.newCopyOnWriteScan}. */
+object GpuSparkCopyOnWriteV1Scan {
+  /** Java-callable factory used by the 1.6.x / 1.9.x / 1.10.x {@code ShimUtilsImpl}. */
   def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean)
   : GpuSparkScan =
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile)
+    new GpuSparkCopyOnWriteV1Scan(cpuScan, rapidsConf, queryUsesInputFile)
 }
diff --git a/iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java b/iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java
index bc3343deb8a..2aa4b463fc0 100644
--- a/iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java
+++ b/iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java
@@ -28,7 +28,7 @@
 import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
 import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteV1Scan;
 import org.apache.iceberg.spark.source.GpuSparkScan;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
@@ -84,6 +84,6 @@ public GpuSparkScan newCopyOnWriteScan(
       Scan cpuScan,
       RapidsConf rapidsConf,
       boolean queryUsesInputFile) {
-    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+    return GpuSparkCopyOnWriteV1Scan.create(cpuScan, rapidsConf, queryUsesInputFile);
   }
 }
diff --git a/iceberg/iceberg-1-11-x/pom.xml b/iceberg/iceberg-1-11-x/pom.xml
new file mode 100644
index 00000000000..51f05280ce4
--- /dev/null
+++ b/iceberg/iceberg-1-11-x/pom.xml
@@ -0,0 +1,96 @@
+ + + + 4.0.0 + + + com.nvidia + rapids-4-spark-parent_2.12 + 26.08.0-SNAPSHOT + ../../pom.xml + + + rapids-4-spark-iceberg-1-11-x_2.12 + RAPIDS Accelerator for Apache Iceberg + Apache Iceberg support for the RAPIDS Accelerator for Apache Spark + 26.08.0-SNAPSHOT + + + ../iceberg/iceberg-1-11-x + false + package +
true + + -Xlint:all,-serial,-path,-try,-processing,-options,-classfile|-Werror + + + + + com.nvidia + rapids-4-spark-sql_${scala.binary.version} + ${spark.version.classifier} + + + org.apache.iceberg + iceberg-spark-runtime-${iceberg.artifact.suffix}_${scala.binary.version} + ${iceberg.111x.version} + provided + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-common-sources + generate-sources + + add-source + + + + ${spark.rapids.source.basedir}/iceberg/common/src/main/java + ${spark.rapids.source.basedir}/iceberg/common/src/main/scala + + + + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.rat + apache-rat-plugin + + + +
diff --git a/iceberg/iceberg-1-11-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg111x/ShimUtilsImpl.java b/iceberg/iceberg-1-11-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg111x/ShimUtilsImpl.java
new file mode 100644
index 00000000000..126da3640a0
--- /dev/null
+++ b/iceberg/iceberg-1-11-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg111x/ShimUtilsImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x;
+
+import com.nvidia.spark.rapids.GpuMetric;
+import com.nvidia.spark.rapids.RapidsConf;
+import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
+import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.*;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.io.SupportsStorageCredentials;
+import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
+import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkScan;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.sql.connector.read.Scan;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Iceberg 1.11.x shim: uses {@code SparkUtil::internalToSpark} and a cache-aware footer path. */
+public class ShimUtilsImpl implements IcebergShimUtils {
+  @Override
+  public String locationOf(ContentFile f) {
+    return f.location();
+  }
+
+  @Override
+  public Map constantsMap(FileScanTask task, Schema readSchema, Table table) {
+    if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      return PartitionUtil.constantsMap(task,
+          partitionType,
+          SparkUtil::internalToSpark);
+    } else {
+      return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);
+    }
+  }
+
+  @Override
+  public Map> storageCredentialOverlays(FileIO fileIO) {
+    if (!(fileIO instanceof SupportsStorageCredentials)) {
+      return Collections.emptyMap();
+    }
+    Map> result = new HashMap<>();
+    for (StorageCredential sc : ((SupportsStorageCredentials) fileIO).credentials()) {
+      result.put(sc.prefix(), sc.config());
+    }
+    return result;
+  }
+
+  @Override
+  public ParquetFileReader openParquetReader(
+      IcebergInputFile inputFile,
+      Path filePath,
+      ParquetReadOptions options,
+      scala.collection.immutable.Map metrics) throws IOException {
+    return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
+  }
+
+  @Override
+  public GpuSparkScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile) {
+    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+  }
+}
diff --git a/iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/GpuParquetIOShim.scala b/iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/GpuParquetIOShim.scala
new file mode 100644
index 00000000000..348becf0854
--- /dev/null
+++ b/iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/GpuParquetIOShim.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x
+
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.GpuMetric
+import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile
+import com.nvidia.spark.rapids.iceberg.parquet.converter.ToIcebergShaded
+import com.nvidia.spark.rapids.parquet.{HMBInputFile, ParquetFooterUtils}
+import org.apache.hadoop.fs.Path
+import org.apache.iceberg.parquet.GpuParquetIO
+import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions
+import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader
+
+/**
+ * Iceberg 1.11.x shim: reads the footer via `FileCache` and injects it into `ParquetFileReader`
+ * through the 4-arg `(InputFile, ParquetMetadata, ParquetReadOptions, SeekableInputStream)`
+ * constructor that is available from iceberg 1.10.x onward. Lives in the versioned
+ * `iceberg111x` package so each minor-version shim can evolve independently without the
+ * shade-plugin class-name collision that the 1.10.x split introduced.
+ */
+object GpuParquetIOShim {
+  def openReader(
+      inputFile: IcebergInputFile,
+      filePath: Path,
+      options: ParquetReadOptions,
+      metrics: Map[String, GpuMetric]): ParquetFileReader = {
+    val metadata = withResource(ParquetFooterUtils.getFooterBuffer(
+      inputFile, metrics,
+      ParquetFooterUtils.readFooterBufferFromInputFile(inputFile, filePath))) { hmb =>
+      val shadedHmbFile = ToIcebergShaded.shade(new HMBInputFile(hmb))
+      withResource(shadedHmbFile.newStream()) { hmbStream =>
+        ParquetFileReader.readFooter(shadedHmbFile, options, hmbStream)
+      }
+    }
+    val realFile = GpuParquetIO.file(inputFile.getDelegate)
+    closeOnExcept(realFile.newStream()) { stream =>
+      new ParquetFileReader(realFile, metadata, options, stream)
+    }
+  }
+}
diff --git a/iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/IcebergProviderImpl.scala b/iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/IcebergProviderImpl.scala
new file mode 100644
index 00000000000..7f981c550d0
--- /dev/null
+++ b/iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/IcebergProviderImpl.scala
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x
+
+import com.nvidia.spark.rapids.iceberg.IcebergProviderBase
+
+class IcebergProviderImpl extends IcebergProviderBase
diff --git a/iceberg/iceberg-1-10-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala b/iceberg/iceberg-1-11-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
similarity index 77%
rename from iceberg/iceberg-1-10-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
rename to iceberg/iceberg-1-11-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
index f0d59bf202d..8ddac3182ae 100644
--- a/iceberg/iceberg-1-10-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
+++ b/iceberg/iceberg-1-11-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
@@ -19,26 +19,26 @@ package org.apache.iceberg.spark.source
 import com.nvidia.spark.rapids.{GpuScan, RapidsConf}

 import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}

 /**
- * Iceberg 1.10.x copy-on-write scan: {@code SupportsRuntimeFiltering} with
- * {@code filter(Array[Filter])}.
+ * Iceberg 1.11.x copy-on-write scan: {@code SupportsRuntimeV2Filtering} with
+ * {@code filter(Array[Predicate])}.
  */
 class GpuSparkCopyOnWriteScan(
     cpuScanArg: Scan,
     rapidsConfArg: RapidsConf,
     queryUsesInputFileArg: Boolean)
   extends GpuSparkCopyOnWriteScanBase(cpuScanArg, rapidsConfArg, queryUsesInputFileArg)
-  with SupportsRuntimeFiltering {
+  with SupportsRuntimeV2Filtering {

-  private def runtimeFilterScan: SupportsRuntimeFiltering =
-    cpuScan.asInstanceOf[SupportsRuntimeFiltering]
+  private def runtimeFilterScan: SupportsRuntimeV2Filtering =
+    cpuScan.asInstanceOf[SupportsRuntimeV2Filtering]

   override def filterAttributes(): Array[NamedReference] = runtimeFilterScan.filterAttributes()

-  override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)
+  override def filter(predicates: Array[Predicate]): Unit = runtimeFilterScan.filter(predicates)

   override def withInputFile(): GpuScan =
     new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
diff --git a/iceberg/iceberg-1-11-x/src/main/spark411/java/com/nvidia/spark/rapids/iceberg/GpuInternalRow.java b/iceberg/iceberg-1-11-x/src/main/spark411/java/com/nvidia/spark/rapids/iceberg/GpuInternalRow.java
new file mode 100644
index 00000000000..0853e6c4193
--- /dev/null
+++ b/iceberg/iceberg-1-11-x/src/main/spark411/java/com/nvidia/spark/rapids/iceberg/GpuInternalRow.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*** spark-rapids-shim-json-lines
+{"spark": "411"}
+spark-rapids-shim-json-lines ***/
+
+package com.nvidia.spark.rapids.iceberg;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.GeographyVal;
+import org.apache.spark.unsafe.types.GeometryVal;
+import org.apache.spark.unsafe.types.VariantVal;
+
+public class GpuInternalRow extends GpuInternalRowBase {
+  public GpuInternalRow(InternalRow row) {
+    super(row);
+  }
+
+  @Override
+  public VariantVal getVariant(int ordinal) {
+    return getWrapped().getVariant(ordinal);
+  }
+
+  @Override
+  public GeometryVal getGeometry(int ordinal) {
+    return getWrapped().getGeometry(ordinal);
+  }
+
+  @Override
+  public GeographyVal getGeography(int ordinal) {
+    return getWrapped().getGeography(ordinal);
+  }
+}
diff --git a/iceberg/iceberg-1-6-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg16x/ShimUtilsImpl.java b/iceberg/iceberg-1-6-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg16x/ShimUtilsImpl.java
index 7319d27392c..1412b3450b9 100644
--- a/iceberg/iceberg-1-6-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg16x/ShimUtilsImpl.java
+++ b/iceberg/iceberg-1-6-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg16x/ShimUtilsImpl.java
@@ -21,7 +21,7 @@
 import org.apache.iceberg.*;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.spark.source.GpuBaseReader;
-import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteV1Scan;
 import org.apache.iceberg.spark.source.GpuSparkScan;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
@@ -64,6 +64,6 @@ public GpuSparkScan newCopyOnWriteScan(
       Scan cpuScan,
       RapidsConf rapidsConf,
       boolean queryUsesInputFile) {
-    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+    return GpuSparkCopyOnWriteV1Scan.create(cpuScan, rapidsConf, queryUsesInputFile);
   }
 }
diff --git a/iceberg/iceberg-1-9-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg19x/ShimUtilsImpl.java b/iceberg/iceberg-1-9-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg19x/ShimUtilsImpl.java
index 35d8a1cf701..dcb8453fc42 100644
--- a/iceberg/iceberg-1-9-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg19x/ShimUtilsImpl.java
+++ b/iceberg/iceberg-1-9-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg19x/ShimUtilsImpl.java
@@ -23,7 +23,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.StorageCredential;
 import org.apache.iceberg.io.SupportsStorageCredentials;
-import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteV1Scan;
 import org.apache.iceberg.spark.source.GpuSparkScan;
 import org.apache.iceberg.spark.source.GpuStructInternalRow;
 import org.apache.iceberg.types.Type;
@@ -86,6 +86,6 @@ public GpuSparkScan newCopyOnWriteScan(
       Scan cpuScan,
       RapidsConf rapidsConf,
       boolean queryUsesInputFile) {
-    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+    return GpuSparkCopyOnWriteV1Scan.create(cpuScan, rapidsConf, queryUsesInputFile);
   }
 }
diff --git a/iceberg/iceberg-1-9-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala b/iceberg/iceberg-1-9-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
deleted file mode 100644
index 21ad9076d51..00000000000
--- a/iceberg/iceberg-1-9-x/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2026, NVIDIA CORPORATION.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.iceberg.spark.source
-
-import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
-
-import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
-import org.apache.spark.sql.sources.Filter
-
-/**
- * Iceberg 1.9.x copy-on-write scan: {@code SupportsRuntimeFiltering} with
- * {@code filter(Array[Filter])}.
- */
-class GpuSparkCopyOnWriteScan(
-    cpuScanArg: Scan,
-    rapidsConfArg: RapidsConf,
-    queryUsesInputFileArg: Boolean)
-  extends GpuSparkCopyOnWriteScanBase(cpuScanArg, rapidsConfArg, queryUsesInputFileArg)
-  with SupportsRuntimeFiltering {
-
-  private def runtimeFilterScan: SupportsRuntimeFiltering =
-    cpuScan.asInstanceOf[SupportsRuntimeFiltering]
-
-  override def filterAttributes(): Array[NamedReference] = runtimeFilterScan.filterAttributes()
-
-  override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)
-
-  override def withInputFile(): GpuScan =
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
-}
-
-object GpuSparkCopyOnWriteScan {
-  /** Java-callable factory used by {@code ShimUtilsImpl.newCopyOnWriteScan}. */
-  def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean)
-  : GpuSparkScan =
-    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile)
-}
diff --git a/integration_tests/src/main/python/iceberg/__init__.py b/integration_tests/src/main/python/iceberg/__init__.py
index db38d7c4f8a..3f591793b43 100644
--- a/integration_tests/src/main/python/iceberg/__init__.py
+++ b/integration_tests/src/main/python/iceberg/__init__.py
@@ -28,7 +28,7 @@

 iceberg_unsupported_mark = pytest.mark.skipif(
     not is_iceberg_supported_spark(),
-    reason="Iceberg acceleration requires Spark 3.5.x or 4.0.x")
+    reason="Iceberg acceleration requires Spark 3.5.x, 4.0.x, or 4.1.x")

 # iceberg supported types
 iceberg_table_gen = MappingProxyType({
diff --git a/integration_tests/src/main/python/iceberg/iceberg_test.py b/integration_tests/src/main/python/iceberg/iceberg_test.py
index df24fcb4d4a..87345b108d1 100644
--- a/integration_tests/src/main/python/iceberg/iceberg_test.py
+++ b/integration_tests/src/main/python/iceberg/iceberg_test.py
@@ -340,7 +340,7 @@ def setup_snapshots(spark):
                 "ORDER BY committed_at").head()[0]
     first_snapshot_id = with_cpu_session(setup_snapshots)
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : spark.read.option("snapshot-id", first_snapshot_id) \
+        lambda spark : spark.read.option("versionAsOf", first_snapshot_id) \
             .format("iceberg").load("{}".format(full_table)),
         conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})

diff --git a/integration_tests/src/main/python/iceberg/iceberg_version_detection_test.py b/integration_tests/src/main/python/iceberg/iceberg_version_detection_test.py
index 443fb4e9476..54896d061f7 100644
--- a/integration_tests/src/main/python/iceberg/iceberg_version_detection_test.py
+++ b/integration_tests/src/main/python/iceberg/iceberg_version_detection_test.py
@@ -15,12 +15,15 @@
 import os
 import pytest

+from iceberg import iceberg_unsupported_mark
 from marks import allow_non_gpu, iceberg
-from spark_session import is_spark_35x, with_gpu_session
+from spark_session import with_gpu_session
 from spark_init_internal import spark_version

-pytestmark = pytest.mark.skipif(not is_spark_35x(),
-                                reason="Iceberg support only for Spark 3.5.x")
+# Iceberg version detection is exercised on every Spark version that supports
+# Iceberg (3.5.x / 4.0.x / 4.1.x) so the per-version commit-id -> version mappings
+# in IcebergProbeImpl (including the 1.11.0 mapping used on Spark 4.1) are covered.
+pytestmark = iceberg_unsupported_mark

 @allow_non_gpu(any=True)
 @iceberg
diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py
index 57c3dd1fa3c..c0105c0f3d6 100644
--- a/integration_tests/src/main/python/spark_session.py
+++ b/integration_tests/src/main/python/spark_session.py
@@ -252,8 +252,11 @@ def is_spark_35x():
 def is_spark_40x():
     return "4.0.0" <= spark_version() < "4.1.0"

+def is_spark_41x():
+    return "4.1.0" <= spark_version() < "4.2.0"
+
 def is_iceberg_supported_spark():
-    return is_spark_35x() or is_spark_40x()
+    return is_spark_35x() or is_spark_40x() or is_spark_41x()

 def is_spark_400_or_later():
     return spark_version() >= "4.0.0"
diff --git a/jenkins/spark-premerge-build.sh b/jenkins/spark-premerge-build.sh
index 7cf7a68af04..8e130944f5a 100755
--- a/jenkins/spark-premerge-build.sh
+++ b/jenkins/spark-premerge-build.sh
@@ -161,7 +161,8 @@ run_iceberg_version_detect_tests() {
     local spark_patch_ver
     spark_patch_ver=$(echo "$spark_ver" | cut -d. -f3)

-    if [[ "$iceberg_spark_ver" != "3.5" && "$iceberg_spark_ver" != "4.0" ]]; then
+    if [[ "$iceberg_spark_ver" != "3.5" && "$iceberg_spark_ver" != "4.0" \
+        && "$iceberg_spark_ver" != "4.1" ]]; then
         echo "!!!! Skipping Iceberg version detection. Not supported on Spark $iceberg_spark_ver"
         return 0
     fi
@@ -169,7 +170,9 @@ run_iceberg_version_detect_tests() {
     # Supported Iceberg versions per Spark version; must stay in sync with
     # run_iceberg_tests() in spark-tests.sh.
     local iceberg_versions
-    if [[ "$iceberg_spark_ver" == "4.0" ]]; then
+    if [[ "$iceberg_spark_ver" == "4.1" ]]; then
+        iceberg_versions="1.11.0"
+    elif [[ "$iceberg_spark_ver" == "4.0" ]]; then
         iceberg_versions="1.10.1"
     elif [[ "$spark_patch_ver" -le 3 ]]; then
         iceberg_versions="1.6.1"
diff --git a/pom.xml b/pom.xml
index 450211bcc4a..73ee102c87d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -690,14 +690,15 @@ ${spark411.version} 1.13.1 rapids-4-spark-delta-41x - ${spark40x.iceberg.artifact.suffix} - ${iceberg.110x.version} + ${spark41x.iceberg.artifact.suffix} + ${iceberg.111x.version} + rapids-4-spark-iceberg-1-11-x 2.0.7 -Xlint:all,-serial,-path,-try,-processing,-options|-Werror delta-lake/delta-41x - iceberg/iceberg-stub + iceberg/iceberg-1-11-x
@@ -924,10 +925,12 @@ 1.6.1 3.5 4.0 + 4.1 3.3 1.6.1 1.9.2 1.10.1 + 1.11.0 ${iceberg.version} ${iceberg.16x.version} + + 4.0.0 + + + com.nvidia + rapids-4-spark-parent_2.13 + 26.08.0-SNAPSHOT + ../../pom.xml + + + rapids-4-spark-iceberg-1-11-x_2.13 + RAPIDS Accelerator for Apache Iceberg + Apache Iceberg support for the RAPIDS Accelerator for Apache Spark + 26.08.0-SNAPSHOT + + + ../iceberg/iceberg-1-11-x + false + package + true + + -Xlint:all,-serial,-path,-try,-processing,-options,-classfile|-Werror + + + + + com.nvidia + rapids-4-spark-sql_${scala.binary.version} + ${spark.version.classifier} + + + org.apache.iceberg + iceberg-spark-runtime-${iceberg.artifact.suffix}_${scala.binary.version} + ${iceberg.111x.version} + provided + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-common-sources + generate-sources + + add-source + + + + ${spark.rapids.source.basedir}/iceberg/common/src/main/java + ${spark.rapids.source.basedir}/iceberg/common/src/main/scala + + + + + + + net.alchim31.maven
+ scala-maven-plugin + + + org.apache.rat + apache-rat-plugin + + + + diff --git a/scala2.13/pom.xml b/scala2.13/pom.xml index 6b9a9aa8d68..857f572e747 100644 --- a/scala2.13/pom.xml +++ b/scala2.13/pom.xml @@ -690,14 +690,15 @@ ${spark411.version} 1.13.1 rapids-4-spark-delta-41x - ${spark40x.iceberg.artifact.suffix} - ${iceberg.110x.version} + ${spark41x.iceberg.artifact.suffix} + ${iceberg.111x.version} + rapids-4-spark-iceberg-1-11-x 2.0.7 -Xlint:all,-serial,-path,-try,-processing,-options|-Werror delta-lake/delta-41x - iceberg/iceberg-stub + iceberg/iceberg-1-11-x @@ -924,10 +925,12 @@ 1.6.1 3.5 4.0 + 4.1 3.3 1.6.1 1.9.2 1.10.1 + 1.11.0 ${iceberg.version} ${iceberg.16x.version}