
Commit 3ac120f

Register VACUUM operations in the delta log
This PR registers the start and end of VACUUM operations in the delta log. We write an empty commit with no Add/Remove actions, only a `CommitInfo` entry that carries the Delta operation info.

The `VacuumStart` operation contains the metrics `numFilesToDelete` and `sizeOfDataToDelete`. The `VacuumEnd` operation contains the metrics `numDeletedFiles` and `numVacuumedDirectories`.

New unit tests.

This exposes additional metrics and history in the _delta_log for the start and end of VACUUM operations.

Closes #1552. Resolves #868.

Co-authored-by: Yann Byron <[email protected]>

GitOrigin-RevId: 94805531d022bac4afafd0b672d17b8828d8aa2c
1 parent 57d68b3 commit 3ac120f
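
As a quick, hedged illustration of what this change enables (the table path below is made up), the two new history entries can be inspected from Scala after running VACUUM on a supported file system:

    import io.delta.tables.DeltaTable

    // Run a vacuum, then look at the two most recent commits in the table history.
    // With vacuum logging active they should be "VACUUM END" and "VACUUM START",
    // each carrying the operationMetrics added by this change.
    spark.sql("VACUUM delta.`/tmp/delta/events` RETAIN 168 HOURS")
    DeltaTable.forPath(spark, "/tmp/delta/events")
      .history(2)
      .select("version", "operation", "operationParameters", "operationMetrics")
      .show(truncate = false)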

4 files changed (+285 −19 lines changed)

core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala (+39 −0)
@@ -379,6 +379,34 @@ object DeltaOperations {
     override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE
   }
 
+  /**
+   * @param retentionCheckEnabled - whether retention check was enabled for this run of vacuum.
+   * @param specifiedRetentionMillis - specified retention interval
+   * @param defaultRetentionMillis - default retention period for the table
+   */
+  case class VacuumStart(
+      retentionCheckEnabled: Boolean,
+      specifiedRetentionMillis: Option[Long],
+      defaultRetentionMillis: Long) extends Operation("VACUUM START") {
+    override val parameters: Map[String, Any] = Map(
+      "retentionCheckEnabled" -> retentionCheckEnabled,
+      "defaultRetentionMillis" -> defaultRetentionMillis
+    ) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START
+  }
+
+  /**
+   * @param status - whether the vacuum operation was successful; either "COMPLETED" or "FAILED"
+   */
+  case class VacuumEnd(status: String) extends Operation(s"VACUUM END") {
+    override val parameters: Map[String, Any] = Map(
+      "status" -> status
+    )
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END
+  }
+
 
   private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
     Map(
@@ -497,4 +525,15 @@ private[delta] object DeltaOperationMetrics {
     "removedFilesSize", // size in bytes of files removed by the restore
     "restoredFilesSize" // size in bytes of files added by the restore
   )
+
+  val VACUUM_START = Set(
+    "numFilesToDelete", // number of files that will be deleted by vacuum
+    "sizeOfDataToDelete" // total size in bytes of files that will be deleted by vacuum
+  )
+
+  val VACUUM_END = Set(
+    "numDeletedFiles", // number of files deleted by vacuum
+    "numVacuumedDirectories" // number of directories vacuumed
+  )
+
 }
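
For illustration only (not part of the diff), the new operation classes can be constructed directly to see the operation names and parameter maps that end up in `CommitInfo`; the retention values below are arbitrary:

    import org.apache.spark.sql.delta.DeltaOperations

    // Sketch: build the operations the same way VacuumCommandImpl does and
    // inspect what would be recorded in the commit.
    val weekMillis = 7L * 24 * 60 * 60 * 1000
    val start = DeltaOperations.VacuumStart(
      retentionCheckEnabled = true,
      specifiedRetentionMillis = Some(weekMillis),
      defaultRetentionMillis = weekMillis)
    val end = DeltaOperations.VacuumEnd(status = "COMPLETED")

    println(start.name)        // VACUUM START
    println(start.parameters)  // retentionCheckEnabled, defaultRetentionMillis, specifiedRetentionMillis
    println(end.name)          // VACUUM END
    println(end.parameters)    // Map(status -> COMPLETED)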

core/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala (+138 −18)
@@ -31,11 +31,14 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
+import org.apache.spark.sql.functions.{col, count, sum}
 import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
 
 /**
@@ -49,6 +52,8 @@ import org.apache.spark.util.{Clock, SerializableConfiguration, SystemClock}
  */
 object VacuumCommand extends VacuumCommandImpl with Serializable {
 
+  case class FileNameAndSize(path: String, length: Long)
+
   /**
    * Additional check on retention duration to prevent people from shooting themselves in the foot.
    */
@@ -204,21 +209,32 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
           val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
           fileStatusIterator.flatMap { fileStatus =>
             if (fileStatus.isDir) {
-              Iterator.single(relativize(fileStatus.getPath, fs, reservoirBase, isDir = true))
+              Iterator.single(FileNameAndSize(
+                relativize(fileStatus.getPath, fs, reservoirBase, isDir = true), 0L))
             } else {
               val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
               val dirsWithSlash = dirs.map { p =>
-                relativize(new Path(p), fs, reservoirBase, isDir = true)
+                val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
+                FileNameAndSize(relativizedPath, 0L)
               }
               dirsWithSlash ++ Iterator(
-                relativize(new Path(fileStatus.path), fs, reservoirBase, isDir = false))
+                FileNameAndSize(relativize(
+                  fileStatus.getPath, fs, reservoirBase, isDir = false), fileStatus.length))
             }
           }
-        }.groupBy($"value" as 'path)
-          .count()
+        }.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))
         .join(validFiles, Seq("path"), "leftanti")
         .where('count === 1)
-        .select('path)
+
+      val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first
+      val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
+        0L
+      } else {
+        sizeOfDataToDeleteRow.getLong(0)
+      }
+
+      val diffFiles = diff
+        .select(col("path"))
         .as[String]
         .map { relativePath =>
           assert(!stringToPath(relativePath).isAbsolute,
@@ -227,31 +243,33 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         }
 
       if (dryRun) {
-        val numFiles = diff.count()
+        val numFiles = diffFiles.count()
         val stats = DeltaVacuumStats(
           isDryRun = true,
           specifiedRetentionMillis = retentionMillis,
           defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
           minRetainedTimestamp = deleteBeforeTimestamp,
           dirsPresentBeforeDelete = dirCounts,
-          objectsDeleted = numFiles)
+          objectsDeleted = numFiles,
+          sizeOfDataToDelete = sizeOfDataToDelete)
 
         recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-        logConsole(s"Found $numFiles files and directories in a total of " +
-          s"$dirCounts directories that are safe to delete.")
+        logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
+          s"a total of $dirCounts directories that are safe to delete.")
 
-        return diff.map(f => stringToPath(f).toString).toDF("path")
+        return diffFiles.map(f => stringToPath(f).toString).toDF("path")
       }
       logVacuumStart(
         spark,
         deltaLog,
         path,
-        diff,
+        diffFiles,
+        sizeOfDataToDelete,
         retentionMillis,
         deltaLog.tombstoneRetentionMillis)
 
       val filesDeleted = try {
-        delete(diff, spark, basePath, hadoopConf, parallelDeleteEnabled,
+        delete(diffFiles, spark, basePath, hadoopConf, parallelDeleteEnabled,
           parallelDeletePartitions)
       } catch { case t: Throwable =>
         logVacuumEnd(deltaLog, spark, path)
@@ -263,7 +281,8 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
         defaultRetentionMillis = deltaLog.tombstoneRetentionMillis,
         minRetainedTimestamp = deleteBeforeTimestamp,
         dirsPresentBeforeDelete = dirCounts,
-        objectsDeleted = filesDeleted)
+        objectsDeleted = filesDeleted,
+        sizeOfDataToDelete = sizeOfDataToDelete)
       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
       logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
 
@@ -277,22 +296,122 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
 
 trait VacuumCommandImpl extends DeltaCommand {
 
+  private val supportedFsForLogging = Seq(
+    "wasbs", "wasbss", "abfs", "abfss", "adl", "gs", "file", "hdfs"
+  )
+
+  /**
+   * Returns whether we should record vacuum metrics in the delta log.
+   */
+  private def shouldLogVacuum(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      hadoopConf: Configuration,
+      path: Path): Boolean = {
+    val logVacuumConf = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED)
+
+    if (logVacuumConf.nonEmpty) {
+      return logVacuumConf.get
+    }
+
+    val logStore = deltaLog.store
+
+    try {
+      val rawResolvedUri: URI = logStore.resolvePathOnPhysicalStorage(path, hadoopConf).toUri
+      val scheme = rawResolvedUri.getScheme
+      if (supportedFsForLogging.contains(scheme)) {
+        true
+      } else {
+        false
+      }
+    } catch {
+      case _: UnsupportedOperationException =>
+        logWarning("Vacuum event logging" +
+          " not enabled on this file system because we cannot detect your cloud storage type.")
+        false
+    }
+  }
+
+  /**
+   * Record Vacuum specific metrics in the commit log at the START of vacuum.
+   *
+   * @param spark - spark session
+   * @param deltaLog - DeltaLog of the table
+   * @param path - the (data) path to the root of the table
+   * @param diff - the list of paths (files, directories) that are safe to delete
+   * @param sizeOfDataToDelete - the amount of data (bytes) to be deleted
+   * @param specifiedRetentionMillis - the optional override retention period (millis) to keep
+   *                                   logically removed files before deleting them
+   * @param defaultRetentionMillis - the default retention period (millis)
+   */
   protected def logVacuumStart(
       spark: SparkSession,
       deltaLog: DeltaLog,
       path: Path,
       diff: Dataset[String],
+      sizeOfDataToDelete: Long,
       specifiedRetentionMillis: Option[Long],
       defaultRetentionMillis: Long): Unit = {
-    logInfo(s"Deleting untracked files and empty directories in $path")
+    logInfo(s"Deleting untracked files and empty directories in $path. The amount of data to be " +
+      s"deleted is $sizeOfDataToDelete (in bytes)")
+
+    // We perform an empty commit in order to record information about the Vacuum
+    if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
+      val checkEnabled =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED)
+      val txn = deltaLog.startTransaction()
+      val metrics = Map[String, SQLMetric](
+        "numFilesToDelete" -> createMetric(spark.sparkContext, "number of files to deleted"),
+        "sizeOfDataToDelete" -> createMetric(spark.sparkContext,
+          "The total amount of data to be deleted in bytes")
+      )
+      metrics("numFilesToDelete").set(diff.count())
+      metrics("sizeOfDataToDelete").set(sizeOfDataToDelete)
+      txn.registerSQLMetrics(spark, metrics)
+      txn.commit(actions = Seq(), DeltaOperations.VacuumStart(
+        checkEnabled,
+        specifiedRetentionMillis,
+        defaultRetentionMillis
+      ))
+    }
   }
 
+  /**
+   * Record Vacuum specific metrics in the commit log at the END of vacuum.
+   *
+   * @param deltaLog - DeltaLog of the table
+   * @param spark - spark session
+   * @param path - the (data) path to the root of the table
+   * @param filesDeleted - if the vacuum completed this will contain the number of files deleted.
+   *                       if the vacuum failed, this will be None.
+   * @param dirCounts - if the vacuum completed this will contain the number of directories
+   *                    vacuumed. if the vacuum failed, this will be None.
+   */
   protected def logVacuumEnd(
       deltaLog: DeltaLog,
       spark: SparkSession,
       path: Path,
       filesDeleted: Option[Long] = None,
       dirCounts: Option[Long] = None): Unit = {
+    if (shouldLogVacuum(spark, deltaLog, deltaLog.newDeltaHadoopConf(), path)) {
+      val txn = deltaLog.startTransaction()
+      val status = if (filesDeleted.isEmpty && dirCounts.isEmpty) { "FAILED" } else { "COMPLETED" }
+      if (filesDeleted.nonEmpty && dirCounts.nonEmpty) {
+        val metrics = Map[String, SQLMetric](
+          "numDeletedFiles" -> createMetric(spark.sparkContext, "number of files deleted."),
+          "numVacuumedDirectories" ->
+            createMetric(spark.sparkContext, "num of directories vacuumed."),
+          "status" -> createMetric(spark.sparkContext, "status of vacuum")
+        )
+        metrics("numDeletedFiles").set(filesDeleted.get)
+        metrics("numVacuumedDirectories").set(dirCounts.get)
+        txn.registerSQLMetrics(spark, metrics)
+      }
+      txn.commit(actions = Seq(), DeltaOperations.VacuumEnd(
+        status
+      ))
+    }
+
     if (filesDeleted.nonEmpty) {
       logConsole(s"Deleted ${filesDeleted.get} files and directories in a total " +
         s"of ${dirCounts.get} directories.")
@@ -362,4 +481,5 @@ case class DeltaVacuumStats(
     defaultRetentionMillis: Long,
     minRetainedTimestamp: Long,
     dirsPresentBeforeDelete: Long,
-    objectsDeleted: Long)
+    objectsDeleted: Long,
+    sizeOfDataToDelete: Long)
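
As a rough way to verify the behaviour added above (the path is illustrative), the empty commits written by logVacuumStart/logVacuumEnd can be read straight out of the _delta_log directory as JSON:

    import org.apache.spark.sql.functions.col

    // Sketch: each delta log entry is a JSON line; the VACUUM START/END commits
    // contain only a commitInfo action, so filter on it and show the metrics.
    spark.read.json("/tmp/delta/events/_delta_log/*.json")
      .where(col("commitInfo").isNotNull)
      .select("commitInfo.operation", "commitInfo.operationMetrics")
      .show(truncate = false)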

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala (+9 −0)
@@ -270,6 +270,15 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_VACUUM_LOGGING_ENABLED =
+    buildConf("vacuum.logging.enabled")
+      .doc("Whether to log vacuum information into the Delta transaction log." +
+        " 'spark.databricks.delta.commitInfo.enabled' should be enabled when using this config." +
+        " Users should only set this config to 'true' when the underlying file system safely" +
+        " supports concurrent writes.")
+      .booleanConf
+      .createOptional
+
   val DELTA_VACUUM_RETENTION_CHECK_ENABLED =
     buildConf("retentionDurationCheck.enabled")
       .doc("Adds a check preventing users from running vacuum with a very short retention " +
