Commit 1320bf9

rahulsmahadev authored and tdas committed
[SC-33980][WARMFIX][DELTA] Fix incorrect metrics in Delete/Update and add more tests
## What changes were proposed in this pull request?

- Fix incorrect way of capturing the number of copied rows: copied rows were incorrectly computed based on scanned files. This change uses the computed write stats in DeleteCommand instead. For UpdateCommand, the counting UDF is now applied in the right place.
- Fix incorrect way of capturing the number of removed files.

## How was this patch tested?

- Added more tests
- Changed existing tests

Author: Rahul Mahadev <[email protected]>

#9576 is resolved by rahulsmahadev/rowLevelHistoryFix.

GitOrigin-RevId: 83b991e4952a263549e1de2733003885c89737a9
1 parent 2115525 commit 1320bf9
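For orientation, here is a minimal sketch of the new derivation (my own summary, not code from this commit): copied rows now come from the write stats (numOutputRows, reported by the write stats tracker mentioned in the diffs below) rather than from a row counter applied to scanned files.

  // Illustrative only, using the metric names that appear in the diffs below.
  // DELETE: every row written to the rewritten files is a surviving (copied) row.
  def deleteCopiedRows(numOutputRows: Long): Long = numOutputRows
  // UPDATE: copied rows are the written rows that were not updated.
  def updateCopiedRows(numOutputRows: Long, numUpdatedRows: Long): Long =
    numOutputRows - numUpdatedRows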

5 files changed: 64 additions, 54 deletions


src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala

Lines changed: 11 additions & 9 deletions
@@ -75,13 +75,12 @@ object DeltaOperations {
     override val operationMetrics: Set[String] = DeltaOperationMetrics.DELETE
 
     override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
-      // find the case where deletedRows are not captured
-      val numTotalRows = metrics("numTotalRows").value
       var strMetrics = super.transformMetrics(metrics)
-      strMetrics += "numCopiedRows" -> (numTotalRows -
-        metrics("numDeletedRows").value).toString
-      if (strMetrics("numDeletedRows") == "0" && strMetrics("numCopiedRows") == "0" &&
-          strMetrics("numRemovedFiles") != "0") {
+      if (metrics.contains("numOutputRows")) {
+        strMetrics += "numCopiedRows" -> metrics("numOutputRows").value.toString
+      }
+      // find the case where deletedRows are not captured
+      if (strMetrics("numDeletedRows") == "0" && strMetrics("numRemovedFiles") != "0") {
         // identify when row level metrics are unavailable. This will happen when the entire
         // table or partition are deleted.
         strMetrics -= "numDeletedRows"
@@ -149,15 +148,18 @@ object DeltaOperations {
     override val operationMetrics: Set[String] = DeltaOperationMetrics.UPDATE
 
     override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
-      val numTotalRows = metrics("numTotalRows").value
       val numOutputRows = metrics("numOutputRows").value
       val numUpdatedRows = metrics("numUpdatedRows").value
       var strMetrics = super.transformMetrics(metrics)
-      strMetrics += "numCopiedRows" -> (numTotalRows - numUpdatedRows).toString
       // In the case where the numUpdatedRows is not captured in the UpdateCommand implementation
       // we can siphon out the metrics from the BasicWriteStatsTracker for that command.
-      if(numTotalRows == 0 && numUpdatedRows == 0 && numOutputRows != 0) {
+      // This is for the case where the entire partition is re-written.
+      if (numUpdatedRows == 0 && numOutputRows != 0) {
         strMetrics += "numUpdatedRows" -> numOutputRows.toString
+        strMetrics += "numCopiedRows" -> "0"
+      } else {
+        strMetrics += "numCopiedRows" -> (
+          numOutputRows - strMetrics("numUpdatedRows").toInt).toString
       }
       strMetrics
     }
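To make the new UPDATE branch concrete, here is an illustrative calculation (my own numbers, chosen to match the expectations in DescribeDeltaHistorySuite further down):

  // Row-level update: the rewritten file holds 3 rows, 1 of which was updated.
  val numOutputRows = 3L
  val numUpdatedRows = 1L
  val numCopiedRows =
    if (numUpdatedRows == 0 && numOutputRows != 0) 0L  // whole-partition rewrite: all output rows are treated as updated
    else numOutputRows - numUpdatedRows                // otherwise: written rows minus updated rows
  assert(numCopiedRows == 2L)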

src/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala

Lines changed: 4 additions & 9 deletions
@@ -56,8 +56,7 @@ case class DeleteCommand(
   override lazy val metrics = Map[String, SQLMetric](
     "numRemovedFiles" -> createMetric(sc, "number of files removed."),
     "numAddedFiles" -> createMetric(sc, "number of files added."),
-    "numDeletedRows" -> createMetric(sc, "number of rows deleted."),
-    "numTotalRows" -> createMetric(sc, "total number of rows.")
+    "numDeletedRows" -> createMetric(sc, "number of rows deleted.")
   )
 
   final override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -96,6 +95,7 @@ case class DeleteCommand(
         scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
 
         val operationTimestamp = System.currentTimeMillis()
+        metrics("numRemovedFiles").set(allFiles.size)
         allFiles.map(_.removeWithTimestamp(operationTimestamp))
       case Some(cond) =>
         val (metadataPredicates, otherPredicates) =
@@ -111,6 +111,7 @@ case class DeleteCommand(
           scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
           numTouchedFiles = candidateFiles.size
 
+          metrics("numRemovedFiles").set(numTouchedFiles)
           candidateFiles.map(_.removeWithTimestamp(operationTimestamp))
         } else {
           // Case 3: Delete the rows based on the condition.
@@ -125,12 +126,7 @@ case class DeleteCommand(
           // that only involves the affected files instead of all files.
           val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
           val data = Dataset.ofRows(sparkSession, newTarget)
-          val totalRowsCount = metrics("numTotalRows")
           val deletedRowCount = metrics("numDeletedRows")
-          val totalRowUdf = udf { () =>
-            totalRowsCount += 1
-            true
-          }.asNondeterministic()
           val deletedRowUdf = udf { () =>
             deletedRowCount += 1
             true
@@ -141,14 +137,14 @@ case class DeleteCommand(
               Array.empty[String]
             } else {
               data
-                .filter(totalRowUdf())
                 .filter(new Column(cond))
                 .filter(deletedRowUdf())
                 .select(new Column(InputFileName())).distinct()
                 .as[String].collect()
             }
           }
 
+          metrics("numRemovedFiles").set(filesToRewrite.size)
           scanTimeMs = (System.nanoTime() - startTime) / 1000 / 1000
           if (filesToRewrite.isEmpty) {
             // Case 3.1: no row matches and no delete will be triggered
@@ -181,7 +177,6 @@ case class DeleteCommand(
       }
     }
     if (deleteActions.nonEmpty) {
-      metrics("numRemovedFiles").set(numTouchedFiles)
       metrics("numAddedFiles").set(numRewrittenFiles)
       txn.registerSQLMetrics(sparkSession, metrics)
       txn.commit(deleteActions, DeltaOperations.Delete(condition.map(_.sql).toSeq))
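The row-counting pattern the command keeps using for the deleted-row metric can be seen in isolation below. This is a self-contained sketch, not the command's code: `spark` is an existing SparkSession, the table path and the `key === 16` condition are made up, and the SQLMetric here stands in for `metrics("numDeletedRows")`.

  import org.apache.spark.sql.execution.metric.SQLMetrics
  import org.apache.spark.sql.functions.{input_file_name, udf}

  val deletedRowCount = SQLMetrics.createMetric(spark.sparkContext, "number of rows deleted.")
  // Marked nondeterministic so the optimizer does not reorder or duplicate the side-effecting filter.
  val deletedRowUdf = udf { () => deletedRowCount += 1; true }.asNondeterministic()

  import spark.implicits._
  val data = spark.read.format("delta").load("/tmp/some_delta_table")  // hypothetical table
  val filesToRewrite = data
    .filter($"key" === 16)          // stand-in for the DELETE condition
    .filter(deletedRowUdf())        // increments the metric once per matching row
    .select(input_file_name()).distinct()
    .as[String].collect()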

src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala

Lines changed: 2 additions & 9 deletions
@@ -56,8 +56,7 @@ case class UpdateCommand(
   override lazy val metrics = Map[String, SQLMetric](
     "numAddedFiles" -> createMetric(sc, "number of files added."),
     "numRemovedFiles" -> createMetric(sc, "number of files removed."),
-    "numUpdatedRows" -> createMetric(sc, "number of rows updated."),
-    "numTotalRows" -> createMetric(sc, "number of rows copied.")
+    "numUpdatedRows" -> createMetric(sc, "number of rows updated.")
   )
 
   final override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -123,20 +122,14 @@ case class UpdateCommand(
       // that only involves the affected files instead of all files.
       val newTarget = DeltaTableUtils.replaceFileIndex(target, fileIndex)
       val data = Dataset.ofRows(sparkSession, newTarget)
-      val totalRowsCount = metrics("numTotalRows")
       val updatedRowCount = metrics("numUpdatedRows")
-      val totalRowUdf = udf { () =>
-        totalRowsCount += 1
-        true
-      }.asNondeterministic()
       val updatedRowUdf = udf { () =>
         updatedRowCount += 1
         true
       }.asNondeterministic()
       val filesToRewrite =
         withStatusCode("DELTA", s"Finding files to rewrite for UPDATE operation") {
-          data.filter(totalRowUdf())
-            .filter(new Column(updateCondition))
+          data.filter(new Column(updateCondition))
             .filter(updatedRowUdf())
             .select(input_file_name())
             .distinct().as[String].collect()
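The essence of the UpdateCommand change is ordering: the counting UDF now runs after the update condition, so it only counts rows that will actually be updated. A small standalone illustration (my own example, using a plain LongAccumulator in place of the command's SQLMetric; `spark` is an existing SparkSession):

  import org.apache.spark.sql.functions.{col, udf}

  val updatedRows = spark.sparkContext.longAccumulator("updatedRows")
  val updatedRowUdf = udf { () => updatedRows.add(1); true }.asNondeterministic()

  val df = spark.range(10).toDF("key")  // keys 0..9
  // Condition first, counter second: only the 3 matching rows increment the accumulator.
  df.filter(col("key") > 6).filter(updatedRowUdf()).collect()
  assert(updatedRows.sum == 3)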

src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -378,7 +378,7 @@ trait DeltaVacuumSuiteBase extends QueryTest
       "numRemovedFiles" -> createMetric(sparkContext, "number of files removed."),
       "numAddedFiles" -> createMetric(sparkContext, "number of files added."),
       "numDeletedRows" -> createMetric(sparkContext, "number of rows deleted."),
-      "numTotalRows" -> createMetric(sparkContext, "total number of rows.")
+      "numCopiedRows" -> createMetric(sparkContext, "total number of rows.")
     )
     txn.registerSQLMetrics(spark, metrics)
     txn.commit(Seq(RemoveFile(path, Option(clock.getTimeMillis()))), Delete("true" :: Nil))

src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala

Lines changed: 46 additions & 26 deletions
@@ -452,29 +452,40 @@ trait DescribeDeltaHistorySuiteBase
 
   test("operation metrics - update") {
     withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
-      val numRows = 100
-      val numPartitions = 5
       withTempDir { tempDir =>
-        // Create a Delta table
-        spark.range(numRows).repartition(numPartitions)
-          .withColumnRenamed("id", "key")
+        // Create the initial table as a single file
+        Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
           .withColumn("value", 'key % 2)
           .write
           .format("delta")
           .save(tempDir.getAbsolutePath)
-        val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
 
-        // update some records
-        deltaTable.update(col("key") < 1, Map("key" -> lit(1)))
+        // append additional data with the same number range to the table.
+        // This data is saved as a separate file as well
+        Seq(15, 16, 17).toDF("key")
+          .withColumn("value", 'key % 2)
+          .repartition(1)
+          .write
+          .format("delta")
+          .mode("append")
+          .save(tempDir.getAbsolutePath)
+        val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
+        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+        deltaLog.snapshot.numOfFiles
+
+        // update the table
+        deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
+        // The file from the append gets updated but the file from the initial table gets scanned
+        // as well. We want to make sure numCopied rows is calculated from written files and not
+        // scanned files[SC-33980]
 
-        // check operation metrics
+        // get operation metrics
         val operationMetrics = getOperationMetrics(deltaTable.history(1))
-        var expectedRowCount = numRows - 1
         val expectedMetrics = Map(
           "numAddedFiles" -> "1",
           "numRemovedFiles" -> "1",
           "numUpdatedRows" -> "1",
-          "numCopiedRows" -> expectedRowCount.toString
+          "numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
         )
         checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
       }
@@ -517,31 +528,40 @@ trait DescribeDeltaHistorySuiteBase
 
   test("operation metrics - delete") {
     withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
-      val numRows = 100
-      val rowsToDelete = 10
       withTempDir { tempDir =>
-        // Create a delta table
-        spark.range(numRows).repartition(5)
-          .withColumnRenamed("id", "key")
+        // Create the initial table as a single file
+        Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
           .withColumn("value", 'key % 2)
+          .repartition(1)
          .write
          .format("delta")
          .save(tempDir.getAbsolutePath)
-        val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
+
+        // Append to the initial table additional data in the same numerical range
+        Seq(15, 16, 17).toDF("key")
+          .withColumn("value", 'key % 2)
+          .repartition(1)
+          .write
+          .format("delta")
+          .mode("append")
+          .save(tempDir.getAbsolutePath)
+        val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
         val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-        val numFilesBeforeDelete = deltaLog.snapshot.numOfFiles
+        deltaLog.snapshot.numOfFiles
 
-        // delete records
-        deltaTable.delete(col("key") < rowsToDelete)
+        // delete the table
+        deltaTable.delete(col("key") === lit("16"))
+        // The file from the append gets deleted but the file from the initial table gets scanned
+        // as well. We want to make sure numCopied rows is calculated from the written files instead
+        // of the scanned files.[SC-33980]
 
-        // check operation metrics
-        val numFilesAfterDelete = deltaLog.snapshot.numOfFiles
+        // get operation metrics
         val operationMetrics = getOperationMetrics(deltaTable.history(1))
         val expectedMetrics = Map(
-          "numAddedFiles" -> numFilesAfterDelete.toString,
-          "numRemovedFiles" -> numFilesBeforeDelete.toString,
-          "numDeletedRows" -> rowsToDelete.toString,
-          "numCopiedRows" -> (numRows - rowsToDelete).toString
+          "numAddedFiles" -> "1",
+          "numRemovedFiles" -> "1",
+          "numDeletedRows" -> "1",
+          "numCopiedRows" -> "2" // There should be only three rows in total(deleted + copied)
         )
         checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.DELETE)
       }
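For reference, the metrics these tests assert on are read back from the table history, where each commit carries an operationMetrics map. A hedged sketch of doing this directly (hypothetical path; the suite's getOperationMetrics helper does the equivalent):

  import io.delta.tables.DeltaTable

  val deltaTable = DeltaTable.forPath(spark, "/tmp/some_delta_table")  // hypothetical table
  val lastCommit = deltaTable.history(1).select("operationMetrics").head()
  val operationMetrics = lastCommit.getAs[Map[String, String]]("operationMetrics")
  // e.g. operationMetrics("numDeletedRows") == "1", operationMetrics("numCopiedRows") == "2"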
