From dba632363a24effa8b535ec0f72fd22bca5f351f Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 25 Sep 2025 17:47:45 +0800 Subject: [PATCH 01/15] Partial --- .../rapids/fileio/iceberg/IcebergFileIO.java | 10 +++ .../fileio/iceberg/IcebergOutputFile.java | 48 ++++++++++++ .../fileio/iceberg/IcebergOutputStream.java | 66 +++++++++++++++++ .../rapids/fileio/hadoop/HadoopFileIO.java | 12 ++- .../fileio/hadoop/HadoopOutputFile.java | 63 ++++++++++++++++ .../fileio/hadoop/HadoopOutputStream.java | 73 +++++++++++++++++++ .../spark/rapids/ColumnarOutputWriter.scala | 20 ++--- .../spark/rapids/GpuParquetFileFormat.scala | 10 +-- 8 files changed, 285 insertions(+), 17 deletions(-) create mode 100644 iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputFile.java create mode 100644 iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java create mode 100644 sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java diff --git a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java index 39c6b325133..953ad10b71d 100644 --- a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java +++ b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java @@ -18,6 +18,7 @@ import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO; import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile; import org.apache.iceberg.io.FileIO; import java.io.IOException; @@ -39,6 +40,10 @@ public class IcebergFileIO implements RapidsFileIO { * that the delegate is closed when no longer used, e.g., iceberg table/catalog close. */ public IcebergFileIO(FileIO delegate) { + this(delegate, null); + } + + public IcebergFileIO(FileIO delegate, LocationProvider locationProvider) { Objects.requireNonNull(delegate, "delegate can't be null"); this.delegate = delegate; } @@ -48,4 +53,9 @@ public IcebergFileIO(FileIO delegate) { public IcebergInputFile newInputFile(String path) throws IOException { return new IcebergInputFile(delegate.newInputFile(path)); } + + @Override + public IcebergOutputFile newOutputFile(String path) throws IOException { + return new IcebergOutputFile(delegate.newOutputFile(path)); + } } diff --git a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputFile.java b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputFile.java new file mode 100644 index 00000000000..56dda3284e7 --- /dev/null +++ b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputFile.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.iceberg; + +import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile; +import org.apache.iceberg.io.OutputFile; + +import java.io.IOException; +import java.util.Objects; + +/** + * Implementation of {@link RapidsOutputFile} using Iceberg {@link OutputFile}. + */ +public class IcebergOutputFile implements RapidsOutputFile { + private final OutputFile delegate; + + public IcebergOutputFile(OutputFile delegate) { + Objects.requireNonNull(delegate, "delegate can't be null"); + this.delegate = delegate; + } + + @Override + public IcebergOutputStream create(boolean overwrite) throws IOException { + if (overwrite) { + return new IcebergOutputStream(delegate.createOrOverwrite()); + } + return new IcebergOutputStream(delegate.create()); + } + + @Override + public String getPath() { + return delegate.location(); + } +} diff --git a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java new file mode 100644 index 00000000000..f366b0637af --- /dev/null +++ b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.iceberg; + +import com.nvidia.spark.rapids.jni.fileio.RapidsOutputStream; +import org.apache.iceberg.io.PositionOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link RapidsOutputStream} implementation that wraps an Iceberg {@link PositionOutputStream}. + */ +public class IcebergOutputStream extends RapidsOutputStream { + private final PositionOutputStream out; + private boolean closed; + + public IcebergOutputStream(PositionOutputStream out) { + this.out = requireNonNull(out, "out can't be null"); + this.closed = false; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void sync() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + if (!closed) { + out.close(); + closed = true; + } + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java index 870b6098b4b..da45700ba31 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java @@ -18,7 +18,9 @@ import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO; import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile; +import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.util.SerializableConfiguration; @@ -38,12 +40,18 @@ public HadoopFileIO(Configuration hadoopConf) { } @Override - public RapidsInputFile newInputFile(String path) throws IOException { + public HadoopInputFile newInputFile(String path) throws IOException { return this.newInputFile(new Path(path)); } @Override - public RapidsInputFile newInputFile(Path path) throws IOException { + public HadoopInputFile newInputFile(Path path) throws IOException { return HadoopInputFile.create(path, hadoopConf.value()); } + + @Override + public HadoopOutputFile newOutputFile(String path) throws IOException { + Objects.requireNonNull(path, "path can't be null"); + return HadoopOutputFile.create(new Path(path), hadoopConf.value()); + } } diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java new file mode 100644 index 00000000000..3ed1146eefa --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputFile.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.hadoop; + +import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implementation of {@link RapidsOutputFile} using the Hadoop file system. + *
+ * This class provides methods to create an output file and to obtain the absolute path. + */ +public class HadoopOutputFile implements RapidsOutputFile { + private final Path filePath; + private final FileSystem fs; + + public static HadoopOutputFile create(Path filePath, Configuration conf) + throws IOException { + Objects.requireNonNull(filePath, "filePath can't be null"); + Objects.requireNonNull(conf, "Hadoop conf can't be null"); + FileSystem fs = filePath.getFileSystem(conf); + return new HadoopOutputFile(filePath, fs); + } + + private HadoopOutputFile(Path filePath, FileSystem fs) { + Objects.requireNonNull(filePath, "filePath can't be null"); + Objects.requireNonNull(fs, "FileSystem can't be null"); + this.filePath = filePath; + this.fs = fs; + } + + @Override + public HadoopOutputStream create(boolean overwrite) throws IOException { + FSDataOutputStream output = fs.create(filePath, overwrite); + return new HadoopOutputStream(output); + } + + @Override + public String getPath() { + return filePath.toString(); + } +} diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java new file mode 100644 index 00000000000..768ad58fc5f --- /dev/null +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2025, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.fileio.hadoop; + +import com.nvidia.spark.rapids.jni.fileio.RapidsOutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link RapidsOutputStream} implementation that wraps a Hadoop {@link FSDataOutputStream}. + *
+ * This class delegates to the underlying output stream for write and close operations. + */ +public class HadoopOutputStream extends RapidsOutputStream { + private final FSDataOutputStream out; + private boolean closed; + + public HadoopOutputStream(FSDataOutputStream out) { + this.out = requireNonNull(out, "out can't be null"); + this.closed = false; + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void sync() throws IOException { + out.hsync(); + } + + @Override + public OutputStream getWrappedStream() { + return out; + } + + @Override + public void close() throws IOException { + if (!closed) { + out.close(); + closed = true; + } + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index acd6e2d61bf..e281f1c35de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -26,6 +26,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController} +import com.nvidia.spark.rapids.jni.fileio.{RapidsFileIO, RapidsOutputFile, RapidsOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext @@ -78,7 +79,8 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], debugDumpPath: Option[String], holdGpuBetweenBatches: Boolean = false, - useAsyncWrite: Boolean = false) extends HostBufferConsumer with Logging { + useAsyncWrite: Boolean = false, + rapidsFileIO: RapidsFileIO) extends HostBufferConsumer with Logging { // Length of the file written so far. This is used to track the size of the file private var fileLength: Long = 0L @@ -125,24 +127,22 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, private val trafficController: TrafficController = TrafficController.getWriteInstance - private def openOutputStream(): OutputStream = { - val hadoopPath = new Path(path) - val fs = hadoopPath.getFileSystem(conf) - fs.create(hadoopPath, false) + private def openOutputFile(): RapidsOutputFile = { + rapidsFileIO.newOutputFile(path()) } // This is implemented as a method to make it easier to subclass // ColumnarOutputWriter in the tests, and override this behavior. - protected def getOutputStream: OutputStream = { + protected def getOutputStream: RapidsOutputStream = { if (useAsyncWrite) { logWarning("Async output write enabled") - AsyncOutputStream(() => openOutputStream(), trafficController, statsTrackers) + AsyncOutputStream(() => openOutputFile().create(false), trafficController, statsTrackers) } else { - openOutputStream() + openOutputFile().create(false) } } - protected val outputStream: OutputStream = getOutputStream + protected val outputStream: RapidsOutputStream = getOutputStream private[this] val tempBuffer = new Array[Byte](128 * 1024) private[this] var anythingWritten = false @@ -303,7 +303,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, object ColumnarOutputWriter { // write buffers to outputStream via tempBuffer and close buffers def writeBufferedData(buffers: mutable.Queue[(HostMemoryBuffer, Long)], - tempBuffer: Array[Byte], outputStream: OutputStream): Unit = { + tempBuffer: Array[Byte], outputStream: RapidsOutputStream): Unit = { val toProcess = buffers.dequeueAll(_ => true) try { toProcess.foreach { case (buffer, len) => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index e427163b085..1fa4b9de5ba 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -17,11 +17,11 @@ package com.nvidia.spark.rapids import java.time.ZoneId - import ai.rapids.cudf._ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray import com.nvidia.spark.rapids.jni.DateTimeRebase +import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO import com.nvidia.spark.rapids.shims._ import com.nvidia.spark.rapids.shims.parquet._ import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext} @@ -29,7 +29,6 @@ import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil - import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.DataSourceUtils @@ -283,7 +282,7 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { path: String, dataSchema: StructType, context: TaskAttemptContext, - statsTrackers: Seq[ColumnarWriteTaskStatsTracker], + statsTrackers: Seq[ColumnarWriteTaskStatsTracker], debugOutputPath: Option[String]): ColumnarOutputWriter = { new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString, dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled, @@ -313,9 +312,10 @@ class GpuParquetWriter( statsTrackers: Seq[ColumnarWriteTaskStatsTracker], debugDumpPath: Option[String], holdGpuBetweenBatches: Boolean, - useAsyncWrite: Boolean) + useAsyncWrite: Boolean, + fileIO: RapidsFileIO) extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, statsTrackers, - debugDumpPath, holdGpuBetweenBatches, useAsyncWrite) { + debugDumpPath, holdGpuBetweenBatches, useAsyncWrite, fileIO) { override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = { val cols = GpuColumnVector.extractBases(batch) cols.foreach { col => From 02022a3cc9b9d670395d23af9855800960873e76 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 12:55:12 +0800 Subject: [PATCH 02/15] Complete --- .../spark/source/GpuSparkFileWriterFactory.scala | 5 +++-- .../iceberg/spark/source/GpuSparkWrite.scala | 6 ++++-- .../spark/rapids/ColumnarOutputWriter.scala | 3 ++- .../spark/rapids/GpuParquetFileFormat.scala | 6 ++++-- .../spark/sql/hive/rapids/GpuHiveFileFormat.scala | 15 ++++++++------- .../sql/rapids/GpuFileFormatDataWriter.scala | 6 +++++- .../spark/sql/rapids/GpuOrcFileFormat.scala | 12 +++++++----- 7 files changed, 33 insertions(+), 20 deletions(-) diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkFileWriterFactory.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkFileWriterFactory.scala index 2d881669a88..195f73190f7 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkFileWriterFactory.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkFileWriterFactory.scala @@ -37,6 +37,7 @@ class GpuSparkFileWriterFactory(val table: Table, val deleteFileFormat: FileFormat, val columnarOutputWriterFactory: ColumnarOutputWriterFactory, val hadoopConf: SerializableConfiguration, + val fileIO: IcebergFileIO ) extends FileWriterFactory[SpillableColumnarBatch] { require(dataFileFormat == FileFormat.PARQUET, s"GpuSparkFileWriterFactory only supports PARQUET file format, but got $dataFileFormat") @@ -81,8 +82,8 @@ class GpuSparkFileWriterFactory(val table: Table, dataSchema = dataSparkType, context = taskAttemptContext, statsTrackers = Seq(jobStatsTracker.newTaskInstance()), - debugOutputPath = None - ).asInstanceOf[GpuParquetWriter] + debugOutputPath = None, + fileIO).asInstanceOf[GpuParquetWriter] new GpuIcebergParquetAppender( gpuWriter, diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala index 22b0a034800..29409cfad98 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala @@ -17,10 +17,10 @@ package org.apache.iceberg.spark.source import scala.collection.JavaConverters._ - import com.nvidia.spark.rapids.{ColumnarOutputWriterFactory, GpuParquetFileFormat, GpuWrite, SparkPlanMeta, SpillableColumnarBatch} import com.nvidia.spark.rapids.Arm.closeOnExcept import com.nvidia.spark.rapids.SpillPriorities.ACTIVE_ON_DECK_PRIORITY +import com.nvidia.spark.rapids.fileio.iceberg.IcebergFileIO import com.nvidia.spark.rapids.iceberg.GpuIcebergPartitioner import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job @@ -29,7 +29,6 @@ import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.{FieldUtils, Me import org.apache.iceberg.{DataFile, FileFormat, PartitionSpec, Schema, SerializableTable, SnapshotUpdate, Table} import org.apache.iceberg.io.{ClusteredDataWriter, DataWriteResult, FanoutDataWriter, FileIO, OutputFileFactory, PartitioningWriter, RollingDataWriter} import org.apache.iceberg.spark.source.SparkWrite.TaskCommit - import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession @@ -167,6 +166,9 @@ class GpuWriterFactory(val tableBroadcast: Broadcast[Table], val outputWriterFactory: ColumnarOutputWriterFactory, val hadoopConf: SerializableConfiguration, ) extends DataWriterFactory { + + private lazy val fileIO: IcebergFileIO = new IcebergFileIO(tableBroadcast.value.io()) + override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = { val table = tableBroadcast.value val spec = table.specs().get(outputSpecId) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index e281f1c35de..b98b2ace202 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -64,7 +64,8 @@ abstract class ColumnarOutputWriterFactory extends Serializable { dataSchema: StructType, context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]): ColumnarOutputWriter + debugOutputPath: Option[String], + fileIO: RapidsFileIO): ColumnarOutputWriter } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 1fa4b9de5ba..3675dc9a6ce 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -29,6 +29,7 @@ import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil + import org.apache.spark.internal.Logging import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.DataSourceUtils @@ -283,10 +284,11 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging { dataSchema: StructType, context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]): ColumnarOutputWriter = { + debugOutputPath: Option[String], + fileIO: RapidsFileIO): ColumnarOutputWriter = { new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString, dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled, - statsTrackers, debugOutputPath, holdGpuBetweenBatches, asyncOutputWriteEnabled) + statsTrackers, debugOutputPath, holdGpuBetweenBatches, asyncOutputWriteEnabled, fileIO) } override def getFileExtension(context: TaskAttemptContext): String = { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index ea0e2869a03..a4d7c5574cd 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -18,15 +18,14 @@ package org.apache.spark.sql.hive.rapids import java.nio.charset.Charset import java.util.Locale - -import ai.rapids.cudf.{CompressionType, CSVWriterOptions, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter} +import ai.rapids.cudf.{CSVWriterOptions, CompressionType, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter} import com.google.common.base.Charsets import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.jni.CastStrings +import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO import com.nvidia.spark.rapids.shims.BucketingUtilsShim import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} - import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions @@ -217,9 +216,10 @@ class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFo dataSchema: StructType, context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]): ColumnarOutputWriter = { + debugOutputPath: Option[String], + fileIO: RapidsFileIO): ColumnarOutputWriter = { new GpuHiveParquetWriter(path, dataSchema, context, compressionType, statsTrackers, - debugOutputPath) + debugOutputPath, fileIO) } } } @@ -228,9 +228,10 @@ class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFo class GpuHiveParquetWriter(override val path: String, dataSchema: StructType, context: TaskAttemptContext, compType: CompressionType, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]) + debugOutputPath: Option[String], + fileIO: RapidsFileIO) extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true, statsTrackers, - debugOutputPath) { + debugOutputPath, fileIO) { override protected val tableWriter: CudfTableWriter = { val optionsBuilder = SchemaUtils diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 8911a57e747..7411223767e 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -26,6 +26,7 @@ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.withRetryNoSplit +import com.nvidia.spark.rapids.fileio.hadoop.HadoopFileIO import com.nvidia.spark.rapids.shims.GpuFileFormatDataWriterShim import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext @@ -623,7 +624,8 @@ class GpuDynamicPartitionDataSingleWriter( dataSchema = description.dataColumns.toStructType, context = taskAttemptContext, statsTrackers = statsTrackers, - debugOutputPath = debugOutputPath) + debugOutputPath = debugOutputPath, + description.fileIO) statsTrackers.foreach(_.newFile(currentPath)) outWriter @@ -984,6 +986,8 @@ class GpuWriteJobDescription( val concurrentWriterPartitionFlushSize: Long) extends Serializable { + lazy val fileIO: HadoopFileIO = new HadoopFileIO(serializableHadoopConf.value) + assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns), s""" |All columns: ${allColumns.mkString(", ")} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala index 7fa3cbdb9bb..61ee5fc5d58 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala @@ -17,9 +17,9 @@ package org.apache.spark.sql.rapids import java.time.ZoneId - import ai.rapids.cudf._ import com.nvidia.spark.rapids._ +import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO import com.nvidia.spark.rapids.shims.OrcShims import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -195,9 +195,10 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging { dataSchema: StructType, context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]): ColumnarOutputWriter = { + debugOutputPath: Option[String], + fileIO: RapidsFileIO): ColumnarOutputWriter = { new GpuOrcWriter(path, dataSchema, context, statsTrackers, debugOutputPath, - holdGpuBetweenBatches, orcStripeSizeRows, asyncOutputWriteEnabled) + holdGpuBetweenBatches, orcStripeSizeRows, asyncOutputWriteEnabled, fileIO) } override def getFileExtension(context: TaskAttemptContext): String = { @@ -224,9 +225,10 @@ class GpuOrcWriter( debugOutputPath: Option[String], holdGpuBetweenBatches: Boolean, orcStripeSizeRows: Option[Integer], - useAsyncWrite: Boolean) + useAsyncWrite: Boolean, + fileIO: RapidsFileIO) extends ColumnarOutputWriter(context, dataSchema, "ORC", true, statsTrackers, debugOutputPath, - holdGpuBetweenBatches, useAsyncWrite) { + holdGpuBetweenBatches, useAsyncWrite, fileIO) { override val tableWriter: TableWriter = { val builder = SchemaUtils From 2ab5a8c3de0a063f037a75c97e8e58f5f7d58b6b Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 12:59:37 +0800 Subject: [PATCH 03/15] Fix build --- .../scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index b98b2ace202..40911f89097 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -134,7 +134,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, // This is implemented as a method to make it easier to subclass // ColumnarOutputWriter in the tests, and override this behavior. - protected def getOutputStream: RapidsOutputStream = { + protected def getOutputStream: OutputStream = { if (useAsyncWrite) { logWarning("Async output write enabled") AsyncOutputStream(() => openOutputFile().create(false), trafficController, statsTrackers) @@ -304,7 +304,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, object ColumnarOutputWriter { // write buffers to outputStream via tempBuffer and close buffers def writeBufferedData(buffers: mutable.Queue[(HostMemoryBuffer, Long)], - tempBuffer: Array[Byte], outputStream: RapidsOutputStream): Unit = { + tempBuffer: Array[Byte], outputStream: OutputStream): Unit = { val toProcess = buffers.dequeueAll(_ => true) try { toProcess.foreach { case (buffer, len) => From b1d3eb134a7f8749bdbb5f60ff840747fbbfee40 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 13:03:54 +0800 Subject: [PATCH 04/15] Fix build --- .../nvidia/spark/rapids/ColumnarOutputWriter.scala | 2 +- .../spark/sql/hive/rapids/GpuHiveFileFormat.scala | 12 +++++++----- .../spark/sql/rapids/GpuFileFormatDataWriter.scala | 3 ++- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index 40911f89097..37d9e87b64e 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -143,7 +143,7 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext, } } - protected val outputStream: RapidsOutputStream = getOutputStream + protected val outputStream: OutputStream = getOutputStream private[this] val tempBuffer = new Array[Byte](128 * 1024) private[this] var anythingWritten = false diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index a4d7c5574cd..a47fbc4f857 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -231,7 +231,7 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType, debugOutputPath: Option[String], fileIO: RapidsFileIO) extends ColumnarOutputWriter(context, dataSchema, "HiveParquet", true, statsTrackers, - debugOutputPath, fileIO) { + debugOutputPath, false, false, fileIO) { override protected val tableWriter: CudfTableWriter = { val optionsBuilder = SchemaUtils @@ -261,8 +261,9 @@ class GpuHiveTextFileFormat extends ColumnarFileFormat with Logging with Seriali dataSchema: StructType, context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]): ColumnarOutputWriter = { - new GpuHiveTextWriter(path, dataSchema, context, statsTrackers, debugOutputPath) + debugOutputPath: Option[String], + fileIO: RapidsFileIO): ColumnarOutputWriter = { + new GpuHiveTextWriter(path, dataSchema, context, statsTrackers, debugOutputPath, fileIO) } } } @@ -272,9 +273,10 @@ class GpuHiveTextWriter(override val path: String, dataSchema: StructType, context: TaskAttemptContext, statsTrackers: Seq[ColumnarWriteTaskStatsTracker], - debugOutputPath: Option[String]) + debugOutputPath: Option[String], + fileIO: RapidsFileIO) extends ColumnarOutputWriter(context, dataSchema, "HiveText", false, statsTrackers, - debugOutputPath) { + debugOutputPath, false, false, fileIO) { /** * This reformats columns, to iron out inconsistencies between diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 7411223767e..9ecdf3d1d1d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -280,7 +280,8 @@ class GpuSingleDirectoryDataWriter( dataSchema = description.dataColumns.toStructType, context = taskAttemptContext, statsTrackers = statsTrackers, - debugOutputPath = debugOutputPath) + debugOutputPath = debugOutputPath, + fileIO = description.fileIO) statsTrackers.foreach(_.newFile(currentPath)) } From 04bba3c1ece8ef9ca83e52aabcc73062c39899e5 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 13:30:45 +0800 Subject: [PATCH 05/15] Fix import --- .../scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala index 37d9e87b64e..7b6bc084877 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarOutputWriter.scala @@ -26,7 +26,7 @@ import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit} import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController} -import com.nvidia.spark.rapids.jni.fileio.{RapidsFileIO, RapidsOutputFile, RapidsOutputStream} +import com.nvidia.spark.rapids.jni.fileio.{RapidsFileIO, RapidsOutputFile} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.TaskAttemptContext From bdfeaba2c8381a04960108f952be563f4e629cd7 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 13:33:25 +0800 Subject: [PATCH 06/15] Fix Output --- .../rapids/fileio/iceberg/IcebergOutputStream.java | 5 ----- .../spark/rapids/fileio/hadoop/HadoopOutputStream.java | 10 ---------- 2 files changed, 15 deletions(-) diff --git a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java index f366b0637af..ee2a34f8334 100644 --- a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java +++ b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergOutputStream.java @@ -51,11 +51,6 @@ public void flush() throws IOException { out.flush(); } - @Override - public void sync() throws IOException { - out.flush(); - } - @Override public void close() throws IOException { if (!closed) { diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java index 768ad58fc5f..301570fba1f 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopOutputStream.java @@ -53,16 +53,6 @@ public void flush() throws IOException { out.flush(); } - @Override - public void sync() throws IOException { - out.hsync(); - } - - @Override - public OutputStream getWrappedStream() { - return out; - } - @Override public void close() throws IOException { if (!closed) { From e77f0bc0a5f19f84c952425cf68b9b4e8cf20b00 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 13:36:53 +0800 Subject: [PATCH 07/15] Fix build --- .../nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java | 6 +----- .../org/apache/iceberg/spark/source/GpuSparkWrite.scala | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java index 953ad10b71d..415a08f0b8b 100644 --- a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java +++ b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java @@ -40,11 +40,7 @@ public class IcebergFileIO implements RapidsFileIO { * that the delegate is closed when no longer used, e.g., iceberg table/catalog close. */ public IcebergFileIO(FileIO delegate) { - this(delegate, null); - } - - public IcebergFileIO(FileIO delegate, LocationProvider locationProvider) { - Objects.requireNonNull(delegate, "delegate can't be null"); + Objects.requireNonNull(delegate, "delegate can't be null!"); this.delegate = delegate; } diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala index 29409cfad98..0f848ddc453 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala @@ -187,8 +187,8 @@ class GpuWriterFactory(val tableBroadcast: Broadcast[Table], table.sortOrder(), format, outputWriterFactory, - hadoopConf - ) + hadoopConf, + fileIO) if (spec.isUnpartitioned) { new GpuUnpartitionedDataWriter(writerFactory, outputFileFactory, io, spec, targetFileSize) From 143705ebd284bfe3ee6500080de9bc8bc81a5bdf Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 13:41:44 +0800 Subject: [PATCH 08/15] Fix build --- .../spark/sql/rapids/GpuFileFormatDataWriterSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala index a1b0f04b51a..6049dce2c08 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala @@ -61,7 +61,8 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { rangeName, includeRetry, mockJobDescription.statsTrackers.map(_.newTaskInstance()), - None) { + None, + mockJobDescription.fileIO) { // this writer (for tests) doesn't do anything and passes through the // batch passed to it when asked to transform, which is done to @@ -95,7 +96,7 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { types, "", includeRetry)) - when(mockOutputWriterFactory.newInstance(any(), any(), any(), any(), any())) + when(mockOutputWriterFactory.newInstance(any(), any(), any(), any(), any(), any())) .thenAnswer(_ => mockOutputWriter) } From 29c55df14385a54e1ac0c1bd44e55f2313f0aaa9 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 13:45:30 +0800 Subject: [PATCH 09/15] Fix build --- .../apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala index 6049dce2c08..bf3e2924320 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriterSuite.scala @@ -62,6 +62,8 @@ class GpuFileFormatDataWriterSuite extends AnyFunSuite with BeforeAndAfterEach { includeRetry, mockJobDescription.statsTrackers.map(_.newTaskInstance()), None, + false, + false, mockJobDescription.fileIO) { // this writer (for tests) doesn't do anything and passes through the From 4384bf37e2af2be636db72095f952715f2a4f407 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Fri, 26 Sep 2025 14:07:23 +0800 Subject: [PATCH 10/15] Remove test skip --- .../src/main/python/iceberg/iceberg_append_test.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/iceberg/iceberg_append_test.py b/integration_tests/src/main/python/iceberg/iceberg_append_test.py index f6638fb38cb..1cfa7251542 100644 --- a/integration_tests/src/main/python/iceberg/iceberg_append_test.py +++ b/integration_tests/src/main/python/iceberg/iceberg_append_test.py @@ -23,12 +23,8 @@ from marks import iceberg, ignore_order, allow_non_gpu from spark_session import with_gpu_session, with_cpu_session, is_spark_35x -pytestmark = [ - pytest.mark.skipif(not is_spark_35x(), - reason="Current spark-rapids only support spark 3.5.x"), - pytest.mark.skipif(is_iceberg_remote_catalog(), - reason="https://github.com/NVIDIA/spark-rapids/issues/13471") -] +pytestmark = pytest.mark.skipif(not is_spark_35x(), + reason="Current spark-rapids only support spark 3.5.x") def do_test_insert_into_table_sql(spark_tmp_table_factory, From c5a9f804bd1b63e233a478c85b76c8747db4163f Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Sun, 28 Sep 2025 17:08:15 +0800 Subject: [PATCH 11/15] fix build break --- .../scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | 2 ++ .../scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala | 1 + .../org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala | 2 ++ .../scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala | 1 + 4 files changed, 6 insertions(+) diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala index 625531bb2ad..712e21076eb 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala @@ -17,6 +17,7 @@ package org.apache.iceberg.spark.source import scala.collection.JavaConverters._ + import com.nvidia.spark.rapids.{ColumnarOutputWriterFactory, GpuParquetFileFormat, GpuWrite, SparkPlanMeta, SpillableColumnarBatch} import com.nvidia.spark.rapids.Arm.closeOnExcept import com.nvidia.spark.rapids.SpillPriorities.ACTIVE_ON_DECK_PRIORITY @@ -29,6 +30,7 @@ import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.{FieldUtils, Me import org.apache.iceberg.{DataFile, FileFormat, PartitionSpec, Schema, SerializableTable, SnapshotUpdate, Table} import org.apache.iceberg.io.{DataWriteResult, FileIO, GpuClusteredDataWriter, GpuFanoutDataWriter, GpuRollingDataWriter, OutputFileFactory, PartitioningWriter} import org.apache.iceberg.spark.source.SparkWrite.TaskCommit + import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.SparkSession diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala index 3675dc9a6ce..0ca808551b8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids import java.time.ZoneId + import ai.rapids.cudf._ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index a47fbc4f857..6f3072e5036 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.rapids import java.nio.charset.Charset import java.util.Locale + import ai.rapids.cudf.{CSVWriterOptions, CompressionType, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter} import com.google.common.base.Charsets import com.nvidia.spark.rapids._ @@ -26,6 +27,7 @@ import com.nvidia.spark.rapids.jni.CastStrings import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO import com.nvidia.spark.rapids.shims.BucketingUtilsShim import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} + import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala index 61ee5fc5d58..69b14936eef 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuOrcFileFormat.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.rapids import java.time.ZoneId + import ai.rapids.cudf._ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO From 37c870ef018d02c1b7a20729edf408720480d248 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Sun, 28 Sep 2025 17:18:14 +0800 Subject: [PATCH 12/15] fix build break --- .../org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala index 6f3072e5036..47a330bc365 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveFileFormat.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.rapids import java.nio.charset.Charset import java.util.Locale -import ai.rapids.cudf.{CSVWriterOptions, CompressionType, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter} +import ai.rapids.cudf.{CompressionType, CSVWriterOptions, DType, ParquetWriterOptions, QuoteStyle, Scalar, Table, TableWriter => CudfTableWriter} import com.google.common.base.Charsets import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.withResource From 503d74163319ed0e95a895da95ddf9a31538f199 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 9 Oct 2025 17:25:08 +0800 Subject: [PATCH 13/15] Skip set output path --- .../scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala index 7620dd7956d..d51a3a1964b 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala @@ -97,7 +97,7 @@ class GpuSparkWrite(cpu: SparkWrite) extends GpuWrite with RequiresDistributionA val tmpJob = Job.getInstance(hadoopConf) tmpJob.setOutputKeyClass(classOf[Void]) tmpJob.setOutputValueClass(classOf[InternalRow]) - FileOutputFormat.setOutputPath(tmpJob, new Path(table.location())) +// FileOutputFormat.setOutputPath(tmpJob, new Path(table.location())) tmpJob } From 7e5033d9bb428088d4b8439706a009ccabece1eb Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 9 Oct 2025 17:28:17 +0800 Subject: [PATCH 14/15] Fix build break --- .../scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala index d51a3a1964b..669c17b4b1b 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala @@ -24,9 +24,7 @@ import com.nvidia.spark.rapids.Arm.closeOnExcept import com.nvidia.spark.rapids.SpillPriorities.ACTIVE_ON_DECK_PRIORITY import com.nvidia.spark.rapids.fileio.iceberg.IcebergFileIO import com.nvidia.spark.rapids.iceberg.GpuIcebergPartitioner -import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.{FieldUtils, MethodUtils} import org.apache.iceberg.{DataFile, FileFormat, PartitionSpec, Schema, SerializableTable, SnapshotUpdate, Table} import org.apache.iceberg.io.{DataWriteResult, FileIO, GpuClusteredDataWriter, GpuFanoutDataWriter, GpuRollingDataWriter, OutputFileFactory, PartitioningWriter} From 7cd078a791f17149d791b67716a1bf4bac825e17 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Thu, 9 Oct 2025 17:39:46 +0800 Subject: [PATCH 15/15] Fix test --- .../com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java | 2 +- .../scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java index 415a08f0b8b..c7c8461ddbb 100644 --- a/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java +++ b/iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java @@ -40,7 +40,7 @@ public class IcebergFileIO implements RapidsFileIO { * that the delegate is closed when no longer used, e.g., iceberg table/catalog close. */ public IcebergFileIO(FileIO delegate) { - Objects.requireNonNull(delegate, "delegate can't be null!"); + Objects.requireNonNull(delegate, "delegate can't be null"); this.delegate = delegate; } diff --git a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala index 669c17b4b1b..5bee0e2f391 100644 --- a/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala +++ b/iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala @@ -95,7 +95,6 @@ class GpuSparkWrite(cpu: SparkWrite) extends GpuWrite with RequiresDistributionA val tmpJob = Job.getInstance(hadoopConf) tmpJob.setOutputKeyClass(classOf[Void]) tmpJob.setOutputValueClass(classOf[InternalRow]) -// FileOutputFormat.setOutputPath(tmpJob, new Path(table.location())) tmpJob }