Skip to content

Commit 08da649

Browse files
authored
[VL] Remove metrics try-catch, fix Generate metrics, and handle missing join/aggregation params (#11861)
This PR removes the silent try/catch in MetricsUtil.genMetricsUpdatingFunction that was masking metrics bugs, and fixes metrics issues across several files: it adds proper Generate transformer metrics to VeloxMetricsApi; registers JoinParams for CartesianProductExecTransformer with postProjectionNeeded = false; and adds null-safe defaults for missing join/aggregation params in metrics updates.
1 parent 89a77b6 commit 08da649

File tree

7 files changed

+94
-53
lines changed

7 files changed

+94
-53
lines changed

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,26 @@ class VeloxMetricsApi extends MetricsApi with Logging {
358358
override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
359359
new ExpandMetricsUpdater(metrics)
360360

361+
override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
362+
Map(
363+
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
364+
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
365+
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
366+
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of generate"),
367+
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
368+
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
369+
"numMemoryAllocations" -> SQLMetrics.createMetric(
370+
sparkContext,
371+
"number of memory allocations"),
372+
"loadLazyVectorTime" -> SQLMetrics.createNanoTimingMetric(
373+
sparkContext,
374+
"time of loading lazy vectors")
375+
)
376+
377+
override def genGenerateTransformerMetricsUpdater(
378+
metrics: Map[String, SQLMetric]): MetricsUpdater =
379+
new GenerateMetricsUpdater(metrics)
380+
361381
override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
362382
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
363383

backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
1818

1919
import org.apache.gluten.backendsapi.BackendsApiManager
2020
import org.apache.gluten.execution.GenerateExecTransformer.supportsGenerate
21-
import org.apache.gluten.metrics.{GenerateMetricsUpdater, MetricsUpdater}
21+
import org.apache.gluten.metrics.MetricsUpdater
2222
import org.apache.gluten.substrait.SubstraitContext
2323
import org.apache.gluten.substrait.expression.ExpressionNode
2424
import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
@@ -27,7 +27,6 @@ import org.apache.gluten.utils.PullOutProjectHelper
2727

2828
import org.apache.spark.sql.catalyst.expressions._
2929
import org.apache.spark.sql.execution.{GenerateExec, ProjectExec, SparkPlan}
30-
import org.apache.spark.sql.execution.metric.SQLMetrics
3130
import org.apache.spark.sql.types.{BooleanType, IntegerType}
3231

3332
import com.google.protobuf.StringValue
@@ -50,19 +49,10 @@ case class GenerateExecTransformer(
5049

5150
@transient
5251
override lazy val metrics =
53-
Map(
54-
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
55-
"numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
56-
"numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
57-
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of generate"),
58-
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
59-
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
60-
"numMemoryAllocations" -> SQLMetrics.createMetric(
61-
sparkContext,
62-
"number of memory allocations")
63-
)
52+
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetrics(sparkContext)
6453

65-
override def metricsUpdater(): MetricsUpdater = new GenerateMetricsUpdater(metrics)
54+
override def metricsUpdater(): MetricsUpdater =
55+
BackendsApiManager.getMetricsApiInstance.genGenerateTransformerMetricsUpdater(metrics)
6656

6757
override protected def withNewChildInternal(newChild: SparkPlan): GenerateExecTransformer =
6858
copy(generator, requiredChildOutput, outer, generatorOutput, newChild)

backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,14 @@ abstract class JoinMetricsUpdaterBase(val metrics: Map[String, SQLMetric])
4646
joinMetrics: util.ArrayList[OperatorMetrics],
4747
singleMetrics: SingleMetric,
4848
joinParams: JoinParams): Unit = {
49-
assert(joinParams.postProjectionNeeded)
50-
val postProjectMetrics = joinMetrics.remove(0)
51-
postProjectionCpuCount += postProjectMetrics.cpuCount
52-
postProjectionWallNanos += postProjectMetrics.wallNanos
53-
numOutputRows += postProjectMetrics.outputRows
54-
numOutputVectors += postProjectMetrics.outputVectors
55-
numOutputBytes += postProjectMetrics.outputBytes
49+
if (joinParams.postProjectionNeeded) {
50+
val postProjectMetrics = joinMetrics.remove(0)
51+
postProjectionCpuCount += postProjectMetrics.cpuCount
52+
postProjectionWallNanos += postProjectMetrics.wallNanos
53+
numOutputRows += postProjectMetrics.outputRows
54+
numOutputVectors += postProjectMetrics.outputVectors
55+
numOutputBytes += postProjectMetrics.outputBytes
56+
}
5657

5758
updateJoinMetricsInternal(joinMetrics, joinParams)
5859
}

backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala

Lines changed: 26 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -253,27 +253,28 @@ object MetricsUtil extends Logging {
253253

254254
mutNode.updater match {
255255
case smj: SortMergeJoinMetricsUpdater =>
256-
smj.updateJoinMetrics(
257-
operatorMetrics,
258-
metrics.getSingleMetrics,
259-
joinParamsMap.get(operatorIdx))
256+
val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
257+
val p = JoinParams(); p.postProjectionNeeded = false; p
258+
}
259+
smj.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams)
260260
case ju: JoinMetricsUpdaterBase =>
261261
// JoinRel and CrossRel output two suites of metrics respectively for build and probe.
262262
// Therefore, fetch one more suite of metrics here.
263263
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
264264
curMetricsIdx -= 1
265-
ju.updateJoinMetrics(
266-
operatorMetrics,
267-
metrics.getSingleMetrics,
268-
joinParamsMap.get(operatorIdx))
265+
val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse {
266+
val p = JoinParams(); p.postProjectionNeeded = false; p
267+
}
268+
ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams)
269269
case u: UnionMetricsUpdater =>
270270
// JoinRel outputs two suites of metrics respectively for hash build and hash probe.
271271
// Therefore, fetch one more suite of metrics here.
272272
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
273273
curMetricsIdx -= 1
274274
u.updateUnionMetrics(operatorMetrics)
275275
case hau: HashAggregateMetricsUpdater =>
276-
hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx))
276+
val aggParams = Option(aggParamsMap.get(operatorIdx)).getOrElse(AggregationParams())
277+
hau.updateAggregationMetrics(operatorMetrics, aggParams)
277278
case lu: LimitMetricsUpdater =>
278279
// Limit over Sort is converted to TopN node in Velox, so there is only one suite of metrics
279280
// for the two transformers. We do not update metrics for limit and leave it for sort.
@@ -342,30 +343,24 @@ object MetricsUtil extends Logging {
342343
aggParamsMap: JMap[JLong, AggregationParams],
343344
taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = {
344345
imetrics =>
345-
try {
346-
val metrics = imetrics.asInstanceOf[Metrics]
347-
val numNativeMetrics = metrics.inputRows.length
348-
if (numNativeMetrics == 0) {
349-
()
350-
} else {
351-
updateTransformerMetricsInternal(
352-
mutNode,
353-
relMap,
354-
operatorIdx,
355-
metrics,
356-
numNativeMetrics - 1,
357-
joinParamsMap,
358-
aggParamsMap)
346+
val metrics = imetrics.asInstanceOf[Metrics]
347+
val numNativeMetrics = metrics.inputRows.length
348+
if (numNativeMetrics == 0) {
349+
()
350+
} else {
351+
updateTransformerMetricsInternal(
352+
mutNode,
353+
relMap,
354+
operatorIdx,
355+
metrics,
356+
numNativeMetrics - 1,
357+
joinParamsMap,
358+
aggParamsMap)
359359

360-
// Update the task stats accumulator with the metrics.
361-
if (metrics.taskStats != null) {
362-
taskStatsAccumulator.add(metrics.taskStats)
363-
}
360+
// Update the task stats accumulator with the metrics.
361+
if (metrics.taskStats != null) {
362+
taskStatsAccumulator.add(metrics.taskStats)
364363
}
365-
} catch {
366-
case e: Exception =>
367-
logWarning(s"Updating native metrics failed due to ${e.getCause}.")
368-
()
369364
}
370365
}
371366
}

backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,27 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
152152
assert(metrics("numOutputVectors").value > 0)
153153
assert(metrics("numOutputBytes").value > 0)
154154
}
155+
156+
runQueryAndCompare(
157+
"SELECT c1, col FROM metrics_t1 LATERAL VIEW explode(array(c1, c2)) t AS col") {
158+
df =>
159+
val scan = find(df.queryExecution.executedPlan) {
160+
case _: FileSourceScanExecTransformer => true
161+
case _ => false
162+
}
163+
assert(scan.isDefined)
164+
val scanMetrics = scan.get.metrics
165+
assert(scanMetrics("rawInputRows").value > 0)
166+
167+
val generate = find(df.queryExecution.executedPlan) {
168+
case _: GenerateExecTransformer => true
169+
case _ => false
170+
}
171+
assert(generate.isDefined)
172+
val genMetrics = generate.get.metrics
173+
assert(genMetrics("numOutputRows").value == 2 * scanMetrics("rawInputRows").value)
174+
assert(genMetrics.contains("loadLazyVectorTime"))
175+
}
155176
}
156177

157178
test("Metrics of window") {

gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ trait MetricsApi extends Serializable {
8282

8383
def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
8484

85+
def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
86+
throw new UnsupportedOperationException()
87+
88+
def genGenerateTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
89+
throw new UnsupportedOperationException()
90+
8591
def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
8692

8793
def genColumnarShuffleExchangeMetrics(

gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
2020
import org.apache.gluten.expression.ExpressionConverter
2121
import org.apache.gluten.extension.columnar.transition.Convention
2222
import org.apache.gluten.metrics.MetricsUpdater
23-
import org.apache.gluten.substrait.SubstraitContext
23+
import org.apache.gluten.substrait.{JoinParams, SubstraitContext}
2424
import org.apache.gluten.substrait.rel.RelBuilder
2525
import org.apache.gluten.utils.SubstraitUtil
2626

@@ -96,6 +96,11 @@ case class CartesianProductExecTransformer(
9696
JoinUtils.createExtensionNode(inputLeftOutput ++ inputRightOutput, validation = false)
9797

9898
val operatorId = context.nextOperatorId(this.nodeName)
99+
val joinParams = new JoinParams
100+
joinParams.postProjectionNeeded = false
101+
if (condition.isDefined) {
102+
joinParams.isWithCondition = true
103+
}
99104

100105
val currRel = RelBuilder.makeCrossRel(
101106
inputLeftRelNode,
@@ -106,6 +111,9 @@ case class CartesianProductExecTransformer(
106111
context,
107112
operatorId
108113
)
114+
115+
context.registerJoinParam(operatorId, joinParams)
116+
109117
TransformContext(output, currRel)
110118
}
111119

0 commit comments

Comments
 (0)