Skip to content

Commit 86f2903

Browse files
Use RapidsFileIO for writing data. (#13501)
Fixes #13471 . ### Description This is blocked by NVIDIA/cudf-spark-jni#3768. In this pr we implemented output related interface for `HadoopFileIO` and `IcebergFileIO`, and use them in `ColumnarOutputWriter`. ### Checklists - [x] This PR has added documentation for new or modified features or behaviors. - [x] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.) - [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description. --------- Signed-off-by: liurenjie1024 <liurenjie2008@gmail.com>
1 parent 6f4c806 commit 86f2903

15 files changed

Lines changed: 310 additions & 43 deletions

File tree

iceberg/src/main/java/com/nvidia/spark/rapids/fileio/iceberg/IcebergFileIO.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO;
2020
import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
21+
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile;
2122
import org.apache.iceberg.io.FileIO;
2223

2324
import java.io.IOException;
@@ -48,4 +49,9 @@ public IcebergFileIO(FileIO delegate) {
4849
public IcebergInputFile newInputFile(String path) throws IOException {
4950
return new IcebergInputFile(delegate.newInputFile(path));
5051
}
52+
53+
@Override
54+
public IcebergOutputFile newOutputFile(String path) throws IOException {
55+
return new IcebergOutputFile(delegate.newOutputFile(path));
56+
}
5157
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.nvidia.spark.rapids.fileio.iceberg;
18+
19+
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile;
20+
import org.apache.iceberg.io.OutputFile;
21+
22+
import java.io.IOException;
23+
import java.util.Objects;
24+
25+
/**
26+
* Implementation of {@link RapidsOutputFile} using Iceberg {@link OutputFile}.
27+
*/
28+
public class IcebergOutputFile implements RapidsOutputFile {
29+
private final OutputFile delegate;
30+
31+
public IcebergOutputFile(OutputFile delegate) {
32+
Objects.requireNonNull(delegate, "delegate can't be null");
33+
this.delegate = delegate;
34+
}
35+
36+
@Override
37+
public IcebergOutputStream create(boolean overwrite) throws IOException {
38+
if (overwrite) {
39+
return new IcebergOutputStream(delegate.createOrOverwrite());
40+
}
41+
return new IcebergOutputStream(delegate.create());
42+
}
43+
44+
@Override
45+
public String getPath() {
46+
return delegate.location();
47+
}
48+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.nvidia.spark.rapids.fileio.iceberg;
18+
19+
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputStream;
20+
import org.apache.iceberg.io.PositionOutputStream;
21+
22+
import java.io.IOException;
23+
import java.io.OutputStream;
24+
25+
import static java.util.Objects.requireNonNull;
26+
27+
/**
28+
* A {@link RapidsOutputStream} implementation that wraps an Iceberg {@link PositionOutputStream}.
29+
*/
30+
public class IcebergOutputStream extends RapidsOutputStream {
31+
private final PositionOutputStream out;
32+
private boolean closed;
33+
34+
public IcebergOutputStream(PositionOutputStream out) {
35+
this.out = requireNonNull(out, "out can't be null");
36+
this.closed = false;
37+
}
38+
39+
@Override
40+
public void write(int b) throws IOException {
41+
out.write(b);
42+
}
43+
44+
@Override
45+
public void write(byte[] b, int off, int len) throws IOException {
46+
out.write(b, off, len);
47+
}
48+
49+
@Override
50+
public void flush() throws IOException {
51+
out.flush();
52+
}
53+
54+
@Override
55+
public void close() throws IOException {
56+
if (!closed) {
57+
out.close();
58+
closed = true;
59+
}
60+
}
61+
}

iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkFileWriterFactory.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class GpuSparkFileWriterFactory(val table: Table,
3838
val columnarOutputWriterFactory: ColumnarOutputWriterFactory,
3939
val taskStatsTracker: ColumnarWriteTaskStatsTracker,
4040
val hadoopConf: SerializableConfiguration,
41+
val fileIO: IcebergFileIO
4142
) extends FileWriterFactory[SpillableColumnarBatch] {
4243
require(dataFileFormat == FileFormat.PARQUET,
4344
s"GpuSparkFileWriterFactory only supports PARQUET file format, but got $dataFileFormat")
@@ -78,8 +79,8 @@ class GpuSparkFileWriterFactory(val table: Table,
7879
dataSchema = dataSparkType,
7980
context = taskAttemptContext,
8081
statsTrackers = Seq(taskStatsTracker),
81-
debugOutputPath = None
82-
).asInstanceOf[GpuParquetWriter]
82+
debugOutputPath = None,
83+
fileIO).asInstanceOf[GpuParquetWriter]
8384

8485
new GpuIcebergParquetAppender(
8586
gpuWriter,

iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@ import scala.util.{Failure, Success}
2222
import com.nvidia.spark.rapids.{ColumnarOutputWriterFactory, GpuParquetFileFormat, GpuWrite, SparkPlanMeta, SpillableColumnarBatch}
2323
import com.nvidia.spark.rapids.Arm.closeOnExcept
2424
import com.nvidia.spark.rapids.SpillPriorities.ACTIVE_ON_DECK_PRIORITY
25+
import com.nvidia.spark.rapids.fileio.iceberg.IcebergFileIO
2526
import com.nvidia.spark.rapids.iceberg.GpuIcebergPartitioner
26-
import org.apache.hadoop.fs.Path
2727
import org.apache.hadoop.mapreduce.Job
28-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
2928
import org.apache.hadoop.shaded.org.apache.commons.lang3.reflect.{FieldUtils, MethodUtils}
3029
import org.apache.iceberg.{DataFile, FileFormat, PartitionSpec, Schema, SerializableTable, SnapshotUpdate, Table}
3130
import org.apache.iceberg.io.{DataWriteResult, FileIO, GpuClusteredDataWriter, GpuFanoutDataWriter, GpuRollingDataWriter, OutputFileFactory, PartitioningWriter}
@@ -96,7 +95,6 @@ class GpuSparkWrite(cpu: SparkWrite) extends GpuWrite with RequiresDistributionA
9695
val tmpJob = Job.getInstance(hadoopConf)
9796
tmpJob.setOutputKeyClass(classOf[Void])
9897
tmpJob.setOutputValueClass(classOf[InternalRow])
99-
FileOutputFormat.setOutputPath(tmpJob, new Path(table.location()))
10098
tmpJob
10199
}
102100

@@ -202,6 +200,8 @@ class GpuWriterFactory(val tableBroadcast: Broadcast[Table],
202200
val hadoopConf: SerializableConfiguration
203201
) extends DataWriterFactory {
204202

203+
private lazy val fileIO: IcebergFileIO = new IcebergFileIO(tableBroadcast.value.io())
204+
205205
override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
206206
val table = tableBroadcast.value
207207
val spec = table.specs().get(outputSpecId)
@@ -221,7 +221,8 @@ class GpuWriterFactory(val tableBroadcast: Broadcast[Table],
221221
format,
222222
outputWriterFactory,
223223
statsTracker.newTaskInstance(),
224-
hadoopConf)
224+
hadoopConf,
225+
fileIO)
225226

226227
if (spec.isUnpartitioned) {
227228
new GpuUnpartitionedDataWriter(writerFactory, outputFileFactory, io, spec, targetFileSize)

integration_tests/src/main/python/iceberg/iceberg_append_test.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,8 @@
2323
from marks import iceberg, ignore_order, allow_non_gpu
2424
from spark_session import with_gpu_session, with_cpu_session, is_spark_35x
2525

26-
pytestmark = [
27-
pytest.mark.skipif(not is_spark_35x(),
28-
reason="Current spark-rapids only support spark 3.5.x"),
29-
pytest.mark.skipif(is_iceberg_remote_catalog(),
30-
reason="https://github.com/NVIDIA/spark-rapids/issues/13471")
31-
]
26+
pytestmark = pytest.mark.skipif(not is_spark_35x(),
27+
reason="Current spark-rapids only support spark 3.5.x")
3228

3329

3430
def do_test_insert_into_table_sql(spark_tmp_table_factory,

sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO;
2020
import com.nvidia.spark.rapids.jni.fileio.RapidsInputFile;
21+
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile;
2122
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.FileSystem;
2224
import org.apache.hadoop.fs.Path;
2325
import org.apache.spark.util.SerializableConfiguration;
2426

@@ -38,12 +40,18 @@ public HadoopFileIO(Configuration hadoopConf) {
3840
}
3941

4042
@Override
41-
public RapidsInputFile newInputFile(String path) throws IOException {
43+
public HadoopInputFile newInputFile(String path) throws IOException {
4244
return this.newInputFile(new Path(path));
4345
}
4446

4547
@Override
46-
public RapidsInputFile newInputFile(Path path) throws IOException {
48+
public HadoopInputFile newInputFile(Path path) throws IOException {
4749
return HadoopInputFile.create(path, hadoopConf.value());
4850
}
51+
52+
@Override
53+
public HadoopOutputFile newOutputFile(String path) throws IOException {
54+
Objects.requireNonNull(path, "path can't be null");
55+
return HadoopOutputFile.create(new Path(path), hadoopConf.value());
56+
}
4957
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.nvidia.spark.rapids.fileio.hadoop;
18+
19+
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputFile;
20+
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.FSDataOutputStream;
22+
import org.apache.hadoop.fs.FileSystem;
23+
import org.apache.hadoop.fs.Path;
24+
25+
import java.io.IOException;
26+
import java.util.Objects;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
29+
/**
30+
* Implementation of {@link RapidsOutputFile} using the Hadoop file system.
31+
* <br/>
32+
* This class provides methods to create an output file and to obtain the absolute path.
33+
*/
34+
public class HadoopOutputFile implements RapidsOutputFile {
35+
private final Path filePath;
36+
private final FileSystem fs;
37+
38+
public static HadoopOutputFile create(Path filePath, Configuration conf)
39+
throws IOException {
40+
Objects.requireNonNull(filePath, "filePath can't be null");
41+
Objects.requireNonNull(conf, "Hadoop conf can't be null");
42+
FileSystem fs = filePath.getFileSystem(conf);
43+
return new HadoopOutputFile(filePath, fs);
44+
}
45+
46+
private HadoopOutputFile(Path filePath, FileSystem fs) {
47+
Objects.requireNonNull(filePath, "filePath can't be null");
48+
Objects.requireNonNull(fs, "FileSystem can't be null");
49+
this.filePath = filePath;
50+
this.fs = fs;
51+
}
52+
53+
@Override
54+
public HadoopOutputStream create(boolean overwrite) throws IOException {
55+
FSDataOutputStream output = fs.create(filePath, overwrite);
56+
return new HadoopOutputStream(output);
57+
}
58+
59+
@Override
60+
public String getPath() {
61+
return filePath.toString();
62+
}
63+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2025, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.nvidia.spark.rapids.fileio.hadoop;
18+
19+
import com.nvidia.spark.rapids.jni.fileio.RapidsOutputStream;
20+
import org.apache.hadoop.fs.FSDataOutputStream;
21+
22+
import java.io.IOException;
23+
import java.io.OutputStream;
24+
25+
import static java.util.Objects.requireNonNull;
26+
27+
/**
28+
* A {@link RapidsOutputStream} implementation that wraps a Hadoop {@link FSDataOutputStream}.
29+
* <br/>
30+
* This class delegates to the underlying output stream for write and close operations.
31+
*/
32+
public class HadoopOutputStream extends RapidsOutputStream {
33+
private final FSDataOutputStream out;
34+
private boolean closed;
35+
36+
public HadoopOutputStream(FSDataOutputStream out) {
37+
this.out = requireNonNull(out, "out can't be null");
38+
this.closed = false;
39+
}
40+
41+
@Override
42+
public void write(int b) throws IOException {
43+
out.write(b);
44+
}
45+
46+
@Override
47+
public void write(byte[] b, int off, int len) throws IOException {
48+
out.write(b, off, len);
49+
}
50+
51+
@Override
52+
public void flush() throws IOException {
53+
out.flush();
54+
}
55+
56+
@Override
57+
public void close() throws IOException {
58+
if (!closed) {
59+
out.close();
60+
closed = true;
61+
}
62+
}
63+
}

0 commit comments

Comments
 (0)