Skip to content

Commit 80da99b

Browse files
committed
Initial Implementation
1 parent c3a8ba5 commit 80da99b

4 files changed

Lines changed: 108 additions & 0 deletions

File tree

docs/additional-functionality/advanced_configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ Name | Description | Default Value | Applicable at
5050
<a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels and kernel launches.|671088640|Startup
5151
<a name="memory.gpu.state.debug"></a>spark.rapids.memory.gpu.state.debug|To better recover from out of memory errors, RMM will track several states for the threads that interact with the GPU. This provides a log of those state transitions to aid in debugging it. STDOUT or STDERR will have the logging go there empty string will disable logging and anything else will be treated as a file to write the logs to.||Startup
5252
<a name="memory.gpu.unspill.enabled"></a>spark.rapids.memory.gpu.unspill.enabled|When a spilled GPU buffer is needed again, should it be unspilled, or only copied back into GPU memory temporarily. Unspilling may be useful for GPU buffers that are needed frequently, for example, broadcast variables; however, it may also increase GPU memory usage|false|Startup
53+
<a name="perfio.gcs.enabled"></a>spark.rapids.perfio.gcs.enabled|Controls the Google Cloud Storage reader for improved performance in certain queries. When true, enables it and throws at startup if google-cloud-storage classes are not on the classpath. When false, disables it unconditionally. When unset (default), enables it opportunistically if google-cloud-storage classes are found, otherwise falls back to the configured GCS connector with a warning. The presence of com.google.cloud:google-cloud-storage on the executor classpath is required.|None|Startup
5354
<a name="perfio.s3.enabled"></a>spark.rapids.perfio.s3.enabled|Controls the AWS S3 reader for improved performance in certain queries. When true, enables it and throws at startup if no compatible HTTP client is on the classpath. When false, disables it unconditionally. When unset (default), enables it opportunistically if a compatible HTTP client is found, otherwise falls back to S3A with a warning. The presence of AWS SDK packages for Netty and/or CRT HTTP clients on the classpath is required. You can use Spark submit option `--packages software.amazon.awssdk:s3:2.22.12,software.amazon.awssdk:aws-crt-client:2.22.12` to achieve this. See https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/crt-based-s3-client.html#crt-based-s3-client-depend|None|Startup
5455
<a name="python.concurrentPythonWorkers"></a>spark.rapids.python.concurrentPythonWorkers|Set the number of Python worker processes that can execute concurrently per GPU. Python worker processes may temporarily block when the number of concurrent Python worker processes started by the same executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors. >0 means enabled, while <=0 means unlimited|0|Runtime
5556
<a name="python.memory.gpu.allocFraction"></a>spark.rapids.python.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.allocFraction)), since the executor will share the GPU with its owning Python workers. Half of the rest will be used if not specified|None|Runtime

sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/RapidsInputFiles.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,17 @@ public static boolean isS3PerfEnabled() {
3838
}
3939
return env.conf().getBoolean(PerfIOConf.S3PERF_ENABLED().key(), false);
4040
}
41+
/**
42+
* True iff {@code spark.rapids.perfio.gcs.enabled} is set to {@code true} on
43+
* the active SparkConf. Returns false when no {@link SparkEnv} is initialized
44+
* (e.g. before driver bring-up) so callers default to the non-PerfIO path.
45+
*/
46+
public static boolean isGCSPerfEnabled() {
47+
SparkEnv env = SparkEnv.get();
48+
if (env == null) {
49+
return false;
50+
}
51+
return env.conf().getBoolean(PerfIOConf.GCSPERF_ENABLED().key(), false);
52+
}
53+
4154
}

sql-plugin/src/main/java/com/nvidia/spark/rapids/fileio/hadoop/HadoopFileIO.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public RapidsInputFile newInputFile(Path path) throws IOException {
5151
if (scheme != null && scheme.startsWith("s3") && RapidsInputFiles.isS3PerfEnabled()) {
5252
return S3InputFile.create(path, hadoopConf.value());
5353
}
54+
if (scheme != null && (scheme.equals("gs") || scheme.equals("gcs")) &&
55+
RapidsInputFiles.isGCSPerfEnabled()) {
56+
return GCSInputFile.create(path, hadoopConf.value());
57+
}
5458
return HadoopInputFile.create(path, hadoopConf.value());
5559
}
5660

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (c) 2026, NVIDIA CORPORATION.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.nvidia.spark.rapids.fileio.hadoop
18+
19+
import java.io.IOException
20+
import java.net.URI
21+
import java.util.OptionalLong
22+
23+
import scala.collection.JavaConverters._
24+
25+
import ai.rapids.cudf.HostMemoryBuffer
26+
import com.nvidia.spark.rapids.{IntRangeWithOffset, PerfIO, RangeWithOffset, SuffixRangeWithOffset}
27+
import com.nvidia.spark.rapids.jni.fileio.{RapidsInputFile, SeekableInputStream}
28+
import org.apache.hadoop.conf.Configuration
29+
import org.apache.hadoop.fs.Path
30+
31+
/**
32+
* GCS-backed {@link RapidsInputFile} for Hadoop-conf-driven (non-iceberg) reads.
33+
* {@code readVectored} issues batched byte-range reads through the optimized
34+
* PerfIO path; the other operations delegate to the standard {@link HadoopInputFile}.
35+
*/
36+
class GCSInputFile private (
37+
delegate: HadoopInputFile,
38+
fileUri: URI,
39+
hadoopConf: Configuration)
40+
extends RapidsInputFile {
41+
42+
override def path(): String = delegate.path()
43+
44+
@throws[IOException]
45+
override def getLength(): Long = delegate.getLength()
46+
47+
@throws[IOException]
48+
override def getLastModificationTime(): OptionalLong = delegate.getLastModificationTime()
49+
50+
@throws[IOException]
51+
override def open(): SeekableInputStream = delegate.open()
52+
53+
@throws[IOException]
54+
override def readVectored(
55+
output: HostMemoryBuffer,
56+
copyRanges: java.util.List[RapidsInputFile.CopyRange]): Unit = {
57+
val ranges = copyRanges.asScala.map { r =>
58+
IntRangeWithOffset(r.getInputOffset, r.getLength, r.getOutputOffset)
59+
}.toSeq
60+
require(
61+
PerfIO.readToHostMemory(hadoopConf, output, fileUri, ranges).isDefined,
62+
"expected to use PerfIO to read")
63+
}
64+
65+
/**
66+
* Issue a single suffix-range read for the last {@code length} bytes. Avoids
67+
* the {@code getLength()} round-trip the default {@link RapidsInputFile#readTail}
68+
* would make. PerfIO resolves the GCS suffix range internally.
69+
*/
70+
@throws[IOException]
71+
override def readTail(length: Long, output: HostMemoryBuffer): Unit = {
72+
if (length == 0) {
73+
return
74+
}
75+
if (length < 0) {
76+
throw new IllegalArgumentException("length must be non-negative")
77+
}
78+
val ranges = Seq[RangeWithOffset](SuffixRangeWithOffset(length, /*destOffset*/ 0L))
79+
require(
80+
PerfIO.readToHostMemory(hadoopConf, output, fileUri, ranges).isDefined,
81+
"expected to use PerfIO to read")
82+
}
83+
}
84+
85+
object GCSInputFile {
86+
@throws[IOException]
87+
def create(filePath: Path, conf: Configuration): GCSInputFile = {
88+
new GCSInputFile(HadoopInputFile.create(filePath, conf), filePath.toUri, conf)
89+
}
90+
}

0 commit comments

Comments
 (0)