Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@

import com.nvidia.spark.rapids.GpuMetric;
import com.nvidia.spark.rapids.NoopMetric$;
import com.nvidia.spark.rapids.RapidsConf;
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.parquet.GpuParquetIO;
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.iceberg.spark.source.GpuSparkScan;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import scala.Option;

import java.io.IOException;
Expand Down Expand Up @@ -98,4 +103,48 @@ default ParquetFileReader openParquetReader(
missCounter.$plus$eq(1L);
return ParquetFileReader.open(GpuParquetIO.file(inputFile.getDelegate()), options);
}

/**
* Constructs the version-appropriate {@code GpuSparkCopyOnWriteScan} subclass.
*
* <p>Iceberg 1.6.x, 1.9.x, and 1.10.x have {@code SparkCopyOnWriteScan} implementing
* {@code SupportsRuntimeFiltering} with {@code filter(Filter[])}; Iceberg 1.11.x
* switched to {@code SupportsRuntimeV2Filtering} with {@code filter(Predicate[])}.
* The concrete class therefore differs per Iceberg version and is constructed
* here rather than directly in common code.
*
* <p>The parameter is declared as the public {@code Scan} interface because
* Iceberg's {@code SparkCopyOnWriteScan} is package-private — callers outside
* {@code org.apache.iceberg.spark.source} cannot reference it directly. Each
* impl downcasts inside a helper that lives in the right package.
*/
GpuSparkScan newCopyOnWriteScan(
Scan cpuScan,
RapidsConf rapidsConf,
boolean queryUsesInputFile);

/**
* Reads the {@code runtimeFilterExpressions} list off a {@code SparkBatchQueryScan}.
* The list lives on different classes per Iceberg version:
* <ul>
* <li>1.6.x / 1.9.x / 1.10.x: field {@code runtimeFilterExpressions} on
* {@code SparkBatchQueryScan}.</li>
* <li>1.11.x: field {@code runtimeFilters} on the new parent class
* {@code SparkRuntimeFilterableScan}.</li>
* </ul>
*
* <p>Hidden behind the shim so common code does not have to probe field names.
* Returns an empty list if reflection fails.
*/
java.util.List<Expression> runtimeFiltersOf(Scan cpuScan);

/**
* Reads the expected/projection Iceberg {@link Schema} off a {@code SparkBatch}.
* The field name differs per Iceberg version:
* <ul>
* <li>1.6.x / 1.9.x / 1.10.x: {@code expectedSchema}.</li>
* <li>1.11.x: {@code projection}.</li>
* </ul>
*/
Schema batchProjectionOf(Batch cpuBatch);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.nvidia.spark.rapids.iceberg;

import com.nvidia.spark.rapids.GpuMetric;
import com.nvidia.spark.rapids.RapidsConf;
import com.nvidia.spark.rapids.ShimLoader;
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;

Expand All @@ -25,11 +26,16 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.iceberg.spark.source.GpuSparkScan;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -71,4 +77,19 @@ public static ParquetFileReader openParquetReader(
scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
return IMPL.openParquetReader(inputFile, filePath, options, metrics);
}

public static GpuSparkScan newCopyOnWriteScan(
Scan cpuScan,
RapidsConf rapidsConf,
boolean queryUsesInputFile) {
return IMPL.newCopyOnWriteScan(cpuScan, rapidsConf, queryUsesInputFile);
}

public static List<Expression> runtimeFiltersOf(Scan cpuScan) {
return IMPL.runtimeFiltersOf(cpuScan);
}

public static Schema batchProjectionOf(Batch cpuBatch) {
return IMPL.batchProjectionOf(cpuBatch);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,8 @@ package org.apache.iceberg.spark.source

import java.util.Objects

import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.FieldUtils
import org.apache.iceberg.{Schema, SchemaParser}
import com.nvidia.spark.rapids.iceberg.ShimUtils
import org.apache.iceberg.SchemaParser

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
Expand All @@ -37,8 +37,7 @@ class GpuSparkBatch(
}

override def planInputPartitions(): Array[InputPartition] = {
val expectedSchema = FieldUtils.readField(cpuBatch, "expectedSchema", true)
.asInstanceOf[Schema]
val expectedSchema = ShimUtils.batchProjectionOf(cpuBatch)
val expectedSchemaString = SchemaParser.toJson(expectedSchema)

val sparkContext = SparkSession.getActiveSession.get.sparkContext
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@ import java.util.Objects
import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.FieldUtils
import com.nvidia.spark.rapids.iceberg.ShimUtils
import org.apache.iceberg.PartitionScanTask
import org.apache.iceberg.expressions.Expression

Expand All @@ -36,11 +36,8 @@ class GpuSparkBatchQueryScan(
GpuSparkPartitioningAwareScan[PartitionScanTask](cpuScan, rapidsConf, queryUsesInputFile)
with SupportsRuntimeV2Filtering {

private val runtimeFilterExpressions: List[Expression] = FieldUtils.readField(
cpuScan, "runtimeFilterExpressions", true)
.asInstanceOf[java.util.List[Expression]]
.asScala
.toList
private val runtimeFilterExpressions: List[Expression] =
ShimUtils.runtimeFiltersOf(cpuScan).asScala.toList

override def filterAttributes(): Array[NamedReference] = cpuScan.filterAttributes()

Expand All @@ -62,13 +59,9 @@ class GpuSparkBatchQueryScan(
}

override def toString: String = {
s"GpuSparkBatchQueryScan(table=${cpuScan.table()}, )" +
s"branch=${cpuScan.branch()}, " +
s"type=${cpuScan.expectedSchema().asStruct()}, " +
s"filters=${cpuScan.filterExpressions()}, " +
s"GpuSparkBatchQueryScan(${cpuScan.description()}, " +
s"runtimeFilters=$runtimeFilterExpressions, " +
s"caseSensitive=${cpuScan.caseSensitive()}, " +
s"queryUseInputFile=$queryUsesInputFile"
s"queryUseInputFile=$queryUsesInputFile)"
}

/** Create a version of this scan with input file name support */
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,27 +18,35 @@ package org.apache.iceberg.spark.source

import java.util.Objects

import com.nvidia.spark.rapids.{GpuScan, RapidsConf}
import com.nvidia.spark.rapids.RapidsConf
import org.apache.iceberg.FileScanTask

import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.read.{Statistics, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.connector.read.Statistics

class GpuSparkCopyOnWriteScan(
/**
* Version-agnostic base for the GPU copy-on-write scan. Iceberg 1.6.x, 1.9.x,
* and 1.10.x have {@code SparkCopyOnWriteScan implements SupportsRuntimeFiltering}
* with {@code filter(Filter[])}; Iceberg 1.11.x switched to
* {@code SupportsRuntimeV2Filtering} with {@code filter(Predicate[])}. The
* per-version concrete subclass lives in {@code iceberg-1-6-x} / {@code iceberg-1-9-x}
* / {@code iceberg-1-10-x} (and {@code iceberg-1-11-x} once it lands) and mixes
* in the matching Spark runtime-filter trait + delegates {@code filter}
* to the matching Iceberg API.
*/
abstract class GpuSparkCopyOnWriteScanBase(
override val cpuScan: SparkCopyOnWriteScan,
override val rapidsConf: RapidsConf,
override val queryUsesInputFile: Boolean) extends
GpuSparkPartitioningAwareScan[FileScanTask](cpuScan, rapidsConf, queryUsesInputFile)
with SupportsRuntimeFiltering {
GpuSparkPartitioningAwareScan[FileScanTask](cpuScan, rapidsConf, queryUsesInputFile) {

override def filterAttributes(): Array[NamedReference] = cpuScan.filterAttributes()
def filterAttributes(): Array[NamedReference] = cpuScan.filterAttributes()

override def estimateStatistics(): Statistics = cpuScan.estimateStatistics()

override def equals(obj: Any): Boolean = {
obj match {
case that: GpuSparkCopyOnWriteScan =>
case that: GpuSparkCopyOnWriteScanBase =>
this.cpuScan == that.cpuScan &&
this.queryUsesInputFile == that.queryUsesInputFile
case _ => false
Expand All @@ -50,18 +58,7 @@ class GpuSparkCopyOnWriteScan(
}

override def toString: String = {
s"GpuSparkCopyOnWriteScan(table=${cpuScan.table()}, " +
s"branch=${cpuScan.branch()}, " +
s"type=${cpuScan.expectedSchema().asStruct()}, " +
s"filters=${cpuScan.filterExpressions()}, " +
s"caseSensitive=${cpuScan.caseSensitive()}, " +
s"GpuSparkCopyOnWriteScan(${cpuScan.description()}, " +
s"queryUseInputFile=$queryUsesInputFile)"
}

/** Create a version of this scan with input file name support */
override def withInputFile(): GpuScan = {
new GpuSparkCopyOnWriteScan(cpuScan, rapidsConf, true)
}

override def filter(filters: Array[Filter]): Unit = cpuScan.filter(filters)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,10 @@

package org.apache.iceberg.spark.source

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.iceberg.ShimUtils
import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.FieldUtils
import org.apache.iceberg.{BaseMetadataTable, ScanTaskGroup}
import org.apache.iceberg.spark.{GpuSparkReadConf, SparkReadConf}
Expand All @@ -28,7 +28,7 @@ import org.apache.iceberg.types.Types
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ArrayType, MapType, StructType}


abstract class GpuSparkScan(val cpuScan: SparkScan,
Expand Down Expand Up @@ -65,11 +65,12 @@ abstract class GpuSparkScan(val cpuScan: SparkScan,
protected def taskGroups(): Seq[_ <: ScanTaskGroup[_]]

def hasNestedType: Boolean = {
cpuScan.expectedSchema()
.asStruct()
.fields()
.asScala
.exists { field => field.`type`().isNestedType }
cpuScan.readSchema().fields.exists { field =>
field.dataType match {
case _: StructType | _: ArrayType | _: MapType => true
case _ => false
}
}
}
}

Expand All @@ -85,7 +86,7 @@ object GpuSparkScan {
case icebergScan: SparkBatchQueryScan =>
new GpuSparkBatchQueryScan(icebergScan, rapidsConf, false)
case s: SparkCopyOnWriteScan =>
new GpuSparkCopyOnWriteScan(s, rapidsConf, false)
ShimUtils.newCopyOnWriteScan(s, rapidsConf, false)
case _ =>
throw new IllegalArgumentException(
s"Currently iceberg support only supports batch query scan and copy-on-write scan, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ class GpuUnpartitionedDataWriter(
close()

val result = delegate.result()
val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array(0)))
val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array[DataFile](0)))
taskCommit.reportOutputMetrics()
taskCommit
}
Expand Down Expand Up @@ -441,7 +441,7 @@ class GpuPartitionedDataWriter(
close()

val result = delegate.result()
val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array(0)))
val taskCommit = new TaskCommit(result.dataFiles().toArray(new Array[DataFile](0)))
taskCommit.reportOutputMetrics()
taskCommit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,30 @@
package com.nvidia.spark.rapids.iceberg.iceberg110x;

import com.nvidia.spark.rapids.GpuMetric;
import com.nvidia.spark.rapids.RapidsConf;
import com.nvidia.spark.rapids.fileio.iceberg.IcebergInputFile;
import com.nvidia.spark.rapids.iceberg.IcebergShimUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.iceberg.*;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.io.SupportsStorageCredentials;
import org.apache.iceberg.shaded.org.apache.parquet.ParquetReadOptions;
import org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.spark.source.GpuSparkCopyOnWriteScan;
import org.apache.iceberg.spark.source.GpuSparkScan;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Iceberg 1.10.x shim: uses {@code SparkUtil::internalToSpark} and a cache-aware footer path. */
Expand Down Expand Up @@ -74,4 +82,32 @@ public ParquetFileReader openParquetReader(
scala.collection.immutable.Map<String, GpuMetric> metrics) throws IOException {
return GpuParquetIOShim.openReader(inputFile, filePath, options, metrics);
}

@Override
public GpuSparkScan newCopyOnWriteScan(
Scan cpuScan,
RapidsConf rapidsConf,
boolean queryUsesInputFile) {
return GpuSparkCopyOnWriteScan.create(cpuScan, rapidsConf, queryUsesInputFile);
}

@Override
@SuppressWarnings("unchecked")
public List<Expression> runtimeFiltersOf(Scan cpuScan) {
try {
return (List<Expression>) FieldUtils.readField(
cpuScan, "runtimeFilterExpressions", true);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}

@Override
public Schema batchProjectionOf(Batch cpuBatch) {
try {
return (Schema) FieldUtils.readField(cpuBatch, "expectedSchema", true);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
Loading