@@ -809,6 +809,9 @@ object DeltaHistoryManager extends DeltaLogging {
     private val maybeDeleteFiles = new mutable.ArrayBuffer[FileStatus]()
     private var lastFile: FileStatus = _
     private var hasNextCalled: Boolean = false
+    // A map to keep track of multi-part checkpoints.
+    val checkpointMap = new scala.collection.mutable.HashMap[(Long, Int),
+      collection.mutable.Buffer[FileStatus]]()

     private def init(): Unit = {
       if (underlying.hasNext) {
@@ -856,23 +859,39 @@
      */
     private def queueFilesInBuffer(): Unit = {
       var continueBuffering = true
-      while (continueBuffering) {
-        if (!underlying.hasNext) {
-          flushBuffer()
-          return
-        }

+      while (continueBuffering && underlying.hasNext) {
         var currentFile = underlying.next()
         require(currentFile != null, "FileStatus iterator returned null")
         if (needsTimeAdjustment(currentFile)) {
           currentFile = new FileStatus(
             currentFile.getLen, currentFile.isDirectory, currentFile.getReplication,
             currentFile.getBlockSize, lastFile.getModificationTime + 1, currentFile.getPath)
           maybeDeleteFiles.append(currentFile)
+        } else if (FileNames.isCheckpointFile(currentFile) && currentFile.getLen > 0) {
+          // Only flush the buffer when we find a checkpoint. This is because we don't want to
+          // delete the delta log files unless we have a checkpoint to ensure that non-expired
+          // subsequent delta logs are valid.
+          val numParts = FileNames.numCheckpointParts(currentFile.getPath)
+
+          if (numParts.isEmpty) { // Single-part or V2
+            flushBuffer()
+            maybeDeleteFiles.append(currentFile)
+            continueBuffering = false
+          } else {
+            // Multi-part checkpoint
+            val mpKey = versionGetter(currentFile.getPath) -> numParts.get
+            val partBuffer = checkpointMap.getOrElse(mpKey, mutable.ArrayBuffer())
+            partBuffer.append(currentFile)
+            checkpointMap.put(mpKey, partBuffer)
+            if (numParts.get == partBuffer.size) {
+              flushBuffer()
+              partBuffer.foreach(f => maybeDeleteFiles.append(f))
+              checkpointMap.remove(mpKey)
+              continueBuffering = false
+            }
+          }
         } else {
           flushBuffer()
           maybeDeleteFiles.append(currentFile)
           continueBuffering = false
         }
         lastFile = currentFile
       }
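With this change, the iterator only flushes the deletion buffer at a multi-part checkpoint once every part of it has been seen: parts are grouped in checkpointMap under a (version, number of parts) key, so an incomplete multi-part checkpoint can never become the cleanup boundary. A minimal standalone sketch of that grouping idea (illustrative names, not the PR's API):

// Standalone sketch, not the PR's code: report a multi-part checkpoint as a
// safe cleanup boundary only once all of its parts have been observed.
import scala.collection.mutable

final case class Part(version: Long, partNum: Int, totalParts: Int)

def completedCheckpoints(parts: Seq[Part]): Seq[(Long, Int)] = {
  val pending = mutable.HashMap.empty[(Long, Int), mutable.ArrayBuffer[Part]]
  val complete = mutable.ArrayBuffer.empty[(Long, Int)]
  parts.foreach { p =>
    val key = (p.version, p.totalParts)
    val buf = pending.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Part])
    buf += p
    // Only when the group holds every part is the checkpoint complete.
    if (buf.size == p.totalParts) {
      complete += key
      pending.remove(key)
    }
  }
  complete.toSeq
}

// completedCheckpoints(Seq(Part(10, 1, 2)))                 => empty: part 2 missing
// completedCheckpoints(Seq(Part(10, 1, 2), Part(10, 2, 2))) => Seq((10, 2))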
@@ -511,6 +511,222 @@ class DeltaRetentionSuite extends QueryTest
     }
   }

+  (Seq(("Default", Seq.empty[(String, String)])) ++ CheckpointPolicy.ALL.map {
+    case CheckpointPolicy.Classic =>
+      Seq(
+        ("Classic", Seq(
+          DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name)),
+        ("Multipart", Seq(
+          DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.Classic.name,
+          DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "1"))
+      )
+    case CheckpointPolicy.V2 =>
+      V2Checkpoint.Format.ALL_AS_STRINGS.map { v2CheckpointFormat =>
+        (s"V2 $v2CheckpointFormat",
+          Seq(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name,
+            DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat))
+      }
+  }.flatten).foreach { case (chkConfigName, chkConfig) =>
+    test(s"cleanup does not delete the checkpoint if it is required by non-expired versions. " +
+        s"Config: $chkConfigName.") {
+      withSQLConf(chkConfig: _*) {
+        withTempDir { tempDir =>
+          val startTime = getStartTimeForRetentionTest
+          val clock = new ManualClock(startTime)
+          val actualTestStartTime = System.currentTimeMillis()
+          val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
+          val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+          val logPath = new File(log.logPath.toUri)
+          val minChksCount = if (chkConfigName == "Multipart") { 2 } else { 1 }
+
+          // commit 0
+          spark.sql(
+            s"""CREATE TABLE $tableReference (id Int) USING delta
+               | TBLPROPERTIES('delta.enableChangeDataFeed' = true)
+             """.stripMargin)
+          // Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
+          val commit0Time = clock.getTimeMillis()
+          new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+          new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+
+          def commitNewVersion(version: Long): Unit = {
+            spark.sql(s"INSERT INTO $tableReference VALUES (1)")
+
+            val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
+            val time = clock.getTimeMillis() + version * 1000
+            deltaFile.setLastModified(time)
+            val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
+            crcFile.setLastModified(time)
+            val chks = getCheckpointFiles(logPath)
+              .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == version)
+
+            if (version % 10 == 0) {
+              assert(chks.length >= minChksCount)
+              chks.foreach { chk =>
+                assert(chk.exists())
+                chk.setLastModified(time)
+              }
+            } else { assert(chks.isEmpty) }
+          }
+
+          // Day 0: Add commits 1 to 15 --> creates 1 checkpoint at Day 0 for version 10
+          (1L to 15L).foreach(commitNewVersion)
+
+          // ensure that the checkpoint at version 10 exists
+          val checkpoint10Files = getCheckpointFiles(logPath)
+            .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10)
+          assert(checkpoint10Files.length >= minChksCount)
+          assert(checkpoint10Files.forall(_.exists))
+          val deltaFiles = (0 to 15).map { i =>
+            new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
+          }
+          deltaFiles.foreach { f =>
+            assert(f.exists())
+          }
+
+          // Day 35: Add commits 16 to 25 --> creates a checkpoint at Day 35 for version 20
+          clock.setTime(day(startTime, 35))
+          (16L to 25L).foreach(commitNewVersion)
+
+          assert(checkpoint10Files.forall(_.exists))
+          deltaFiles.foreach { f =>
+            assert(f.exists())
+          }
+
+          // Auto cleanup is disabled in DeltaRetentionSuiteBase, so tests control when it happens.
+          cleanUpExpiredLogs(log)
+
+          // assert that the checkpoint from day 0 (at version 10) and all the commits after
+          // that are still there
+          assert(checkpoint10Files.forall(_.exists))
+          deltaFiles.foreach { f =>
+            val version = FileNames.deltaVersion(new Path(f.toString()))
+            if (version < 10) {
+              assert(!f.exists, version)
+            } else {
+              assert(f.exists, version)
+            }
+          }
+
+          // Validate we can time travel to version >= 10
+          val earliestExpectedChkVersion = 10
+          (0 to 25).map { version =>
+            val sqlCommand = s"SELECT * FROM $tableReference VERSION AS OF $version"
+            if (version < earliestExpectedChkVersion) {
+              val ex = intercept[org.apache.spark.sql.delta.VersionNotFoundException] {
+                spark.sql(sqlCommand).collect()
+              }
+              assert(ex.userVersion === version)
+              assert(ex.earliest === earliestExpectedChkVersion)
+              assert(ex.latest === 25)
+            } else {
+              spark.sql(sqlCommand).collect()
+            }
+          }
+
+          // Validate CDF - SELECT * FROM table_changes_by_path('table', X, Y)
+          (0 to 24).map { version =>
+            val sqlCommand = s"SELECT * FROM " +
+              s"table_changes_by_path('${tempDir.getCanonicalPath}', $version, 25)"
+            if (version < earliestExpectedChkVersion) {
+              if (coordinatedCommitsEnabledInTests) {
+                intercept[IllegalStateException] {
+                  spark.sql(sqlCommand).collect()
+                }
+              } else {
+                intercept[org.apache.spark.sql.delta.DeltaFileNotFoundException] {
+                  spark.sql(sqlCommand).collect()
+                }
+              }
+            } else {
+              spark.sql(sqlCommand).collect()
+            }
+          }
+        }
+      }
+    }
+  }
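The generator above runs this test once per checkpoint flavor: Default, Classic, Multipart (Classic plus DELTA_CHECKPOINT_PART_SIZE = 1, so each checkpoint part holds a single action and real checkpoints split into at least two files, matching the assert(chks.length >= minChksCount) above), and one V2 variant per top-level file format. Multi-part checkpoint parts follow the protocol's naming scheme <version>.checkpoint.<part>.<parts>.parquet, which is what FileNames.numCheckpointParts reads back from the path. A hypothetical parser, for illustration only (the suite itself relies on the FileNames helpers):

// Illustrative only: decode (version, part, totalParts) from a multi-part
// checkpoint file name such as
// 00000000000000000010.checkpoint.0000000001.0000000002.parquet
val MultiPartCheckpointFile = raw"(\d+)\.checkpoint\.(\d+)\.(\d+)\.parquet".r

def parseCheckpointPart(name: String): Option[(Long, Int, Int)] = name match {
  case MultiPartCheckpointFile(version, part, parts) =>
    Some((version.toLong, part.toInt, parts.toInt))
  case _ => None // single-part (<version>.checkpoint.parquet) or V2 names
}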
+
+  test(s"cleanup does not delete the JSON logs if the multi-part checkpoint is incomplete.") {
+    withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "1") {
+      withTempDir { tempDir =>
+        val startTime = getStartTimeForRetentionTest
+        val clock = new ManualClock(startTime)
+        val actualTestStartTime = System.currentTimeMillis()
+        val tableReference = s"delta.`${tempDir.getCanonicalPath()}`"
+        val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+        val logPath = new File(log.logPath.toUri)
+
+        // commit 0
+        spark.sql(
+          s"""CREATE TABLE $tableReference (id Int) USING delta
+             | TBLPROPERTIES('delta.enableChangeDataFeed' = true)
+           """.stripMargin)
+        // Set time for commit 0 to ensure that the commits don't need timestamp adjustment.
+        val commit0Time = clock.getTimeMillis()
+        new File(FileNames.unsafeDeltaFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+        new File(FileNames.checksumFile(log.logPath, 0).toUri).setLastModified(commit0Time)
+
+        def commitNewVersion(version: Long): Unit = {
+          spark.sql(s"INSERT INTO $tableReference VALUES (1)")
+
+          val deltaFile = new File(FileNames.unsafeDeltaFile(log.logPath, version).toUri)
+          val time = clock.getTimeMillis() + version * 1000
+          deltaFile.setLastModified(time)
+          val crcFile = new File(FileNames.checksumFile(log.logPath, version).toUri)
+          crcFile.setLastModified(time)
+          val chks = getCheckpointFiles(logPath)
+            .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == version)
+
+          if (version % 10 == 0) {
+            assert(chks.length >= 2) // Multipart checkpoints
+            chks.foreach { chk =>
+              assert(chk.exists())
+              chk.setLastModified(time)
+            }
+          } else { assert(chks.isEmpty) }
+        }
+
+        // Day 0: Add commits 1 to 15 --> creates 1 checkpoint at Day 0 for version 10
+        (1L to 15L).foreach(commitNewVersion)
+
+        // ensure that the checkpoint at version 10 exists
+        val checkpoint10Files = getCheckpointFiles(logPath)
+          .filter(f => FileNames.checkpointVersion(new Path(f.getCanonicalPath)) == 10)
+        assert(checkpoint10Files.length >= 2) // Multipart checkpoints
+        assert(checkpoint10Files.forall(_.exists))
+        val deltaFiles = (0 to 15).map { i =>
+          new File(FileNames.unsafeDeltaFile(log.logPath, i).toUri)
+        }
+        deltaFiles.foreach { f =>
+          assert(f.exists())
+        }
+
+        // Day 35: Add commits 16 to 25 --> creates a checkpoint at Day 35 for version 20
+        clock.setTime(day(startTime, 35))
+        (16L to 25L).foreach(commitNewVersion)
+
+        assert(checkpoint10Files.forall(_.exists))
+        deltaFiles.foreach { f =>
+          assert(f.exists())
+        }
+
+        checkpoint10Files.lastOption.foreach { lastPart =>
+          lastPart.delete() // delete the last part to simulate incomplete checkpoint
+        }
+
+        // Auto cleanup is disabled in DeltaRetentionSuiteBase, so tests control when it happens.
+        cleanUpExpiredLogs(log)
+
+        // assert that delta logs are not deleted due to missing checkpoint part
+        deltaFiles.foreach { f =>
+          val version = FileNames.deltaVersion(new Path(f.toString()))
+          assert(f.exists, s"version $version should not be deleted")
+        }
+      }
+    }
+  }
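A worked timeline for the two tests above, assuming the suite's default 30-day delta.logRetentionDuration: commits 1 to 15 and a multi-part checkpoint at version 10 land on day 0; commits 16 to 25 and a checkpoint at version 20 land on day 35, and then cleanup runs. By day 35 every day-0 file has expired, but cleanup only truncates the log at a checkpoint, and the only checkpoint inside the expired range is the one at version 10. When that checkpoint is complete, versions 0 to 9 are deleted and 10 to 25 survive (first test); when one of its parts has been deleted, there is no safe truncation point, so no delta file is deleted at all (second test).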
+
   test("Metadata cleanup respects requireCheckpointProtectionBeforeVersion") {
     withSQLConf(
       DeltaSQLConf.ALLOW_METADATA_CLEANUP_WHEN_ALL_PROTOCOLS_SUPPORTED.key -> "false",
@@ -611,31 +827,34 @@ class DeltaRetentionSuite extends QueryTest

     // Corner cases.
     testRequireCheckpointProtectionBeforeVersion(
-      createNumCommitsOutsideRetentionPeriod = 2,
-      createNumCommitsWithinRetentionPeriod = 14,
+      createNumCommitsOutsideRetentionPeriod = 1,
+      createNumCommitsWithinRetentionPeriod = 15,
       createCheckpoints = Set(1),
       requireCheckpointProtectionBeforeVersion = 0,
-      expectedCommitsAfterCleanup = (2 to 15),
+      expectedCommitsAfterCleanup = (1 to 15),
       // A checkpoint is automatically created every 10 commits.
-      expectedCheckpointsAfterCleanup = Set(10))
+      expectedCheckpointsAfterCleanup = Set(1, 10))

     testRequireCheckpointProtectionBeforeVersion(
-      createNumCommitsOutsideRetentionPeriod = 2,
-      createNumCommitsWithinRetentionPeriod = 14,
+      createNumCommitsOutsideRetentionPeriod = 1,
+      createNumCommitsWithinRetentionPeriod = 15,
       createCheckpoints = Set(1),
       requireCheckpointProtectionBeforeVersion = 1,
-      expectedCommitsAfterCleanup = (2 to 15),
+      expectedCommitsAfterCleanup = (1 to 15),
       // A checkpoint is automatically created every 10 commits.
-      expectedCheckpointsAfterCleanup = Set(10))
+      expectedCheckpointsAfterCleanup = Set(1, 10))

+    // v1 can't be deleted because it is the only checkpoint before version 2.
+    // v0 can't be deleted because of the checkpoint protection; v0 and v1 need
+    // to be deleted together.
     testRequireCheckpointProtectionBeforeVersion(
-      createNumCommitsOutsideRetentionPeriod = 2,
-      createNumCommitsWithinRetentionPeriod = 14,
+      createNumCommitsOutsideRetentionPeriod = 1,
+      createNumCommitsWithinRetentionPeriod = 15,
       createCheckpoints = Set(1),
       requireCheckpointProtectionBeforeVersion = 2,
-      expectedCommitsAfterCleanup = (2 to 15),
+      expectedCommitsAfterCleanup = (0 to 15),
       // A checkpoint is automatically created every 10 commits.
-      expectedCheckpointsAfterCleanup = Set(10))
+      expectedCheckpointsAfterCleanup = Set(1, 10))

     testRequireCheckpointProtectionBeforeVersion(
       createNumCommitsOutsideRetentionPeriod = 2,
@@ -713,20 +932,18 @@ class DeltaRetentionSuite extends QueryTest
     testRequireCheckpointProtectionBeforeVersion(
       createNumCommitsOutsideRetentionPeriod = 8,
       createNumCommitsWithinRetentionPeriod = 8,
-      createCheckpoints = Set(1),
+      createCheckpoints = Set(1, 8),
       requireCheckpointProtectionBeforeVersion = 10,
       expectedCommitsAfterCleanup = (8 to 15),
-      // This is a bit weird. Cleanup should had created a checkpoint at 8.
-      expectedCheckpointsAfterCleanup = Set(10))
+      expectedCheckpointsAfterCleanup = Set(8, 10))

     testRequireCheckpointProtectionBeforeVersion(
       createNumCommitsOutsideRetentionPeriod = 8,
       createNumCommitsWithinRetentionPeriod = 8,
-      createCheckpoints = Set(0),
+      createCheckpoints = Set(0, 8),
       requireCheckpointProtectionBeforeVersion = 10,
       expectedCommitsAfterCleanup = (8 to 15),
-      // This is a bit weird. Cleanup should had created a checkpoint at 8.
-      expectedCheckpointsAfterCleanup = Set(10))
+      expectedCheckpointsAfterCleanup = Set(8, 10))
   }
 }
