Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ private[delta] class ConflictChecker(
val nextAvailableVersion = winningCommitVersion + 1L
val updatedMetadata =
InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
spark,
updatedCommitTimestamp,
currentTransactionInfo.readSnapshot,
currentTransactionInfo.metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ trait OptimisticTransactionImpl extends TransactionHelper
val metadataWithIctInfo = commitInfo.inCommitTimestamp
.flatMap { inCommitTimestamp =>
InCommitTimestampUtils.getUpdatedMetadataWithICTEnablementInfo(
inCommitTimestamp, snapshot, metadata, firstAttemptVersion)
spark, inCommitTimestamp, snapshot, metadata, firstAttemptVersion)
}.getOrElse { return false }
newMetadata = Some(metadataWithIctInfo)
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,16 @@ trait DeltaSQLConfBase {
// DynamoDB Commit Coordinator-specific configs end
/////////////////////////////////////////////

// Internal flag (enabled by default) gating the fix that keeps the ICT enablement
// version/timestamp table properties from being dropped by REPLACE or CLONE.
val IN_COMMIT_TIMESTAMP_RETAIN_ENABLEMENT_INFO_FIX_ENABLED =
  buildConf("inCommitTimestamp.retainEnablementInfoFix.enabled")
    .internal()
    .doc(
      "When disabled, Delta can end up dropping " +
        "inCommitTimestampEnablementVersion and inCommitTimestampEnablementTimestamp " +
        "during a REPLACE or CLONE command. This accidental removal of these " +
        "properties can result in failures on time travel queries.")
    .booleanConf
    .createWithDefault(true)

val DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD =
buildConf("catalog.update.longFieldTruncationThreshold")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,20 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.SparkSession

object InCommitTimestampUtils {

/**
 * Table property configs related to In-Commit Timestamps: the enablement flag plus the
 * enablement version and enablement timestamp.
 */
final val TABLE_PROPERTY_CONFS = Seq(
  DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED,
  DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION,
  DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP)

/** Keys of the configs in [[TABLE_PROPERTY_CONFS]], in the same order. */
final val TABLE_PROPERTY_KEYS: Seq[String] = TABLE_PROPERTY_CONFS.map(conf => conf.key)

/** Returns true if the current transaction implicitly/explicitly enables ICT. */
def didCurrentTransactionEnableICT(
currentTransactionMetadata: Metadata,
Expand All @@ -45,22 +55,43 @@ object InCommitTimestampUtils {
/**
 * Returns the updated [[Metadata]] with inCommitTimestamp enablement related info
 * (version and timestamp) correctly set.
 *
 * The enablement info is set to the current commit's timestamp and version when both of
 * the following hold:
 * 1. This transaction enables inCommitTimestamp.
 * 2. The commit version is not 0. This is because we only need to persist
 *    the enablement info if there are non-ICT commits in the Delta log.
 *
 * For cases where ICT is enabled in both the current transaction and the read snapshot,
 * we will retain the enablement info from the read snapshot. Note that this can
 * happen for commands like REPLACE or CLONE, where we can end up dropping the enablement
 * info due to the belief that ICT was just enabled.
 *
 * Note: This function must only be called after transaction conflicts have been resolved.
 *
 * @param spark session used to read
 *              [[DeltaSQLConf.IN_COMMIT_TIMESTAMP_RETAIN_ENABLEMENT_INFO_FIX_ENABLED]].
 * @param inCommitTimestamp in-commit timestamp of the current commit.
 * @param readSnapshot snapshot the current transaction read from.
 * @param metadata metadata the current transaction is about to commit.
 * @param commitVersion version the current transaction is committing at.
 * @return `Some` with the adjusted metadata when the enablement info had to be set or
 *         restored, `None` when `metadata` needs no change.
 */
def getUpdatedMetadataWithICTEnablementInfo(
    spark: SparkSession,
    inCommitTimestamp: Long,
    readSnapshot: Snapshot,
    metadata: Metadata,
    commitVersion: Long): Option[Metadata] = {
  // Evaluated once and shared by both branches below.
  val ictEnabledByThisTxn = didCurrentTransactionEnableICT(metadata, readSnapshot)
  if (ictEnabledByThisTxn && commitVersion != 0) {
    val enablementTrackingProperties = Map(
      DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key -> commitVersion.toString,
      DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.key -> inCommitTimestamp.toString)
    Some(metadata.copy(configuration = metadata.configuration ++ enablementTrackingProperties))
  } else if (
    // The flag is checked first so that disabling it skips every retention check below.
    spark.conf.get(DeltaSQLConf.IN_COMMIT_TIMESTAMP_RETAIN_ENABLEMENT_INFO_FIX_ENABLED) &&
      !ictEnabledByThisTxn &&
      DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata) &&
      // These two checks ensure that we don't make an unnecessary metadata update
      // when ICT enablement properties are not being dropped.
      getValidatedICTEnablementInfo(readSnapshot.metadata).isDefined &&
      getValidatedICTEnablementInfo(metadata).isEmpty
  ) {
    // If ICT was enabled in the readSnapshot and is still enabled, we should
    // retain the enablement info from the read snapshot.
    // This prevents enablement info from being dropped during REPLACE/CLONE.
    val existingICTConfigs = readSnapshot.metadata.configuration
      .filter { case (k, _) => TABLE_PROPERTY_KEYS.contains(k) }
    Some(metadata.copy(configuration = metadata.configuration ++ existingICTConfigs))
  } else {
    None
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, Json
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -1146,6 +1147,104 @@ class InCommitTimestampSuite
}}}
}
}

/**
 * Shared driver for the REPLACE/CLONE retention tests.
 *
 * Creates a Delta table in a temp directory, enables ICT at version 1, overwrites the data,
 * runs `runCommand` against the table path and then checks the ICT table properties.
 * The session-level default for the ICT table property is unset for the duration of the
 * test and restored afterwards if it was set.
 *
 * @param expectRetention when true, the enablement version and timestamp must equal the
 *                        values recorded before `runCommand`; when false, both keys must be
 *                        absent from the table configuration.
 * @param expectICTEnabled expected value of the ICT-enabled property after `runCommand`;
 *                         defaults to `expectRetention` when not provided.
 * @param runCommand command under test; receives the absolute path of the table.
 */
private def testICTEnablementPropertyRetention(
    expectRetention: Boolean,
    expectICTEnabled: Option[Boolean] = None)(runCommand: (String) => Unit): Unit = {
  val ictDefaultKey = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey
  val ictConfOpt = spark.conf.getOption(ictDefaultKey)
  try {
    spark.conf.unset(ictDefaultKey)
    withTempDir { tempDir =>
      // Use a single path string everywhere so that the DeltaLog handle and the SQL
      // commands are guaranteed to address the table through the same path.
      val tablePath = tempDir.getAbsolutePath
      spark.range(1).write.format("delta").save(tablePath)
      val deltaLog = DeltaLog.forTable(spark, new Path(tablePath))
      // Enable ICT at version 1 instead of 0 so that we can test the retention of
      // enablement provenance properties as well.
      spark.sql(
        s"ALTER TABLE delta.`$tablePath` " +
        s"SET TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')")
      // Refresh explicitly (as done after `runCommand` below) instead of relying on the
      // cached snapshot already reflecting the ALTER TABLE commit.
      val metadataAfterEnablement = deltaLog.update().metadata
      val enablementVersion =
        DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.fromMetaData(
          metadataAfterEnablement)
      val enablementTimestamp =
        DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.fromMetaData(
          metadataAfterEnablement)
      assert(enablementVersion.contains(1))
      assert(enablementTimestamp.isDefined)

      spark.range(2, 3).write.format("delta").mode("overwrite").save(tablePath)

      // Run the REPLACE/CLONE command.
      runCommand(tablePath)

      val metadataAfterReplace = deltaLog.update().metadata
      assert(
        DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(
          metadataAfterReplace) == expectICTEnabled.getOrElse(expectRetention))
      if (expectRetention) {
        assert(
          DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.fromMetaData(
            metadataAfterReplace) == enablementTimestamp)
        assert(
          DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.fromMetaData(
            metadataAfterReplace) == enablementVersion)
      } else {
        Seq(
          DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.key,
          DeltaConfigs.IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.key
        ).foreach { key =>
          assert(!metadataAfterReplace.configuration.contains(key))
        }
      }
    }
  } finally {
    // Restore the session default only if one was set before the test started.
    ictConfOpt.foreach { ictConf =>
      spark.conf.set(ictDefaultKey, ictConf)
    }
  }
}

testWithDefaultCommitCoordinatorUnset(
  "ICT enablement properties remain unchanged after a REPLACE with explicit enablement") {
  // The REPLACE explicitly sets ICT to 'true'; the enablement version and timestamp
  // are expected to be the same as before the command.
  testICTEnablementPropertyRetention(expectRetention = true) { tableDir =>
    val ictEnabledKey = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key
    val replaceStatement =
      s"REPLACE TABLE delta.`$tableDir` USING delta " +
        s"TBLPROPERTIES ('$ictEnabledKey' = 'true') " +
        "AS SELECT * FROM range(3, 4)"
    sql(replaceStatement)
  }
}

testWithDefaultCommitCoordinatorUnset(
  "ICT enablement properties are dropped after a REPLACE with explicit enablement " +
    s"when the ${DeltaSQLConf.IN_COMMIT_TIMESTAMP_RETAIN_ENABLEMENT_INFO_FIX_ENABLED.key} " +
    "is disabled") {
  val fixFlagKey = DeltaSQLConf.IN_COMMIT_TIMESTAMP_RETAIN_ENABLEMENT_INFO_FIX_ENABLED.key
  // With the fix turned off, ICT stays enabled but the enablement version/timestamp
  // keys are expected to be absent after the REPLACE.
  withSQLConf(fixFlagKey -> "false") {
    testICTEnablementPropertyRetention(
      expectRetention = false,
      expectICTEnabled = Some(true)) { tableDir =>
      val ictEnabledKey = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key
      val replaceStatement =
        s"REPLACE TABLE delta.`$tableDir` USING delta " +
          s"TBLPROPERTIES ('$ictEnabledKey' = 'true') " +
          "AS SELECT * FROM range(3, 4)"
      sql(replaceStatement)
    }
  }
}

testWithDefaultCommitCoordinatorUnset(
  "ICT enablement properties are dropped after a REPLACE with explicit disablement") {
  // The REPLACE explicitly sets ICT to 'false'; ICT is expected to be disabled and the
  // enablement version/timestamp keys absent afterwards.
  testICTEnablementPropertyRetention(expectRetention = false) { tableDir =>
    val ictEnabledKey = DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key
    val replaceStatement =
      s"REPLACE TABLE delta.`$tableDir` USING delta " +
        s"TBLPROPERTIES ('$ictEnabledKey' = 'false') " +
        "AS SELECT * FROM range(3, 4)"
    sql(replaceStatement)
  }
}

testWithDefaultCommitCoordinatorUnset(
  "ICT is completely dropped after a REPLACE with no explicit disablement") {
  // The REPLACE does not mention ICT at all; ICT is expected to be disabled and the
  // enablement version/timestamp keys absent afterwards.
  testICTEnablementPropertyRetention(expectRetention = false) { tableDir =>
    val replaceStatement =
      s"REPLACE TABLE delta.`$tableDir` USING delta AS SELECT * FROM range(3, 4)"
    sql(replaceStatement)
  }
}
}

class InCommitTimestampWithCoordinatedCommitsSuite
Expand Down
Loading