Commit 085db63

Author: Chong Gao
Iceberg 1.11 support for Spark 411, part (3/3): accelerate SparkIncrementalAppendScan; enable 4.1 iceberg CI
Iceberg 1.11 split SparkBatchQueryScan into a new class hierarchy and the incremental-append query path (.option("start-snapshot-id", ...) + .option("end-snapshot-id", ...)) moved into a brand-new class org.apache.iceberg.spark.source.SparkIncrementalAppendScan (package-private, 1.11-only, extends SparkRuntimeFilterableScan). Before 1.11 the same path went through SparkBatchQueryScan and was matched by the existing batch-query ScanRule in IcebergProviderBase, so without a rule for the new class the leaf falls back to CPU.

Adds:
- GpuSparkIncrementalAppendScan in org.apache.iceberg.spark.source — mirrors GpuSparkBatchQueryScan since both CPU scans extend SparkRuntimeFilterableScan (a SparkPartitioningAwareScan<PartitionScanTask>). Takes the public Scan type and reaches Iceberg internals only through the root-loadable GpuSparkScanAccess bridge + public SupportsRuntimeV2Filtering; it does NOT reference the package-private SparkIncrementalAppendScan directly, so it works under extraClassPath (system-classpath) where the Iceberg classes load in the app classloader and this shimmed class loads in Spark's MutableURLClassLoader (issue #14959).
- iceberg111x.IcebergProviderImpl overrides getScans to register a third ScanRule for SparkIncrementalAppendScan on top of the base provider's two rules. The CPU class is loaded by string (ShimReflectionUtils.loadClass) because it is package-private.

CI: enable the full Iceberg integration suite on Spark 4.1 / Iceberg 1.11.0. run_iceberg_tests() in jenkins/spark-tests.sh now handles Spark 4.1 (Scala 2.13, Iceberg 1.11.0), matching the run_iceberg_version_detect_tests() change in part (2/3) so the "must stay in sync" invariant between the two CI runners holds and the new 1.11 code paths are exercised by nightly CI.
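For reference, the Spark-to-Iceberg matrix that run_iceberg_tests() encodes can be sketched in Python as below. This is only an illustration built from the comment table in jenkins/spark-tests.sh; the function name `supported_iceberg_versions` is hypothetical, and the real logic lives in the shell script, which may differ in details not visible in this diff.

```python
def supported_iceberg_versions(spark_ver, scala_binary_ver):
    """Hypothetical helper mirroring the comment table in run_iceberg_tests().

    Returns the Iceberg versions listed for the given Spark version,
    or an empty list when the combination would be skipped.
    """
    parts = spark_ver.split(".")
    major_minor = ".".join(parts[:2])
    # Assumes a plain numeric patch component such as "3.5.4" or "4.1.1".
    patch = int(parts[2]) if len(parts) > 2 else 0

    if major_minor == "4.1":
        # Spark 4.1.x -> Iceberg 1.11.0, Scala 2.13 only
        return ["1.11.0"] if scala_binary_ver == "2.13" else []
    if major_minor == "4.0":
        # Spark 4.0.x -> Iceberg 1.10.1, Scala 2.13 only
        return ["1.10.1"] if scala_binary_ver == "2.13" else []
    if major_minor == "3.5":
        # Spark 3.5.0-3.5.3 -> Iceberg 1.6.1; Spark 3.5.4+ -> 1.9.2, 1.10.1
        return ["1.6.1"] if patch <= 3 else ["1.9.2", "1.10.1"]
    # Any other Spark line: Iceberg tests are skipped
    return []
```

A call such as `supported_iceberg_versions("4.1.1", "2.13")` yields the single 1.11.0 row added by this commit, while the same Spark version with Scala 2.12 yields an empty list (the "skipping" branch).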

Review follow-ups deferred from part (2/3):
- Assert the selected shim package, not just detectedVersion(): expose IcebergProvider.shimPackage() through IcebergProviderAccess and have iceberg_version_detection_test.py check it (e.g. 1.11.0 -> iceberg111x). A correct version paired with a wrong version->shim mapping now fails.
- Add a Spark 4.1 version-detection smoke to pre-merge: ci_scala213() boots Spark 4.1 and runs the detection test so the iceberg111x packaging / shim-selection path is exercised before merge (the main suite runs on 4.0.1).
- Narrow the run_iceberg_version_detect_tests() "must stay in sync" comment to reflect that the Spark 4.1 row is now covered in pre-merge.

Signed-off-by: Chong Gao <res_life@163.com>
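The version-to-shim-package derivation that the detection test asserts can be sketched on its own as follows. The formula is the one used in iceberg_version_detection_test.py; the function name `expected_shim_package` and the constant `SHIM_PACKAGE_PREFIX` are illustrative only and not part of the plugin.

```python
# Package prefix taken from the test in this commit; the constant name is hypothetical.
SHIM_PACKAGE_PREFIX = "com.nvidia.spark.rapids.iceberg"


def expected_shim_package(iceberg_version):
    """Derive the shim sub-package from an Iceberg version string.

    Keeps only major.minor, drops the dot, and wraps it as iceberg<digits>x,
    e.g. 1.11.0 -> iceberg111x.
    """
    major_minor = ".".join(iceberg_version.split(".")[:2])
    return "{}.iceberg{}x".format(SHIM_PACKAGE_PREFIX, major_minor.replace(".", ""))


if __name__ == "__main__":
    print(expected_shim_package("1.11.0"))
```

Because the dot is simply dropped, the scheme relies on the set of supported versions staying unambiguous (a hypothetical 11.1.x would also map to iceberg111x), which is one more reason to assert the selected package rather than only the detected version.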
1 parent 808b4fb commit 085db63

6 files changed

Lines changed: 187 additions & 11 deletions

iceberg/iceberg-1-11-x/src/main/scala/com/nvidia/spark/rapids/iceberg/iceberg111x/IcebergProviderImpl.scala

Lines changed: 43 additions & 1 deletion
@@ -16,6 +16,48 @@

 package com.nvidia.spark.rapids.iceberg.iceberg111x

+import scala.reflect.ClassTag
+import scala.util.Try
+
+import com.nvidia.spark.rapids.{GpuScan, ScanMeta, ScanRule, ShimReflectionUtils}
 import com.nvidia.spark.rapids.iceberg.IcebergProviderBase
+import org.apache.iceberg.spark.source.{GpuSparkIncrementalAppendScan, GpuSparkScan}
+
+import org.apache.spark.sql.connector.read.Scan
+
+class IcebergProviderImpl extends IcebergProviderBase {
+
+  /**
+   * Adds a {@code SparkIncrementalAppendScan} rule on top of the base provider's two rules
+   * ({@code SparkBatchQueryScan}, {@code SparkCopyOnWriteScan}). The incremental-append scan
+   * is a 1.11-only class — before 1.11 the same query path went through
+   * {@code SparkBatchQueryScan} and was matched by the base rule. The CPU class is loaded
+   * by string here because it is package-private and not directly referenceable from
+   * outside {@code org.apache.iceberg.spark.source}.
+   */
+  override def getScans: Map[Class[_ <: Scan], ScanRule[_ <: Scan]] = {
+    val cpuIncrementalAppendScanClass = ShimReflectionUtils.loadClass(
+      "org.apache.iceberg.spark.source.SparkIncrementalAppendScan")
+
+    val incrementalRule = new ScanRule[Scan](
+      (a, conf, p, r) => new ScanMeta[Scan](a, conf, p, r) {
+        private lazy val convertedScan: Try[GpuSparkScan] = Try(
+          GpuSparkIncrementalAppendScan.create(a, this.conf, false)
+            .asInstanceOf[GpuSparkScan])
+
+        override def supportsRuntimeFilters: Boolean = true
+
+        override def tagSelfForGpu(): Unit = {
+          GpuSparkScan.tagForGpu(this, convertedScan)
+        }
+
+        override def convertToGpu(): GpuScan = convertedScan.get
+      },
+      "Iceberg incremental append scan",
+      ClassTag(cpuIncrementalAppendScanClass)
+    )

-class IcebergProviderImpl extends IcebergProviderBase
+    super.getScans + (
+      cpuIncrementalAppendScanClass.asSubclass(classOf[Scan]) -> incrementalRule)
+  }
+}
Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.spark.source
+
+import java.util.Objects
+
+import scala.collection.JavaConverters._
+
+import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
+import org.apache.iceberg.expressions.Expression
+
+import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsRuntimeV2Filtering}
+
+/**
+ * GPU wrapper for Iceberg 1.11's {@code SparkIncrementalAppendScan} (the
+ * `.option("start-snapshot-id", ...)` / `.option("end-snapshot-id", ...)` read
+ * path). This class was introduced in Iceberg 1.11 — before 1.11 the same
+ * incremental-read path went through {@code SparkBatchQueryScan} (accelerated by
+ * {@link GpuSparkBatchQueryScan}). It mirrors {@link GpuSparkBatchQueryScan}
+ * because both CPU scans extend {@code SparkRuntimeFilterableScan} (a
+ * {@code SparkPartitioningAwareScan<PartitionScanTask>}).
+ *
+ * <p>Takes the public {@code Scan} type and reaches Iceberg internals only through
+ * the root-loadable {@link GpuSparkScanAccess} bridge and public Spark interfaces
+ * ({@code SupportsRuntimeV2Filtering}). It must NOT reference the package-private
+ * {@code SparkIncrementalAppendScan} directly: under {@code extraClassPath}
+ * (system-classpath) the Iceberg classes load in the app classloader while this
+ * shimmed class loads in Spark's MutableURLClassLoader, so any same-package access
+ * would fail with {@code IllegalAccessError} (see issue #14959).
+ */
+class GpuSparkIncrementalAppendScan(
+    override val cpuScan: Scan,
+    override val rapidsConf: RapidsConf,
+    override val queryUsesInputFile: Boolean) extends
+  GpuSparkPartitioningAwareScan[org.apache.iceberg.PartitionScanTask](
+    cpuScan, rapidsConf, queryUsesInputFile)
+  with SupportsRuntimeV2Filtering {
+
+  private val runtimeFilterExpressions: List[Expression] =
+    GpuSparkScanAccess.runtimeFilterExpressions(cpuScan)
+      .asScala
+      .toList
+
+  private def runtimeFilterScan: SupportsRuntimeV2Filtering =
+    cpuScan.asInstanceOf[SupportsRuntimeV2Filtering]
+
+  override def filterAttributes(): Array[NamedReference] = runtimeFilterScan.filterAttributes()
+
+  override def filter(predicates: Array[Predicate]): Unit = runtimeFilterScan.filter(predicates)
+
+  override def estimateStatistics(): Statistics = GpuSparkScanAccess.estimateStatistics(cpuScan)
+
+  override def equals(obj: Any): Boolean = obj match {
+    case that: GpuSparkIncrementalAppendScan =>
+      this.cpuScan == that.cpuScan &&
+        this.queryUsesInputFile == that.queryUsesInputFile
+    case _ => false
+  }
+
+  override def hashCode(): Int =
+    Objects.hash(cpuScan, Boolean.box(queryUsesInputFile))
+
+  override def toString: String =
+    s"GpuSparkIncrementalAppendScan(table=${GpuSparkScanAccess.table(cpuScan)}, " +
+      s"branch=${GpuSparkScanAccess.branch(cpuScan)}, " +
+      s"type=${GpuSparkScanAccess.expectedSchema(cpuScan).asStruct()}, " +
+      s"filters=${GpuSparkScanAccess.filterExpressions(cpuScan)}, " +
+      s"runtimeFilters=$runtimeFilterExpressions, " +
+      s"caseSensitive=${GpuSparkScanAccess.caseSensitive(cpuScan)}, " +
+      s"queryUseInputFile=$queryUsesInputFile)"
+
+  override def withInputFile(): GpuScan =
+    new GpuSparkIncrementalAppendScan(cpuScan, rapidsConf, true)
+}
+
+object GpuSparkIncrementalAppendScan {
+  /** Java-callable factory used by {@code IcebergProviderImpl}. Takes the public
+   * {@code Scan} type — never the package-private {@code SparkIncrementalAppendScan}. */
+  def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean): GpuScan =
+    new GpuSparkIncrementalAppendScan(cpuScan, rapidsConf, queryUsesInputFile)
+}

integration_tests/src/main/python/iceberg/iceberg_version_detection_test.py

Lines changed: 15 additions & 2 deletions
@@ -32,11 +32,24 @@ def test_iceberg_version_detection():
     if expected is None:
         pytest.skip("EXPECTED_ICEBERG_VERSION env var not set")

+    # Shim sub-package selected per Iceberg major.minor, e.g. 1.11.x -> iceberg111x.
+    # Mirrors IcebergProbeImpl.icebergVersionToShim.
+    major_minor = ".".join(expected.split(".")[:2])
+    expected_shim_package = \
+        "com.nvidia.spark.rapids.iceberg.iceberg{}x".format(major_minor.replace(".", ""))
+
     def check(spark):
-        jvm = spark.sparkContext._jvm
-        actual = jvm.com.nvidia.spark.rapids.iceberg.IcebergProviderAccess.detectedVersion()
+        access = spark.sparkContext._jvm.com.nvidia.spark.rapids.iceberg.IcebergProviderAccess
+        actual = access.detectedVersion()
         assert actual == expected, \
             "Iceberg version detection mismatch: expected '{}' on Spark {}, got '{}'".format(
                 expected, spark_version(), actual)
+        # Assert the shim package too: a correct detectedVersion() paired with a wrong
+        # version -> shim mapping (e.g. the 1.11 -> iceberg111x row) would otherwise
+        # still pass.
+        actual_shim_package = access.shimPackage()
+        assert actual_shim_package == expected_shim_package, \
+            "Iceberg shim package mismatch for {} on Spark {}: expected '{}', got '{}'".format(
+                expected, spark_version(), expected_shim_package, actual_shim_package)

     with_gpu_session(check)

jenkins/spark-premerge-build.sh

Lines changed: 15 additions & 5 deletions
@@ -167,11 +167,11 @@ run_iceberg_version_detect_tests() {
         return 0
     fi

-    # Supported Iceberg versions per Spark version — must stay in sync with
-    # run_iceberg_tests() in spark-tests.sh. Note: the Spark 4.1 -> 1.11.0 row is
-    # listed here, but ci_scala213() currently uses SPARK_VER=4.0.1 so the 4.1 branch
-    # is not exercised in pre-merge CI. The full 4.1 integration suite is added in the
-    # stacked follow-up PR; until then, the 1.11.0 commit-ID mapping is nightly-only.
+    # Supported Iceberg versions per Spark version. The 3.5.x / 4.0.x rows mirror
+    # run_iceberg_tests() in spark-tests.sh. The Spark 4.1 -> 1.11.0 row is exercised
+    # in pre-merge by the dedicated Spark 4.1 version-detection smoke in ci_scala213()
+    # (which boots Spark 4.1); the full Spark 4.1 Iceberg integration suite runs in
+    # nightly via run_iceberg_tests().
     local iceberg_versions
     if [[ "$iceberg_spark_ver" == "4.1" ]]; then
         iceberg_versions="1.11.0"
@@ -269,6 +269,16 @@ ci_scala213() {
     # Moved out of spark-tests.sh DEFAULT mode where JDK 8 causes
     # UnsupportedClassVersionError for Iceberg 1.9+ runtime JARs.
     run_iceberg_version_detect_tests $SPARK_VER 2.13
+
+    # Spark 4.1 / Iceberg 1.11 shim-selection smoke. The main integration suite above
+    # runs on Spark 4.0.1, so without this the iceberg111x module and the 1.11.0 ->
+    # iceberg111x mapping added for Spark 4.1 would have no pre-merge coverage. Boot
+    # Spark 4.1 and assert both the detected version and the selected shim package.
+    local SPARK_VER_411=4.1.1
+    local buildver_411="${SPARK_VER_411//./}"
+    prepare_spark $SPARK_VER_411 2.13
+    $MVN -f scala2.13/ -U -B $MVN_URM_MIRROR -Dbuildver=$buildver_411 clean package $MVN_BUILD_ARGS -DskipTests=true
+    run_iceberg_version_detect_tests $SPARK_VER_411 2.13
 }

 prepare_spark() {

jenkins/spark-tests.sh

Lines changed: 13 additions & 3 deletions
@@ -268,7 +268,8 @@ run_iceberg_tests() {
     # get the patch version of Spark
     SPARK_PATCH_VER=$(echo "$SPARK_VER" | cut -d. -f3)

-    if [[ "$ICEBERG_SPARK_VER" != "3.5" && "$ICEBERG_SPARK_VER" != "4.0" ]]; then
+    if [[ "$ICEBERG_SPARK_VER" != "3.5" && "$ICEBERG_SPARK_VER" != "4.0" \
+        && "$ICEBERG_SPARK_VER" != "4.1" ]]; then
         echo "!!!! Skipping Iceberg tests. GPU acceleration of Iceberg is not supported on $ICEBERG_SPARK_VER"
         return 0
     fi
@@ -277,8 +278,15 @@ run_iceberg_tests() {
     # Spark 3.5.0-3.5.3 -> Iceberg 1.6.1
     # Spark 3.5.4+ -> Iceberg 1.9.2, 1.10.1
     # Spark 4.0.x -> Iceberg 1.10.1
+    # Spark 4.1.x -> Iceberg 1.11.0
     local supported_versions
-    if [[ "$ICEBERG_SPARK_VER" == "4.0" ]]; then
+    if [[ "$ICEBERG_SPARK_VER" == "4.1" ]]; then
+        if [[ "$SCALA_BINARY_VER" != "2.13" ]]; then
+            echo "!!!! Skipping Iceberg tests. Spark 4.1 Iceberg tests require Scala 2.13"
+            return 0
+        fi
+        supported_versions="1.11.0"
+    elif [[ "$ICEBERG_SPARK_VER" == "4.0" ]]; then
         if [[ "$SCALA_BINARY_VER" != "2.13" ]]; then
             echo "!!!! Skipping Iceberg tests. Spark 4.0 Iceberg tests require Scala 2.13"
             return 0
@@ -302,7 +310,9 @@ run_iceberg_tests() {
         echo "Using user-specified ICEBERG_VERSIONS=$ICEBERG_VERSIONS"
     else
         # Default: test one representative version per Spark patch range
-        if [[ "$ICEBERG_SPARK_VER" == "4.0" ]]; then
+        if [[ "$ICEBERG_SPARK_VER" == "4.1" ]]; then
+            ICEBERG_VERSIONS="1.11.0"
+        elif [[ "$ICEBERG_SPARK_VER" == "4.0" ]]; then
             ICEBERG_VERSIONS="1.10.1"
         elif [[ "$SPARK_PATCH_VER" -le 3 ]]; then
             ICEBERG_VERSIONS="1.6.1"

sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergProviderAccess.java

Lines changed: 4 additions & 0 deletions
@@ -24,4 +24,8 @@ private IcebergProviderAccess() {
     public static String detectedVersion() {
         return IcebergProvider$.MODULE$.detectedVersion();
     }
+
+    public static String shimPackage() {
+        return IcebergProvider$.MODULE$.shimPackage();
+    }
 }
