Skip to content

Commit c4367bd

Browse files
authored
Migrate CollectTimeIterator to use new NvtxIdWithMetrics class (#12900)
We are in the process of migrating all of our NVTX ranges to use the new documented NVTX API. Many of the existing ranges are already wrapped in other classes such as the `NvtxWithMetrics` class. This PR establishes a replacement for `NvtxWithMetrics` called `NvtxIdWithMetrics`, which provides the same wrapping functionality but takes the documented `NvtxId` class as input. The PR then begins migrating some of the many uses of this wrapper, namely the ranges that go through the `CollectTimeIterator`. --------- Signed-off-by: Zach Puller <zpuller@nvidia.com>
1 parent ba2ba29 commit c4367bd

8 files changed

Lines changed: 94 additions & 14 deletions

File tree

docs/dev/nvtx_ranges.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,26 @@ See [nvtx_profiling.md](https://nvidia.github.io/spark-rapids/docs/dev/nvtx_prof
1919

2020
Name | Description
2121
-----|-------------
22-
getMapSizesByExecId|Call to internal Spark API for retrieving size and location of shuffle map output blocks
2322
gpuKudoSerialize|Perform kudo serialization on the gpu
23+
getMapSizesByExecId|Call to internal Spark API for retrieving size and location of shuffle map output blocks
24+
probe right|Probing the right side of a join input iterator to get the data size for preparing the join
25+
fetch join stream|IO time on the stream side data for the following join
26+
shuffled join stream|GpuShuffledHashJoinExec op is preparing build batches for join
2427
Acquire GPU|Time waiting for GPU semaphore to be acquired
2528
Release GPU|Releasing the GPU semaphore
29+
GpuCoalesceBatches: collect|GPU combining of small batches post-kernel processing
2630
gpuKudoSliceBuffers|slice kudo serialized buffers on host into partitions
31+
broadcast join stream|time it takes to materialize a broadcast batch on the host
2732
CommitShuffle|After all temporary shuffle writes are done, produce a single file (shuffle_[map_id]_0) in the commit phase
2833
ParallelDeserializerIterator.next|Calling next on the MT shuffle reader iterator
2934
queueFetched|MT shuffle manager is using the RapidsShuffleBlockFetcherIterator to queue the next set of fetched results
3035
WaitingForWrites|Rapids Shuffle Manager (multi threaded) is waiting for any queued writes to finish before finalizing the map output writer
36+
AbstractGpuCoalesceIterator|Default range for a code path in the AbstractGpuCoalesceIterator for an op which is not explicitly documented in its own range
3137
ThreadedWriter.write|Rapids Shuffle Manager (multi threaded) writing
3238
ThreadedReader.read|Rapids Shuffle Manager (multi threaded) reading
3339
gpuKudoCopyToHost|copy gpu kudo serialized outputs back to the host
40+
hash join build|IO time on the build side data for the following join
41+
probe left|Probing the left side of a join input iterator to get the data size for preparing the join
42+
build batch: collect|Perform a join where the build side fits in a single GPU batch
3443
BatchWait|Rapids Shuffle Manager (multi threaded) reader blocked waiting for batches to finish decoding
3544
RapidsCachingWriter.write|Rapids Shuffle Manager (ucx) writing

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,15 @@ case class BatchedByKey(gpuOrder: Seq[SortOrder])(val cpuOrder: Seq[SortOrder])
245245
override def children: Seq[Expression] = gpuOrder
246246
}
247247

248+
/**
 * Lookup table from a collect-range name to its registered [[NvtxId]].
 *
 * The keys are the complete range names (for example "GpuCoalesceBatches: collect"),
 * so callers must pass the fully formatted name rather than a bare operator name.
 */
object OpNameNvtxMap {
  // Range names that have their own documented NvtxId entry in NvtxRegistry.
  private val rangeByName: Map[String, NvtxId] = Map(
    ("GpuCoalesceBatches: collect", NvtxRegistry.GPU_COALESCE_BATCHES_COLLECT),
    ("build batch: collect", NvtxRegistry.BUILD_BATCH_COLLECT))

  /**
   * Finds the [[NvtxId]] registered under the given range name.
   *
   * @param opName the full range name used as the lookup key
   * @return `Some(id)` when the name has an entry in the table, `None` otherwise
   */
  def get(opName: String): Option[NvtxId] = rangeByName.get(opName)
}
256+
248257
abstract class AbstractGpuCoalesceIterator(
249258
inputIter: Iterator[ColumnarBatch],
250259
goal: CoalesceSizeGoal,
@@ -261,7 +270,9 @@ abstract class AbstractGpuCoalesceIterator(
261270
case NoopMetric => new LocalGpuMetric
262271
case _ => streamTimeOrNoop
263272
}
264-
private val iter = new CollectTimeIterator(s"$opName: collect", inputIter, streamTime)
273+
private val iter = new CollectTimeIterator(
274+
OpNameNvtxMap.get(s"$opName: collect").getOrElse(NvtxRegistry.GPU_COALESCE_ITERATOR),
275+
inputIter, streamTime)
265276

266277
private var batchInitialized: Boolean = false
267278

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMetrics.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ package com.nvidia.spark.rapids
1818

1919
import scala.collection.immutable.TreeMap
2020

21-
import ai.rapids.cudf.NvtxColor
22-
import com.nvidia.spark.rapids.Arm.withResource
2321
import com.nvidia.spark.rapids.metrics.GpuBubbleTimerManager
2422

2523
import org.apache.spark.{SparkContext, TaskContext}
@@ -423,17 +421,17 @@ final class LocalGpuMetric extends GpuMetric {
423421
}
424422

425423
class CollectTimeIterator[T](
426-
nvtxName: String,
424+
nvtxId: NvtxId,
427425
it: Iterator[T],
428426
collectTime: GpuMetric) extends Iterator[T] {
429427
override def hasNext: Boolean = {
430-
withResource(new NvtxWithMetrics(nvtxName, NvtxColor.BLUE, collectTime)) { _ =>
428+
NvtxIdWithMetrics(nvtxId, collectTime) {
431429
it.hasNext
432430
}
433431
}
434432

435433
override def next(): T = {
436-
withResource(new NvtxWithMetrics(nvtxName, NvtxColor.BLUE, collectTime)) { _ =>
434+
NvtxIdWithMetrics(nvtxId, collectTime) {
437435
it.next
438436
}
439437
}

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledHashJoinExec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ case class GpuShuffledHashJoinExec(
241241
(streamIter, buildIter) => {
242242
val (buildData, maybeBufferedStreamIter) =
243243
GpuShuffledHashJoinExec.prepareBuildBatchesForJoin(buildIter,
244-
new CollectTimeIterator("shuffled join stream", streamIter, streamTime),
244+
new CollectTimeIterator(NvtxRegistry.SHUFFLED_JOIN_STREAM, streamIter, streamTime),
245245
realTarget, localBuildOutput, buildGoal, subPartConf, coalesceMetrics, readOption)
246246

247247
buildData match {
@@ -371,7 +371,7 @@ object GpuShuffledHashJoinExec extends Logging {
371371
} else {
372372
logDebug("Return multiple batches as the build side data for the following " +
373373
"sub-partitioning join")
374-
Right(new CollectTimeIterator("hash join build", gpuBuildIter, buildTime))
374+
Right(new CollectTimeIterator(NvtxRegistry.HASH_JOIN_BUILD, gpuBuildIter, buildTime))
375375
}
376376
}
377377
buildTime += System.nanoTime() - startTime
@@ -421,7 +421,7 @@ object GpuShuffledHashJoinExec extends Logging {
421421
val safeIter = GpuSubPartitionHashJoin.safeIteratorFromSeq(spillBuf.toSeq).map { sp =>
422422
withRetryNoSplit(sp)(_.getColumnarBatch())
423423
} ++ filteredIter
424-
Right(new CollectTimeIterator("hash join build", safeIter, buildTime))
424+
Right(new CollectTimeIterator(NvtxRegistry.HASH_JOIN_BUILD, safeIter, buildTime))
425425
} else {
426426
// The size after filtering is within the target size or sub-partitioning is disabled.
427427
while(filteredIter.hasNext) {

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffledSizedHashJoinExec.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -672,8 +672,10 @@ object GpuShuffledSymmetricHashJoinExec {
672672
val rightTime = new LocalGpuMetric
673673
val buildTime = metrics(BUILD_TIME)
674674
val streamTime = metrics(STREAM_TIME)
675-
val leftIter = new CollectTimeIterator("probe left", setupForProbe(rawLeftIter), leftTime)
676-
val rightIter = new CollectTimeIterator("probe right", setupForProbe(rawRightIter), rightTime)
675+
val leftIter = new CollectTimeIterator(NvtxRegistry.PROBE_LEFT,
676+
setupForProbe(rawLeftIter), leftTime)
677+
val rightIter = new CollectTimeIterator(NvtxRegistry.PROBE_RIGHT,
678+
setupForProbe(rawRightIter), rightTime)
677679
closeOnExcept(mutable.Queue.empty[T]) { leftQueue =>
678680
closeOnExcept(mutable.Queue.empty[T]) { rightQueue =>
679681
var leftSize = 0L
@@ -723,7 +725,7 @@ object GpuShuffledSymmetricHashJoinExec {
723725
} else {
724726
baseBuildIter
725727
}
726-
val streamIter = new CollectTimeIterator("fetch join stream",
728+
val streamIter = new CollectTimeIterator(NvtxRegistry.FETCH_JOIN_STREAM,
727729
setupForJoin(streamQueue, rawStreamIter, exprs.streamTypes, gpuBatchSizeBytes, metrics),
728730
streamTime)
729731
JoinInfo(joinType, buildSide, buildIter, buildSize, None, streamIter, exprs)

sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxRangeWithDoc.scala

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,34 @@ object NvtxRegistry {
134134
val GET_MAP_SIZES_BY_EXEC_ID: NvtxId = NvtxId("getMapSizesByExecId", NvtxColor.CYAN,
135135
"Call to internal Spark API for retrieving size and location of shuffle map output blocks")
136136

137+
val GPU_COALESCE_BATCHES_COLLECT: NvtxId = NvtxId("GpuCoalesceBatches: collect", NvtxColor.BLUE,
138+
"GPU combining of small batches post-kernel processing")
139+
140+
val BUILD_BATCH_COLLECT: NvtxId = NvtxId("build batch: collect", NvtxColor.BLUE,
141+
"Perform a join where the build side fits in a single GPU batch")
142+
143+
val GPU_COALESCE_ITERATOR: NvtxId = NvtxId("AbstractGpuCoalesceIterator", NvtxColor.BLUE,
144+
"Default range for a code path in the AbstractGpuCoalesceIterator for an op which " +
145+
"is not explicitly documented in its own range")
146+
147+
val SHUFFLED_JOIN_STREAM: NvtxId = NvtxId("shuffled join stream", NvtxColor.BLUE,
148+
"GpuShuffledHashJoinExec op is preparing build batches for join")
149+
150+
val HASH_JOIN_BUILD: NvtxId = NvtxId("hash join build", NvtxColor.BLUE,
151+
"IO time on the build side data for the following join")
152+
153+
val PROBE_LEFT: NvtxId = NvtxId("probe left", NvtxColor.BLUE,
154+
"Probing the left side of a join input iterator to get the data size for preparing the join")
155+
156+
val PROBE_RIGHT: NvtxId = NvtxId("probe right", NvtxColor.BLUE,
157+
"Probing the right side of a join input iterator to get the data size for preparing the join")
158+
159+
val FETCH_JOIN_STREAM: NvtxId = NvtxId("fetch join stream", NvtxColor.BLUE,
160+
"IO time on the stream side data for the following join")
161+
162+
val BROADCAST_JOIN_STREAM: NvtxId = NvtxId("broadcast join stream", NvtxColor.BLUE,
163+
"time it takes to materialize a broadcast batch on the host")
164+
137165
val GPU_KUDO_SERIALIZE: NvtxId = NvtxId("gpuKudoSerialize", NvtxColor.YELLOW,
138166
"Perform kudo serialization on the gpu")
139167

@@ -158,6 +186,15 @@ object NvtxRegistry {
158186
register(QUEUE_FETCHED)
159187
register(RAPIDS_CACHING_WRITER_WRITE)
160188
register(GET_MAP_SIZES_BY_EXEC_ID)
189+
register(GPU_COALESCE_BATCHES_COLLECT)
190+
register(BUILD_BATCH_COLLECT)
191+
register(GPU_COALESCE_ITERATOR)
192+
register(SHUFFLED_JOIN_STREAM)
193+
register(HASH_JOIN_BUILD)
194+
register(PROBE_LEFT)
195+
register(PROBE_RIGHT)
196+
register(FETCH_JOIN_STREAM)
197+
register(BROADCAST_JOIN_STREAM)
161198
register(GPU_KUDO_SERIALIZE)
162199
register(GPU_KUDO_COPY_TO_HOST)
163200
register(GPU_KUDO_SLICE_BUFFERS)

sql-plugin/src/main/scala/com/nvidia/spark/rapids/NvtxWithMetrics.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,29 @@ class NvtxWithMetrics(name: String, color: NvtxColor, val metrics: Seq[GpuMetric
5353
}
5454
}
5555

56+
/**
 * Runs a block of code inside the range described by an [[NvtxId]] while also timing it
 * into zero or more nanosecond timing metrics.
 *
 * Each metric is asked to activate its timer before the block runs. Only the metrics whose
 * `tryActivateTimer` call returned true are updated afterwards, with the elapsed time
 * measured by `System.nanoTime()` around the range.
 */
object NvtxIdWithMetrics {

  /**
   * @param nvtxId  the range to run `block` within
   * @param metrics the timing metrics to update with the elapsed nanoseconds
   * @param block   the code to execute inside the range
   * @tparam V      the result type of `block`
   * @return whatever `nvtxId.apply(block)` returns
   */
  def apply[V](nvtxId: NvtxId, metrics: GpuMetric*)(block: => V): V = {
    // One flag per metric, in the same order as `metrics`.
    val activated = metrics.map(_.tryActivateTimer(Seq.empty))
    val startNs = System.nanoTime()

    try {
      nvtxId.apply(block)
    } finally {
      // In `finally` so the metrics are still updated when the block throws.
      val elapsedNs = System.nanoTime() - startNs
      metrics.toSeq.zip(activated).foreach {
        case (metric, true) => metric.deactivateTimer(elapsedNs, Seq.empty)
        case _ => // this metric's timer was not activated by this call; leave it alone
      }
    }
  }
}
78+
5679
class MetricRange(val metrics: Seq[GpuMetric], val excludeMetric: Seq[GpuMetric] = Seq.empty)
5780
extends AutoCloseable {
5881

sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastHashJoinExecBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ abstract class GpuBroadcastHashJoinExecBase(
151151
GpuBroadcastHelper.getBroadcastBuiltBatchAndStreamIter(
152152
broadcastRelation,
153153
buildSchema,
154-
new CollectTimeIterator("broadcast join stream", it, streamTime))
154+
new CollectTimeIterator(NvtxRegistry.BROADCAST_JOIN_STREAM, it, streamTime))
155155
// builtBatch will be closed in doJoin
156156
doJoin(builtBatch, streamIter, targetSize, numOutputRows, numOutputBatches, opTime, joinTime)
157157
}

0 commit comments

Comments
 (0)