Commit 6491216

Author: Chong Gao
Iceberg: add iceberg-1-11-x module wired to release411 (Spark 4.1)
Iceberg published iceberg-spark-runtime-4.1_2.13 starting at version 1.11.0; iceberg-1-10-x is not an option on Spark 4.1 because Iceberg never released a 4.1 runtime before 1.11. Adds a new Maven submodule iceberg/iceberg-1-11-x and switches the release411 profile from iceberg/iceberg-stub to use it.

Module skeleton (mirrors iceberg-1-10-x with the iceberg111x sub-package and a spark411 shim source dir):
- iceberg/iceberg-1-11-x/pom.xml + scala2.13 mirror
- iceberg111x/IcebergProviderImpl, ShimUtilsImpl, GpuParquetIOShim
- org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan (Iceberg 1.11 copy-on-write scan: SupportsRuntimeV2Filtering with filter(Predicate[]))
- spark411/.../GpuInternalRow overriding the new SpecializedGetters methods Spark 4.1 added (getGeometry, getGeography) alongside the existing getVariant

Wiring:
- parent pom: add spark41x.iceberg.artifact.suffix=4.1 and iceberg.111x.version=1.11.0 properties; swap release411 from iceberg-stub to iceberg-1-11-x with the 4.1 runtime suffix
- scala2.13 mirror regenerated via build/make-scala-version-build-files.sh
- IcebergProbeImpl: lift the < 4.1.0 Spark cap to < 4.2.0; add 1.11.0 commit-id mapping and "1.11" -> "iceberg111x" shim sub-package
- README: document the new row in the Iceberg/Spark support matrix

Integration tests: enable iceberg suite on Spark 4.1.x:
- spark_session.py: add is_spark_41x() helper and include 4.1.x in is_iceberg_supported_spark() so the @iceberg-marked tests no longer skip on 4.1.
- iceberg/__init__.py: update the skip reason to mention 4.1.x.
- iceberg_test.py::test_iceberg_read_timetravel: Iceberg 1.11 removed the `.option("snapshot-id", ...)` read path and directs users at Spark's built-in `versionAsOf`. Switching to versionAsOf works on both 1.10 and 1.11.

Signed-off-by: Chong Gao <res_life@163.com>
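The time-travel change described in the last bullet can be illustrated with a minimal sketch. It is not the code from `iceberg_test.py`: the helper name `read_snapshot` is hypothetical, and the sketch assumes the table is loaded through `format("iceberg").load(...)` on an existing `spark` session. Only `option`, `format` and `load` on the session's reader are used.

```python
def read_snapshot(spark, table_name, snapshot_id):
    """Read `table_name` as of `snapshot_id` using Spark's built-in option.

    Hypothetical helper for illustration; `spark` is an existing SparkSession.
    """
    # Older style, which the commit message says Iceberg 1.11 no longer accepts:
    #   spark.read.option("snapshot-id", snapshot_id).format("iceberg").load(table_name)
    return (spark.read
            .option("versionAsOf", str(snapshot_id))
            .format("iceberg")
            .load(table_name))
```

Because `versionAsOf` is a Spark option rather than an Iceberg-specific one, the same call shape is what the commit message reports as working on both 1.10 and 1.11.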
1 parent a30d588 commit 6491216

14 files changed

Lines changed: 486 additions & 15 deletions

iceberg/README.md

Lines changed: 8 additions & 3 deletions
@@ -14,17 +14,22 @@ and the directory that contains the corresponding support code.
 | 1.6.x | Spark 3.5.0-3.5.3 | `iceberg-1-6-x` |
 | 1.9.x | Spark 3.5.4-3.5.8 | `iceberg-1-9-x` |
 | 1.10.x | Spark 3.5.4-3.5.8, 4.0.x | `iceberg-1-10-x` |
+| 1.11.x | Spark 4.1.x | `iceberg-1-11-x` |

-Iceberg GPU acceleration is currently supported on Spark 3.5.x and 4.0.x.
+Iceberg GPU acceleration is currently supported on Spark 3.5.x, 4.0.x, and 4.1.x.

 For Spark 3.5.4+, both `iceberg-1-9-x` and `iceberg-1-10-x` modules are compiled into the
 build. The correct version-specific implementation is selected at runtime by probing the
 `iceberg-spark-runtime` jar on the classpath. Version-specific code lives in distinct
-sub-packages (`iceberg19x`, `iceberg110x`) to avoid class conflicts, and the common
-`ShimUtils` dispatcher delegates to the appropriate implementation.
+sub-packages (`iceberg19x`, `iceberg110x`, `iceberg111x`) to avoid class conflicts, and the
+common `ShimUtils` dispatcher delegates to the appropriate implementation.

 For Spark 4.0.x, only `iceberg-1-10-x` is compiled during the build.

+For Spark 4.1.x, only `iceberg-1-11-x` is compiled during the build. Apache Iceberg
+publishes the `iceberg-spark-runtime-4.1` artifact starting at version 1.11.0, so earlier
+Iceberg releases cannot be used with Spark 4.1.
+
 ## Code Shared Between Modules

 The `common` directory contains code that is shared across some or all of the Iceberg
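The support matrix in the README hunk above can be read as a lookup from Spark version to the modules compiled for it. The sketch below encodes the four rows as data; `modules_for_spark`, `parse_version` and `ANY_PATCH` are illustrative names that do not exist in the repository, and only plain `major.minor.patch` strings are handled.

```python
import math

# Stands in for the README's ".x": any patch release of that minor version.
ANY_PATCH = math.inf

# (module directory, lowest Spark version, highest Spark version), both inclusive.
SUPPORT_MATRIX = [
    ("iceberg-1-6-x", (3, 5, 0), (3, 5, 3)),
    ("iceberg-1-9-x", (3, 5, 4), (3, 5, 8)),
    ("iceberg-1-10-x", (3, 5, 4), (3, 5, 8)),
    ("iceberg-1-10-x", (4, 0, 0), (4, 0, ANY_PATCH)),
    ("iceberg-1-11-x", (4, 1, 0), (4, 1, ANY_PATCH)),
]


def parse_version(version):
    """Turn "4.1.0" into (4, 1, 0); suffixes such as -SNAPSHOT are not handled."""
    major, minor, patch = (int(part) for part in version.split(".")[:3])
    return (major, minor, patch)


def modules_for_spark(version):
    """Return the module directories whose Spark range contains `version`."""
    v = parse_version(version)
    return [module for module, low, high in SUPPORT_MATRIX if low <= v <= high]
```

For example, `modules_for_spark("3.5.6")` yields both `iceberg-1-9-x` and `iceberg-1-10-x`, matching the README's note that both are compiled for Spark 3.5.4+, while `modules_for_spark("4.1.0")` yields only `iceberg-1-11-x`.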

iceberg/common/src/main/scala/com/nvidia/spark/rapids/iceberg/IcebergProbeImpl.scala

Lines changed: 5 additions & 3 deletions
@@ -27,7 +27,7 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
     ShimLoader.getShimVersion match {
       case _: SparkShimVersion =>
         VersionUtils.cmpSparkVersion(3, 5, 0) >= 0 &&
-          VersionUtils.cmpSparkVersion(4, 1, 0) < 0
+          VersionUtils.cmpSparkVersion(4, 2, 0) < 0
       case _ => false
     }
   }
@@ -42,7 +42,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
     "f40208ae6fb2f33e578c2637d3dea1db18739f31" -> "1.9.1",
     "071d5606bc6199a0be9b3f274ec7fbf111d88821" -> "1.9.2",
     "2114bf631e49af532d66e2ce148ee49dd1dd1f1f" -> "1.10.0",
-    "ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1"
+    "ccb8bc435062171e64bc8b7e5f56e6aed9c5b934" -> "1.10.1",
+    "6976e020b894f6a6777704df2b8c4458cb291ae9" -> "1.11.0"
   )

   // e.g. iceberg-spark-runtime-3.5_2.12-1.10.0-*.jar -> "1.10.0"
@@ -76,7 +77,8 @@ class IcebergProbeImpl extends IcebergProbe with Logging {
   private val icebergVersionToShim: Map[String, String] = Map(
     "1.6" -> "iceberg16x",
     "1.9" -> "iceberg19x",
-    "1.10" -> "iceberg110x"
+    "1.10" -> "iceberg110x",
+    "1.11" -> "iceberg111x"
   )

   override def shimPackage: String = {
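The probe changes above amount to two checks: a Spark version window (at least 3.5.0, below 4.2.0) and a mapping from the Iceberg `major.minor` version to a shim sub-package. The sketch below reproduces both in isolation. The mapping is copied from the diff; the regular expression and the function names are assumptions made for illustration, since the hunks do not show how `IcebergProbeImpl` actually parses the jar name.

```python
import re

# Copied from the diff: Iceberg "major.minor" -> shim sub-package.
ICEBERG_VERSION_TO_SHIM = {
    "1.6": "iceberg16x",
    "1.9": "iceberg19x",
    "1.10": "iceberg110x",
    "1.11": "iceberg111x",
}

# Assumed pattern for names like iceberg-spark-runtime-3.5_2.12-1.10.0.jar;
# the real parsing code is not part of the diff.
_JAR_RE = re.compile(
    r"iceberg-spark-runtime-\d+\.\d+_\d+\.\d+-(\d+)\.(\d+)\.(\d+)")


def is_supported_spark(version):
    """True when 3.5.0 <= version < 4.2.0, the window after this commit."""
    return (3, 5, 0) <= tuple(version) < (4, 2, 0)


def shim_package_for_jar(jar_name):
    """Return the shim sub-package for a runtime jar name, or None if unknown."""
    match = _JAR_RE.search(jar_name)
    if match is None:
        return None
    major_minor = f"{match.group(1)}.{match.group(2)}"
    return ICEBERG_VERSION_TO_SHIM.get(major_minor)
```

Keying the map on `major.minor` rather than the full version means a later 1.11 patch release would resolve to `iceberg111x` without another map entry, provided its version can be determined in the first place.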

iceberg/iceberg-1-11-x/pom.xml

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright (c) 2026 NVIDIA CORPORATION.
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>com.nvidia</groupId>
+        <artifactId>rapids-4-spark-parent_2.12</artifactId>
+        <version>26.08.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>rapids-4-spark-iceberg-1-11-x_2.12</artifactId>
+    <name>RAPIDS Accelerator for Apache Iceberg</name>
+    <description>Apache Iceberg support for the RAPIDS Accelerator for Apache Spark</description>
+    <version>26.08.0-SNAPSHOT</version>
+
+    <properties>
+        <rapids.module>../iceberg/iceberg-1-11-x</rapids.module>
+        <rapids.compressed.artifact>false</rapids.compressed.artifact>
+        <rapids.shim.jar.phase>package</rapids.shim.jar.phase>
+        <maven.scaladoc.skip>true</maven.scaladoc.skip>
+        <!--
+          Spark 4.x catalyst class files (Table, WriteBuilder, etc.) carry
+          @Deprecated(since = "...") annotations. Reading them under a Java 8
+          target makes javac emit "Cannot find annotation method 'since()' in
+          type 'Deprecated'" against those class files, which the project's
+          -Werror promotes to an error. Exclude classfile warnings (and -options
+          for parity with release411) so this module's Java sources compile
+          under Spark 4.1 builds.
+        -->
+        <scala.javac.args>-Xlint:all,-serial,-path,-try,-processing,-options,-classfile|-Werror</scala.javac.args>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.nvidia</groupId>
+            <artifactId>rapids-4-spark-sql_${scala.binary.version}</artifactId>
+            <classifier>${spark.version.classifier}</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-spark-runtime-${iceberg.artifact.suffix}_${scala.binary.version}</artifactId>
+            <version>${iceberg.111x.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-common-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${spark.rapids.source.basedir}/iceberg/common/src/main/java</source>
+                                <source>${spark.rapids.source.basedir}/iceberg/common/src/main/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
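The Iceberg dependency in this POM is assembled from properties, so the coordinate that Maven resolves depends on the active profile. The sketch below shows the substitution for the values the commit message gives for `release411`. It is a single-pass illustration, not Maven's interpolation algorithm; `resolve` is a hypothetical helper, and the assumption that `iceberg.artifact.suffix` ends up as `4.1` under `release411` comes from the commit message rather than from the parent POM, which is not shown here.

```python
import re


def resolve(template, props):
    """Replace each ${name} with props[name]; unknown names are left untouched."""
    return re.sub(r"\$\{([^}]+)\}",
                  lambda m: props.get(m.group(1), m.group(0)),
                  template)


# Values taken from the commit message for the release411 profile (assumed
# to be what the parent POM sets; the parent POM diff is not part of this page).
release411 = {
    "iceberg.artifact.suffix": "4.1",
    "scala.binary.version": "2.13",
    "iceberg.111x.version": "1.11.0",
}

artifact = resolve(
    "iceberg-spark-runtime-${iceberg.artifact.suffix}_${scala.binary.version}",
    release411)
version = resolve("${iceberg.111x.version}", release411)
```

Under these assumptions the resolved artifact name matches the `iceberg-spark-runtime-4.1_2.13` runtime named in the commit message.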
Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x;
+
+import com.nvidia.spark.rapids.GpuMetric;
+import com.nvidia.spark.rapids.RapidsConf;
+import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
+import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.*;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.StorageCredential;
+import org.apache.iceberg.io.SupportsStorageCredentials;
+import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
+import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
+import org.apache.iceberg.spark.source.GpuSparkScan;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.spark.sql.connector.read.Scan;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Iceberg 1.11.x shim: uses {@code SparkUtil::internalToSpark} and a cache-aware footer path. */
+public class ShimUtilsImpl implements IcebergShimUtils {
+  @Override
+  public String locationOf(ContentFile<?> f) {
+    return f.location();
+  }
+
+  @Override
+  public Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema, Table table) {
+    if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+      Types.StructType partitionType = Partitioning.partitionType(table);
+      return PartitionUtil.constantsMap(task,
+          partitionType,
+          SparkUtil::internalToSpark);
+    } else {
+      return PartitionUtil.constantsMap(task, SparkUtil::internalToSpark);
+    }
+  }
+
+  @Override
+  public Map<String, Map<String, String>> storageCredentialOverlays(FileIO fileIO) {
+    if (!(fileIO instanceof SupportsStorageCredentials)) {
+      return Collections.emptyMap();
+    }
+    Map<String, Map<String, String>> result = new HashMap<>();
+    for (StorageCredential sc : ((SupportsStorageCredentials) fileIO).credentials()) {
+      result.put(sc.prefix(), sc.config());
+    }
+    return result;
+  }
+
+  @Override
+  public ParquetFileReader openParquetReader(
+      IcebergInputFile inputFile,
+      Path filePath,
+      ParquetReadOptions options,
+      scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
+    return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
+  }
+
+  @Override
+  public GpuSparkScan newCopyOnWriteScan(
+      Scan cpuScan,
+      RapidsConf rapidsConf,
+      boolean queryUsesInputFile) {
+    return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
+  }
+}
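The `storageCredentialOverlays` method above flattens a list of credentials into a map keyed by storage prefix, and returns an empty map when the `FileIO` does not expose credentials. A language-neutral sketch of that shape follows. The two classes are stand-ins written for this example, not Iceberg types, and the `isinstance` check plays the role of the Java `instanceof SupportsStorageCredentials` test.

```python
from dataclasses import dataclass, field


@dataclass
class StorageCredential:
    """Stand-in for a credential: a storage path prefix plus its config map."""
    prefix: str
    config: dict


@dataclass
class CredentialedFileIO:
    """Stand-in for a FileIO that also exposes storage credentials."""
    creds: list = field(default_factory=list)

    def credentials(self):
        return self.creds


def storage_credential_overlays(file_io):
    """Map each credential prefix to its config, or {} if none are exposed."""
    # Mirrors the instanceof check: no credential support means an empty map.
    if not isinstance(file_io, CredentialedFileIO):
        return {}
    # A later entry with the same prefix replaces an earlier one, as with
    # repeated put calls on a hash map.
    return {sc.prefix: sc.config for sc in file_io.credentials()}
```

One consequence worth noting is that duplicate prefixes are resolved by iteration order, with the last credential winning.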
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x
+
+import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
+import com.nvidia.spark.rapids.GpuMetric
+import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile
+import com.nvidia.spark.rapids.iceberg.parquet.converter.ToIcebergShaded
+import com.nvidia.spark.rapids.parquet.{HMBInputFile, ParquetFooterUtils}
+import org.apache.hadoop.fs.Path
+import org.apache.iceberg.parquet.GpuParquetIO
+import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions
+import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader
+
+/**
+ * Iceberg 1.11.x shim: reads the footer via `FileCache` and injects it into `ParquetFileReader`
+ * through the 4-arg `(InputFile, ParquetMetadata, ParquetReadOptions, SeekableInputStream)`
+ * constructor that is available from iceberg 1.10.x onward. Lives in the versioned
+ * `iceberg111x` package so each minor-version shim can evolve independently without the
+ * shade-plugin class-name collision that the 1.10.x split introduced.
+ */
+object GpuParquetIOShim {
+  def openReader(
+      inputFile: IcebergInputFile,
+      filePath: Path,
+      options: ParquetReadOptions,
+      metrics: Map[String, GpuMetric]): ParquetFileReader = {
+    val metadata = withResource(ParquetFooterUtils.getFooterBuffer(
+      inputFile, metrics,
+      ParquetFooterUtils.readFooterBufferFromInputFile(inputFile, filePath))) { hmb =>
+      val shadedHmbFile = ToIcebergShaded.shade(new HMBInputFile(hmb))
+      withResource(shadedHmbFile.newStream()) { hmbStream =>
+        ParquetFileReader.readFooter(shadedHmbFile, options, hmbStream)
+      }
+    }
+    val realFile = GpuParquetIO.file(inputFile.getDelegate)
+    closeOnExcept(realFile.newStream()) { stream =>
+      new ParquetFileReader(realFile, metadata, options, stream)
+    }
+  }
+}
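`openReader` above uses two different ownership patterns: the footer buffer and its stream are wrapped in `withResource`, while the data stream is wrapped in `closeOnExcept` because the returned reader takes it over. The sketch below models the difference with context managers, assuming `withResource` always closes and `closeOnExcept` closes only when the block fails. `Stream`, `open_reader` and the `fail` flag are stand-ins invented for this example.

```python
from contextlib import contextmanager


@contextmanager
def with_resource(resource):
    """Always close the resource when the block exits."""
    try:
        yield resource
    finally:
        resource.close()


@contextmanager
def close_on_except(resource):
    """Close the resource only if the block raises; otherwise leave it open."""
    try:
        yield resource
    except BaseException:
        resource.close()
        raise


class Stream:
    """Minimal closeable stand-in that records whether close() was called."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def open_reader(footer_stream, data_stream, fail=False):
    # The footer stream is needed only while parsing, so it is always closed.
    with with_resource(footer_stream):
        metadata = "footer"
    # The data stream is handed to the reader; close it only if building fails.
    with close_on_except(data_stream) as stream:
        if fail:
            raise IOError("could not construct reader")
        return (metadata, stream)
```

On success the caller receives a still-open data stream and becomes responsible for closing it; on failure nothing leaks, because both streams have been closed before the exception propagates.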
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2022-2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.iceberg.iceberg111x
+
+import com.nvidia.spark.rapids.iceberg.IcebergProviderBase
+
+class IcebergProviderImpl extends IcebergProviderBase
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark.source
+
+import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
+
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
+
+/**
+ * Iceberg 1.11.x copy-on-write scan: {@code SupportsRuntimeV2Filtering} with
+ * {@code filter(Array[Predicate])}.
+ */
+class GpuSparkCopyOnWriteScan(
+    cpuScanArg: Scan,
+    rapidsConfArg: RapidsConf,
+    queryUsesInputFileArg: Boolean)
+  extends GpuSparkCopyOnWriteScanBase(cpuScanArg, rapidsConfArg, queryUsesInputFileArg)
+    with SupportsRuntimeV2Filtering {
+
+  private def runtimeFilterScan: SupportsRuntimeV2Filtering =
+    cpuScan.asInstanceOf[SupportsRuntimeV2Filtering]
+
+  override def filterAttributes(): Array[NamedReference] = runtimeFilterScan.filterAttributes()
+
+  override def filter(predicates: Array[Predicate]): Unit = runtimeFilterScan.filter(predicates)
+
+  override def withInputFile(): GpuScan =
+    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
+}
+
+object GpuSparkCopyOnWriteScan {
+  /** Java-callable factory used by {@code ShimUtilsImpl.newCopyOnWriteScan}. */
+  def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean)
+  : GpuSparkScan =
+    new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile)
+}
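The scan class above is a thin wrapper: runtime filtering is forwarded to the wrapped CPU scan, and `withInputFile` returns a fresh wrapper around the same CPU scan with the flag set. The sketch below shows that delegation pattern on its own. Both classes are stand-ins written for this example; they do not model Spark's `Scan` interfaces or `RapidsConf`, and predicates are plain strings.

```python
class CpuScan:
    """Stand-in for the wrapped CPU scan that supports runtime filtering."""

    def __init__(self, attributes):
        self._attributes = list(attributes)
        self.applied = []  # predicates pushed down so far

    def filter_attributes(self):
        return list(self._attributes)

    def filter(self, predicates):
        self.applied.extend(predicates)


class GpuCopyOnWriteScan:
    """Wrapper that forwards runtime filtering to the CPU scan it holds."""

    def __init__(self, cpu_scan, query_uses_input_file=False):
        self.cpu_scan = cpu_scan
        self.query_uses_input_file = query_uses_input_file

    def filter_attributes(self):
        return self.cpu_scan.filter_attributes()

    def filter(self, predicates):
        # The filtering state lives in the CPU scan, not in the wrapper.
        self.cpu_scan.filter(predicates)

    def with_input_file(self):
        # Return a new wrapper instead of mutating this one.
        return GpuCopyOnWriteScan(self.cpu_scan, True)
```

Because the filter state is kept in the shared CPU scan, a wrapper produced by `with_input_file` sees any predicates that were applied through the original wrapper.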
