Skip to content

Commit 5f9363a

Browse files
committed
Adapt SQL RAPIDS callers to columnar helpers
Signed-off-by: Gera Shegalov <gshegalov@nvidia.com>
1 parent b6bb8fe commit 5f9363a

47 files changed

Lines changed: 623 additions & 357 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoreDumpHandler.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023, NVIDIA CORPORATION.
2+
* Copyright (c) 2023-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,13 +29,32 @@ import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
2929

3030
import org.apache.spark.SparkContext
3131
import org.apache.spark.api.plugin.PluginContext
32-
import org.apache.spark.internal.Logging
3332
import org.apache.spark.io.CompressionCodec
3433
import org.apache.spark.sql.SparkSession
3534
import org.apache.spark.sql.rapids.execution.TrampolineUtil
3635
import org.apache.spark.util.SerializableConfiguration
3736

38-
object GpuCoreDumpHandler extends Logging {
37+
object GpuCoreDumpHandler {
38+
private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
39+
40+
private def logInfo(msg: => String): Unit = {
41+
if (log.isInfoEnabled) {
42+
log.info(msg)
43+
}
44+
}
45+
46+
private def logWarning(msg: => String, throwable: Throwable): Unit = {
47+
log.warn(msg, throwable)
48+
}
49+
50+
private def logError(msg: => String): Unit = {
51+
log.error(msg)
52+
}
53+
54+
private def logError(msg: => String, throwable: Throwable): Unit = {
55+
log.error(msg, throwable)
56+
}
57+
3958
private var executor: Option[ExecutorService] = None
4059
private var dumpedPath: Option[String] = None
4160
private var namedPipeFile: File = _

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDeviceManager.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import com.nvidia.spark.rapids.jni.RmmSpark
2727
import com.nvidia.spark.rapids.spill.SpillFramework
2828

2929
import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
30-
import org.apache.spark.internal.Logging
3130
import org.apache.spark.network.util.ByteUnit
3231
import org.apache.spark.resource.ResourceInformation
3332
import org.apache.spark.sql.internal.SQLConf
@@ -38,7 +37,29 @@ private case object Initialized extends MemoryState
3837
private case object Uninitialized extends MemoryState
3938
private case object Errored extends MemoryState
4039

41-
object GpuDeviceManager extends Logging {
40+
object GpuDeviceManager {
41+
private val log = org.slf4j.LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
42+
43+
private def logDebug(msg: => String): Unit = {
44+
if (log.isDebugEnabled) {
45+
log.debug(msg)
46+
}
47+
}
48+
49+
private def logInfo(msg: => String): Unit = {
50+
if (log.isInfoEnabled) {
51+
log.info(msg)
52+
}
53+
}
54+
55+
private def logWarning(msg: => String): Unit = {
56+
log.warn(msg)
57+
}
58+
59+
private def logError(msg: => String): Unit = {
60+
log.error(msg)
61+
}
62+
4263
// This config controls whether RMM/Pinned memory are initialized from the task
4364
// or from the executor side plugin. The default is to initialize from the
4465
// executor plugin.

sql-plugin/src/main/scala/com/nvidia/spark/rapids/InternalExclusiveModeGpuDiscoveryPlugin.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021, NVIDIA CORPORATION.
2+
* Copyright (c) 2021-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,15 +24,30 @@ import ai.rapids.cudf.Cuda
2424

2525
import org.apache.spark.SparkConf
2626
import org.apache.spark.api.resource.ResourceDiscoveryPlugin
27-
import org.apache.spark.internal.Logging
2827
import org.apache.spark.resource.{ResourceInformation, ResourceRequest}
2928

3029
/**
3130
* Note, this class should not be referenced directly in source code.
3231
* It should be loaded by reflection using ShimLoader.newInstanceOf, see ./docs/dev/shims.md
3332
*/
3433
protected class InternalExclusiveModeGpuDiscoveryPlugin
35-
extends ResourceDiscoveryPlugin with Logging {
34+
extends ResourceDiscoveryPlugin {
35+
36+
private val log = org.slf4j.LoggerFactory.getLogger(
37+
classOf[InternalExclusiveModeGpuDiscoveryPlugin])
38+
39+
private def logInfo(msg: => String): Unit = {
40+
if (log.isInfoEnabled) {
41+
log.info(msg)
42+
}
43+
}
44+
45+
private def logWarning(msg: => String): Unit = {
46+
if (log.isWarnEnabled) {
47+
log.warn(msg)
48+
}
49+
}
50+
3651
override def discoverResource(
3752
request: ResourceRequest,
3853
sparkconf: SparkConf

sql-plugin/src/main/scala/com/nvidia/spark/rapids/python/GpuPythonArguments.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2025, NVIDIA CORPORATION.
2+
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,11 +27,11 @@ import org.apache.spark.sql.types.DataType
2727
* @param argOffsets The offsets of the original arguments in "flattenedArgs"
2828
* @param argNames The optional argument names
2929
*/
30-
case class GpuPythonArguments(
31-
flattenedArgs: Seq[Expression],
32-
flattenedTypes: Seq[DataType],
33-
argOffsets: Array[Array[Int]],
34-
argNames: Option[Array[Array[Option[String]]]])
30+
class GpuPythonArguments(
31+
val flattenedArgs: Seq[Expression],
32+
val flattenedTypes: Seq[DataType],
33+
val argOffsets: Array[Array[Int]],
34+
val argNames: Option[Array[Array[Option[String]]]])
3535

3636
/** Gpu version of ArgumentMetadata */
37-
case class GpuArgumentMeta(offset: Int, name: Option[String])
37+
class GpuArgumentMeta(val offset: Int, val name: Option[String])

sql-plugin/src/main/scala/com/nvidia/spark/rapids/python/PythonWorkerSemaphore.scala

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import com.nvidia.spark.rapids.python.PythonConfEntries.CONCURRENT_PYTHON_WORKER
2626
import org.apache.commons.lang3.mutable.MutableInt
2727

2828
import org.apache.spark.{SparkEnv, TaskContext}
29-
import org.apache.spark.internal.Logging
3029

3130
/*
3231
* PythonWorkerSemaphore is used to limit the number of Python workers(processes) to be started
@@ -41,7 +40,15 @@ import org.apache.spark.internal.Logging
4140
* the inner semaphore when no longer needed.
4241
*
4342
*/
44-
object PythonWorkerSemaphore extends Logging {
43+
object PythonWorkerSemaphore {
44+
private val log = org.slf4j.LoggerFactory.getLogger(
45+
"com.nvidia.spark.rapids.python.PythonWorkerSemaphore")
46+
47+
private def logDebug(msg: => String): Unit = {
48+
if (log.isDebugEnabled) {
49+
log.debug(msg)
50+
}
51+
}
4552

4653
private lazy val rapidsConf = new RapidsConf(SparkEnv.get.conf)
4754
private lazy val workersPerGpu = rapidsConf.get(CONCURRENT_PYTHON_WORKERS)
@@ -97,7 +104,15 @@ object PythonWorkerSemaphore extends Logging {
97104
}
98105
}
99106

100-
private final class PythonWorkerSemaphore(tasksPerGpu: Int) extends Logging {
107+
private final class PythonWorkerSemaphore(tasksPerGpu: Int) {
108+
private val log = org.slf4j.LoggerFactory.getLogger(classOf[PythonWorkerSemaphore])
109+
110+
private def logDebug(msg: => String): Unit = {
111+
if (log.isDebugEnabled) {
112+
log.debug(msg)
113+
}
114+
}
115+
101116
private val semaphore = new Semaphore(tasksPerGpu)
102117
// Map to track which tasks have acquired the semaphore.
103118
private val activeTasks = new ConcurrentHashMap[Long, MutableInt]

sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/CoalesceConvertIterator.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
2222
import com.nvidia.spark.rapids.hybrid.{CoalesceBatchConverter => NativeConverter, HybridHostRetryAllocator, RapidsHostColumn}
2323

2424
import org.apache.spark.TaskContext
25-
import org.apache.spark.internal.Logging
2625
import org.apache.spark.sql.catalyst.expressions.Attribute
2726
import org.apache.spark.sql.types.StructType
2827
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
@@ -36,7 +35,13 @@ class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
3635
targetBatchSizeInBytes: Long,
3736
schema: StructType,
3837
metrics: Map[String, GpuMetric])
39-
extends Iterator[Array[RapidsHostColumn]] with Logging {
38+
extends Iterator[Array[RapidsHostColumn]] {
39+
40+
@transient private lazy val log = org.slf4j.LoggerFactory.getLogger(
41+
classOf[CoalesceConvertIterator])
42+
43+
private def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
44+
4045

4146
private var converterImpl: NativeConverter = _
4247

@@ -140,7 +145,7 @@ class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
140145
}
141146
}
142147

143-
object CoalesceConvertIterator extends Logging {
148+
object CoalesceConvertIterator {
144149
/**
145150
* Consumes the RapidsHostBatchProducer and converts the HostColumnVectors to Device ones.
146151
*/

sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
2+
* Copyright (c) 2024-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,7 +21,6 @@ import java.util.Locale
2121
import ai.rapids.cudf.DType
2222
import com.nvidia.spark.rapids.{RapidsConf, VersionUtils}
2323

24-
import org.apache.spark.internal.Logging
2524
import org.apache.spark.sql.catalyst.expressions._
2625
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnresolvedHint}
2726
import org.apache.spark.sql.catalyst.trees.TreePattern
@@ -33,7 +32,6 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
3332
import org.apache.spark.sql.types._
3433

3534
object HybridExecutionUtils extends PredicateHelper {
36-
3735
private val HYBRID_JAR_PLUGIN_CLASS_NAME = "com.nvidia.spark.rapids.hybrid.HybridPluginWrapper"
3836

3937
/**
@@ -434,7 +432,7 @@ object HybridExecutionUtils extends PredicateHelper {
434432
}
435433
}
436434

437-
object HybridExecOverrides extends Logging {
435+
object HybridExecOverrides {
438436
// The SQL hint enables HybridScan for specific tables even if HYBRID_PARQUET_READER is disabled
439437
val HYBRID_SCAN_HINT = "HYBRID_SCAN"
440438

sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/RapidsHostBatchProducer.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2025, NVIDIA CORPORATION.
2+
* Copyright (c) 2025-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,7 +25,6 @@ import com.nvidia.spark.rapids.hybrid.RapidsHostColumn
2525
import com.nvidia.spark.rapids.jni.RmmSpark
2626

2727
import org.apache.spark.TaskContext
28-
import org.apache.spark.internal.Logging
2928
import org.apache.spark.sql.rapids.execution.TrampolineUtil
3029

3130
/**
@@ -92,7 +91,15 @@ class PrefetchHostBatchProducer(
9291
taskAttId: Long,
9392
base: Iterator[Array[RapidsHostColumn]],
9493
capacity: Int,
95-
waitTimeMetric: GpuMetric) extends RapidsHostBatchProducer with Logging {
94+
waitTimeMetric: GpuMetric) extends RapidsHostBatchProducer {
95+
96+
@transient private lazy val log = org.slf4j.LoggerFactory.getLogger(
97+
classOf[PrefetchHostBatchProducer])
98+
99+
private def logInfo(msg: => String): Unit = if (log.isInfoEnabled) log.info(msg)
100+
101+
private def logError(msg: => String): Unit = if (log.isErrorEnabled) log.error(msg)
102+
96103

97104
@volatile private var isInit: Boolean = false
98105
// Mark if there is in-progress element being produced in producerThread

sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuJsonScan.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,15 +268,15 @@ case class GpuJsonScan(
268268
val broadcastedConf = sparkSession.sparkContext.broadcast(
269269
new SerializableConfiguration(hadoopConf))
270270

271-
GpuJsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
271+
new GpuJsonPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
272272
dataSchema, readDataSchema, readPartitionSchema, parsedOptions, maxReaderBatchSizeRows,
273273
maxReaderBatchSizeBytes, maxGpuColumnSizeBytes, metrics, options.asScala.toMap)
274274
}
275275

276276
override def withInputFile(): GpuScan = this
277277
}
278278

279-
case class GpuJsonPartitionReaderFactory(
279+
class GpuJsonPartitionReaderFactory(
280280
sqlConf: SQLConf,
281281
broadcastedConf: Broadcast[SerializableConfiguration],
282282
dataSchema: StructType,
@@ -288,7 +288,8 @@ case class GpuJsonPartitionReaderFactory(
288288
maxReaderBatchSizeBytes: Long,
289289
maxGpuColumnSizeBytes: Long,
290290
metrics: Map[String, GpuMetric],
291-
@transient params: Map[String, String]) extends ShimFilePartitionReaderFactory(params) {
291+
@transient params: Map[String, String])
292+
extends ShimFilePartitionReaderFactory(params) with Serializable {
292293

293294
override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
294295
throw new IllegalStateException("ROW BASED PARSING IS NOT SUPPORTED ON THE GPU...")

sql-plugin/src/main/scala/org/apache/spark/sql/catalyst/json/rapids/GpuReadJsonFileFormat.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2025, NVIDIA CORPORATION.
2+
* Copyright (c) 2022-2026, NVIDIA CORPORATION.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ class GpuReadJsonFileFormat extends JsonFileFormat with GpuReadFileFormatWithMet
5656
sparkSession.sessionState.conf.sessionLocalTimeZone,
5757
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
5858
val rapidsConf = new RapidsConf(sqlConf)
59-
val factory = GpuJsonPartitionReaderFactory(
59+
val factory = new GpuJsonPartitionReaderFactory(
6060
sqlConf,
6161
broadcastedHadoopConf,
6262
dataSchema,
@@ -81,7 +81,7 @@ class GpuReadJsonFileFormat extends JsonFileFormat with GpuReadFileFormatWithMet
8181
}
8282
}
8383

84-
object GpuReadJsonFileFormat {
84+
object GpuReadJsonFileFormat extends Serializable {
8585
def tagSupport(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
8686
val fsse = meta.wrapped
8787
GpuJsonScan.tagSupport(

0 commit comments

Comments
 (0)