Skip to content

Commit 0b46c1c

Browse files
res-lifeChong Gao
andauthored
Iceberg 1.11 support for Spark 411, part (1/3): extract version-divergent scan APIs behind a shim (#14881)
Stacked work for #14853 (1/3) — common-code preparation for adding iceberg-1-11-x. ### Depends on * #14866 Rebase on this PR after #14866 merged. ### Description Refactors `iceberg/common` so the `SparkScan` / `SparkCopyOnWriteScan` / `SparkBatch` / `DataWriteResult` APIs that diverge between Iceberg 1.10.x and 1.11.x are hidden behind a small interface, with per-Iceberg-version implementations in `iceberg-1-6-x` / `iceberg-1-9-x` / `iceberg-1-10-x`. **No behavior change for the Iceberg versions this PR ships**; sets the stage for the follow-up PR that adds `iceberg-1-11-x`. Common changes: - `GpuSparkCopyOnWriteScan` → renamed to `GpuSparkCopyOnWriteScanBase` (abstract). The runtime-filter trait + `filter` method live in a per-version concrete subclass (1.6/1.9/1.10 mix in `SupportsRuntimeFiltering` with `filter(Filter[])`; 1.11 will mix in `SupportsRuntimeV2Filtering` with `filter(Predicate[])`). - `GpuSparkScan`: rewrite `hasNestedType` via Spark's `readSchema()` + Spark types so it no longer depends on the 1.10-only `cpuScan.expectedSchema()`. Dispatch `SparkCopyOnWriteScan` construction through the new `ShimUtils.newCopyOnWriteScan` factory. - `GpuSparkBatchQueryScan.toString` uses `cpuScan.description()` (available in both 1.10 and 1.11) instead of `branch` / `expectedSchema` / `filterExpressions` (1.11 removed these). - `GpuSparkBatchQueryScan.runtimeFilterExpressions` reflective field-read tolerates both the 1.10 name (`runtimeFilterExpressions`) and the 1.11 name (`runtimeFilters`). - `GpuSparkBatch`: same tolerance for `expectedSchema` (1.10) vs `projection` (1.11). - `GpuSparkWrite`: type-annotate `new Array[DataFile](0)` so Scala 2.13 doesn't infer `Array[Nothing]` under 1.11's wildcarded `DataWriteResult.dataFiles()`. - `IcebergShimUtils` / `ShimUtils`: add `newCopyOnWriteScan(Scan, RapidsConf, Boolean): GpuScan` factory. The parameter is Spark's public `Scan` because Iceberg's `SparkCopyOnWriteScan` is package-private — cross-package callers cannot reference it directly. Per-Iceberg-version module changes (1.6 / 1.9 / 1.10, all identical for the V1 path): - New `GpuSparkCopyOnWriteScan` in `org.apache.iceberg.spark.source` (so it can reference the package-private `SparkCopyOnWriteScan`). Companion object exposes `create(Scan, ...): GpuScan` for cross-package callers. - `ShimUtilsImpl.java` implements `newCopyOnWriteScan` via `GpuSparkCopyOnWriteScan.create`. The two `try/catch` field-name fallbacks (in `GpuSparkBatchQueryScan` and `GpuSparkBatch`) are tactical and will be pushed behind proper per-version `IcebergShimUtils` methods in a later cleanup PR. ### Checklists Documentation - [ ] Updated for new or modified user-facing features or behaviors - [x] No user-facing change Testing - [ ] Added or modified tests to cover new code paths - [x] Covered by existing tests (3.5.x + 4.0.x iceberg integration tests in \`integration_tests/src/main/python/iceberg/\` — exercises the new dispatch path with no behavior change vs. before this PR.) - [ ] Not required Performance - [ ] Tests ran and results are added in the PR description - [ ] Issue filed with a link in the PR description - [x] Not required Signed-off-by: Chong Gao <res_life@163.com> Co-authored-by: Chong Gao <res_life@163.com>
1 parent 1ff1da3 commit 0b46c1c

11 files changed

Lines changed: 309 additions & 36 deletions

File tree

iceberg/common/src/main/java/com/nvidia/spark/rapids/iceberg/IcebergShimUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.nvidia.spark.rapids.GpuMetric;
2020
import com.nvidia.spark.rapids.NoopMetric$;
21+
import com.nvidia.spark.rapids.RapidsConf;
2122
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
2223
import org.apache.hadoop.fs.Path;
2324
import org.apache.iceberg.ContentFile;
@@ -28,6 +29,8 @@
2829
import org.apache.iceberg.parquet.GpuParquetIO;
2930
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
3031
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
32+
import org.apache.iceberg.spark.source.GpuSparkScan;
33+
import org.apache.spark.sql.connector.read.Scan;
3134
import scala.Option;
3235

3336
import java.io.IOException;
@@ -98,4 +101,18 @@ default ParquetFileReader openParquetReader(
98101
missCounter.$plus$eq(1L);
99102
return ParquetFileReader.open(GpuParquetIO.file(inputFile.getDelegate()), options);
100103
}
104+
105+
/**
106+
* Constructs the version-appropriate {@code GpuSparkCopyOnWriteScan} subclass.
107+
*
108+
* <p>Iceberg 1.6.x, 1.9.x, and 1.10.x have {@code SparkCopyOnWriteScan}
109+
* implementing {@code SupportsRuntimeFiltering} with {@code filter(Filter[])};
110+
* Iceberg 1.11.x switched to {@code SupportsRuntimeV2Filtering} with
111+
* {@code filter(Predicate[])}. The concrete class therefore differs per Iceberg
112+
* version and is constructed here rather than directly in common code.
113+
*/
114+
GpuSparkScan newCopyOnWriteScan(
115+
Scan cpuScan,
116+
RapidsConf rapidsConf,
117+
boolean queryUsesInputFile);
101118
}

iceberg/common/src/main/java/com/nvidia/spark/rapids/iceberg/ShimUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.nvidia.spark.rapids.iceberg;
1818

1919
import com.nvidia.spark.rapids.GpuMetric;
20+
import com.nvidia.spark.rapids.RapidsConf;
2021
import com.nvidia.spark.rapids.ShimLoader;
2122
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
2223

@@ -28,6 +29,8 @@
2829
import org.apache.iceberg.io.FileIO;
2930
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
3031
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
32+
import org.apache.iceberg.spark.source.GpuSparkScan;
33+
import org.apache.spark.sql.connector.read.Scan;
3134

3235
import java.io.IOException;
3336
import java.util.Map;
@@ -71,4 +74,11 @@ public static ParquetFileReader openParquetReader(
7174
scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
7275
return IMPL.openParquetReader(inputFile, filePath, options, metrics);
7376
}
77+
78+
public static GpuSparkScan newCopyOnWriteScan(
79+
Scan cpuScan,
80+
RapidsConf rapidsConf,
81+
boolean queryUsesInputFile) {
82+
return IMPL.newCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile);
83+
}
7484
}

iceberg/common/src/main/java/org/apache/iceberg/spark/source/GpuSparkScanAccess.java

Lines changed: 73 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.apache.iceberg.spark.source;
1818

1919
import java.lang.reflect.Field;
20+
import java.lang.reflect.InvocationTargetException;
21+
import java.lang.reflect.Method;
22+
import java.util.Collections;
2023
import java.util.List;
2124

2225
import org.apache.iceberg.BaseMetadataTable;
@@ -60,31 +63,40 @@ public static boolean isMetadataScan(Scan scan) {
6063
}
6164

6265
public static SparkReadConf readConf(Scan scan) {
63-
return readField(sparkScan(scan), "readConf", SparkReadConf.class);
66+
return readField(sparkScan(scan), SparkReadConf.class, "readConf");
6467
}
6568

6669
public static Table table(Scan scan) {
6770
return sparkScan(scan).table();
6871
}
6972

7073
public static String branch(Scan scan) {
71-
return sparkScan(scan).branch();
74+
// Iceberg 1.10.x and earlier: protected method SparkScan.branch(). Iceberg 1.11.x
75+
// removed it entirely; return null for display purposes.
76+
return invokeMethod(sparkScan(scan), String.class, "branch");
7277
}
7378

7479
public static boolean caseSensitive(Scan scan) {
7580
return sparkScan(scan).caseSensitive();
7681
}
7782

7883
public static Schema expectedSchema(Scan scan) {
79-
return sparkScan(scan).expectedSchema();
84+
// Iceberg 1.10.x: SparkScan.expectedSchema(); Iceberg 1.11.x renamed it to
85+
// SparkScan.projection().
86+
return invokeMethod(sparkScan(scan), Schema.class, "expectedSchema", "projection");
8087
}
8188

8289
public static Statistics estimateStatistics(Scan scan) {
8390
return sparkScan(scan).estimateStatistics();
8491
}
8592

93+
@SuppressWarnings("unchecked")
8694
public static List<Expression> filterExpressions(Scan scan) {
87-
return sparkScan(scan).filterExpressions();
95+
// Iceberg 1.10.x: SparkScan.filterExpressions(); Iceberg 1.11.x renamed it to
96+
// SparkScan.filters().
97+
List<Expression> r = (List<Expression>)
98+
invokeMethod(sparkScan(scan), List.class, "filterExpressions", "filters");
99+
return r != null ? r : Collections.emptyList();
88100
}
89101

90102
public static Types.StructType groupingKeyType(Scan scan) {
@@ -101,11 +113,17 @@ public static Batch toBatch(Scan scan) {
101113

102114
@SuppressWarnings("unchecked")
103115
public static List<Expression> runtimeFilterExpressions(Scan scan) {
104-
return (List<Expression>) readField(scan, "runtimeFilterExpressions", List.class);
116+
// Iceberg 1.6.x / 1.9.x / 1.10.x: field "runtimeFilterExpressions" on
117+
// SparkBatchQueryScan. Iceberg 1.11.x: field "runtimeFilters" on the new parent
118+
// class SparkRuntimeFilterableScan.
119+
return (List<Expression>) readField(scan, List.class,
120+
"runtimeFilterExpressions", "runtimeFilters");
105121
}
106122

107123
public static Schema expectedSchema(Batch batch) {
108-
return readField(batch, "expectedSchema", Schema.class);
124+
// Iceberg 1.10.x: field "expectedSchema" on SparkBatch.
125+
// Iceberg 1.11.x: renamed to "projection".
126+
return readField(batch, Schema.class, "expectedSchema", "projection");
109127
}
110128

111129
public static Table table(InputPartition partition) {
@@ -128,26 +146,66 @@ private static SparkInputPartition sparkInputPartition(InputPartition partition)
128146
return (SparkInputPartition) partition;
129147
}
130148

131-
private static <T> T readField(Object target, String fieldName, Class<T> fieldType) {
149+
/**
150+
* Read the first existing field from {@code fieldNames} on {@code target} (or its
151+
* superclasses). Used when a field was renamed across Iceberg versions — list the
152+
* candidate names in priority order. Throws if none of the names exist.
153+
*/
154+
private static <T> T readField(Object target, Class<T> fieldType, String... fieldNames) {
155+
Field field = findField(target.getClass(), fieldNames);
156+
if (field == null) {
157+
throw new IllegalStateException(
158+
"None of fields " + String.join(",", fieldNames)
159+
+ " exist on " + target.getClass().getName());
160+
}
132161
try {
133-
Field field = findField(target.getClass(), fieldName);
134162
field.setAccessible(true);
135163
return fieldType.cast(field.get(target));
136164
} catch (IllegalAccessException e) {
137165
throw new IllegalStateException(
138-
"Unable to read " + fieldName + " from " + target.getClass().getName(), e);
166+
"Unable to read " + field.getName() + " from " + target.getClass().getName(), e);
139167
}
140168
}
141169

142-
private static Field findField(Class<?> targetClass, String fieldName) {
170+
private static Field findField(Class<?> targetClass, String... fieldNames) {
143171
Class<?> current = targetClass;
144172
while (current != null) {
145-
try {
146-
return current.getDeclaredField(fieldName);
147-
} catch (NoSuchFieldException e) {
148-
current = current.getSuperclass();
173+
for (String fieldName : fieldNames) {
174+
try {
175+
return current.getDeclaredField(fieldName);
176+
} catch (NoSuchFieldException ignore) {
177+
// try the next name (or the superclass)
178+
}
179+
}
180+
current = current.getSuperclass();
181+
}
182+
return null;
183+
}
184+
185+
/**
186+
* Invoke the first existing zero-arg method from {@code methodNames} on
187+
* {@code target} (or its superclasses). Used when a protected method was renamed or
188+
* removed across Iceberg versions — list the candidate names in priority order.
189+
* Returns {@code null} if none of the names exist (caller decides the fallback).
190+
*/
191+
private static <T> T invokeMethod(Object target, Class<T> returnType, String... methodNames) {
192+
Class<?> current = target.getClass();
193+
while (current != null) {
194+
for (String methodName : methodNames) {
195+
try {
196+
Method m = current.getDeclaredMethod(methodName);
197+
m.setAccessible(true);
198+
Object result = m.invoke(target);
199+
return result == null ? null : returnType.cast(result);
200+
} catch (NoSuchMethodException ignore) {
201+
// try the next name (or the superclass)
202+
} catch (IllegalAccessException | InvocationTargetException e) {
203+
throw new IllegalStateException(
204+
"Unable to invoke " + methodName + " on " + target.getClass().getName(), e);
205+
}
149206
}
207+
current = current.getSuperclass();
150208
}
151-
throw new IllegalStateException("No field " + fieldName + " in " + targetClass.getName());
209+
return null;
152210
}
153211
}

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScan.scala renamed to iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkCopyOnWriteScanBase.scala

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,32 @@ package org.apache.iceberg.spark.source
1818

1919
import java.util.Objects
2020

21-
import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
21+
import com.nvidia.spark.rapids.RapidsConf
2222
import org.apache.iceberg.FileScanTask
2323

24-
import org.apache.spark.sql.connector.expressions.NamedReference
25-
import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsRuntimeFiltering}
26-
import org.apache.spark.sql.sources.Filter
24+
import org.apache.spark.sql.connector.read.{Scan, Statistics}
2725

28-
class GpuSparkCopyOnWriteScan(
26+
/**
27+
* Version-agnostic base for the GPU copy-on-write scan. Iceberg 1.6.x, 1.9.x,
28+
* and 1.10.x have {@code SparkCopyOnWriteScan} implementing
29+
* {@code SupportsRuntimeFiltering} with {@code filter(Filter[])}; Iceberg 1.11.x
30+
* switched to {@code SupportsRuntimeV2Filtering} with {@code filter(Predicate[])}.
31+
* The per-Iceberg-version concrete subclass lives in {@code iceberg-1-6-x} /
32+
* {@code iceberg-1-9-x} / {@code iceberg-1-10-x} (and {@code iceberg-1-11-x}
33+
* once it lands) and mixes in the matching Spark runtime-filter trait + delegates
34+
* {@code filter} to the matching Iceberg API.
35+
*/
36+
abstract class GpuSparkCopyOnWriteScanBase(
2937
override val cpuScan: Scan,
3038
override val rapidsConf: RapidsConf,
3139
override val queryUsesInputFile: Boolean) extends
32-
GpuSparkPartitioningAwareScan[FileScanTask](cpuScan, rapidsConf, queryUsesInputFile)
33-
with SupportsRuntimeFiltering {
34-
35-
private def runtimeFilterScan: SupportsRuntimeFiltering =
36-
cpuScan.asInstanceOf[SupportsRuntimeFiltering]
37-
38-
override def filterAttributes(): Array[NamedReference] = runtimeFilterScan.filterAttributes()
40+
GpuSparkPartitioningAwareScan[FileScanTask](cpuScan, rapidsConf, queryUsesInputFile) {
3941

4042
override def estimateStatistics(): Statistics = GpuSparkScanAccess.estimateStatistics(cpuScan)
4143

4244
override def equals(obj: Any): Boolean = {
4345
obj match {
44-
case that: GpuSparkCopyOnWriteScan =>
46+
case that: GpuSparkCopyOnWriteScanBase =>
4547
this.cpuScan == that.cpuScan &&
4648
this.queryUsesInputFile == that.queryUsesInputFile
4749
case _ => false
@@ -60,11 +62,4 @@ class GpuSparkCopyOnWriteScan(
6062
s"caseSensitive=${GpuSparkScanAccess.caseSensitive(cpuScan)}, " +
6163
s"queryUseInputFile=$queryUsesInputFile)"
6264
}
63-
64-
/** Create a version of this scan with input file name support */
65-
override def withInputFile(): GpuScan = {
66-
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
67-
}
68-
69-
override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)
7065
}

iceberg/common/src/main/scala/org/apache/iceberg/spark/source/GpuSparkScan.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
2020
import scala.util.{Failure, Success, Try}
2121

2222
import com.nvidia.spark.rapids._
23+
import com.nvidia.spark.rapids.iceberg.ShimUtils
2324
import org.apache.iceberg.ScanTaskGroup
2425
import org.apache.iceberg.spark.GpuSparkReadConf
2526
import org.apache.iceberg.types.Types
@@ -81,7 +82,7 @@ object GpuSparkScan {
8182
if (GpuSparkScanAccess.isBatchQueryScan(cpuScan)) {
8283
new GpuSparkBatchQueryScan(cpuScan, rapidsConf, false)
8384
} else if (GpuSparkScanAccess.isCopyOnWriteScan(cpuScan)) {
84-
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, false)
85+
ShimUtils.newCopyOnWriteScan(cpuScan, rapidsConf, false)
8586
} else {
8687
throw new IllegalArgumentException(
8788
s"Currently iceberg support only supports batch query scan and copy-on-write scan, " +

iceberg/iceberg-1-10-x/src/main/java/com/nvidia/spark/rapids/iceberg/iceberg110x/ShimUtilsImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.nvidia.spark.rapids.iceberg.iceberg110x;
1818

1919
import com.nvidia.spark.rapids.GpuMetric;
20+
import com.nvidia.spark.rapids.RapidsConf;
2021
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
2122
import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
2223
import org.apache.hadoop.fs.Path;
@@ -27,8 +28,11 @@
2728
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
2829
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
2930
import org.apache.iceberg.spark.SparkUtil;
31+
import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
32+
import org.apache.iceberg.spark.source.GpuSparkScan;
3033
import org.apache.iceberg.types.Types;
3134
import org.apache.iceberg.util.PartitionUtil;
35+
import org.apache.spark.sql.connector.read.Scan;
3236

3337
import java.io.IOException;
3438
import java.util.Collections;
@@ -74,4 +78,12 @@ public ParquetFileReader openParquetReader(
7478
scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
7579
return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
7680
}
81+
82+
@Override
83+
public GpuSparkScan newCopyOnWriteScan(
84+
Scan cpuScan,
85+
RapidsConf rapidsConf,
86+
boolean queryUsesInputFile) {
87+
return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
88+
}
7789
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2026, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.apache.iceberg.spark.source
18+
19+
import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
20+
21+
import org.apache.spark.sql.connector.expressions.NamedReference
22+
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
23+
import org.apache.spark.sql.sources.Filter
24+
25+
/**
26+
* Iceberg 1.10.x copy-on-write scan: {@code SupportsRuntimeFiltering} with
27+
* {@code filter(Array[Filter])}.
28+
*/
29+
class GpuSparkCopyOnWriteScan(
30+
cpuScanArg: Scan,
31+
rapidsConfArg: RapidsConf,
32+
queryUsesInputFileArg: Boolean)
33+
extends GpuSparkCopyOnWriteScanBase(cpuScanArg, rapidsConfArg, queryUsesInputFileArg)
34+
with SupportsRuntimeFiltering {
35+
36+
private def runtimeFilterScan: SupportsRuntimeFiltering =
37+
cpuScan.asInstanceOf[SupportsRuntimeFiltering]
38+
39+
override def filterAttributes(): Array[NamedReference] = runtimeFilterScan.filterAttributes()
40+
41+
override def filter(filters: Array[Filter]): Unit = runtimeFilterScan.filter(filters)
42+
43+
override def withInputFile(): GpuScan =
44+
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
45+
}
46+
47+
object GpuSparkCopyOnWriteScan {
48+
/** Java-callable factory used by {@code ShimUtilsImpl.newCopyOnWriteScan}. */
49+
def create(cpuScan: Scan, rapidsConf: RapidsConf, queryUsesInputFile: Boolean)
50+
: GpuSparkScan =
51+
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile)
52+
}

0 commit comments

Comments
 (0)