@@ -102,7 +102,7 @@ class IcebergConversionTransaction(
def add(add: AddFile): Unit = throw new UnsupportedOperationException
def add(remove: RemoveFile): Unit = throw new UnsupportedOperationException

def commit(): Unit = {
def commit(deltaCommitVersion: Long): Unit = {
assert(!committed, "Already committed.")
impl.commit()
committed = true
@@ -118,7 +118,7 @@ class IcebergConversionTransaction(
override def opType: String = "null"
override def add(add: AddFile): Unit = {}
override def add(remove: RemoveFile): Unit = {}
override def commit(): Unit = {}
override def commit(deltaCommitVersion: Long): Unit = {}
}
/**
* API for appending new files in a table.
@@ -195,12 +195,12 @@ class IcebergConversionTransaction(
removeBuffer += remove.toDataFile
}

override def commit(): Unit = {
override def commit(deltaCommitVersion: Long): Unit = {
if (removeBuffer.nonEmpty) {
rewriter.rewriteFiles(removeBuffer.asJava, addBuffer.asJava, 0)
}
currentSnapshotId.foreach(rewriter.validateFromSnapshot)
super.commit()
super.commit(deltaCommitVersion)
}
}
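The hunks above change every transaction helper's commit signature from commit() to commit(deltaCommitVersion: Long), so each Iceberg commit now knows which Delta commit produced it. As a point of reference, here is a minimal sketch of one way a helper could surface that version on the resulting Iceberg snapshot; the property name "delta-commit-version" and the use of SnapshotUpdate.set(...) are illustrative assumptions, not code from this PR (per the doc comment further down, the PR intends the value for the Iceberg v3+ snapshot sequence number).

// Illustrative sketch only, not part of this PR.
import org.apache.iceberg.{AppendFiles, Transaction}

class AppendHelperSketch(txn: Transaction) {
  private val append: AppendFiles = txn.newAppend()

  def commit(deltaCommitVersion: Long): Unit = {
    // Stamp the originating Delta version into the snapshot summary so Iceberg
    // snapshots can be correlated with Delta commits (assumed property name).
    append.set("delta-commit-version", deltaCommitVersion.toString)
    append.commit()
  }
}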

@@ -371,16 +371,20 @@ class IcebergConverter(spark: SparkSession)

val actionsToConvert = DeltaFileProviderUtils.parallelReadAndParseDeltaFilesAsIterator(
log, spark, deltaFiles)
var deltaVersion = prevSnapshot.version
actionsToConvert.foreach { actionsIter =>
try {
deltaVersion += 1
// TODO: get rid of this grouped batching behavior
actionsIter.grouped(actionBatchSize).foreach { actionStrs =>
val actions = actionStrs.map(Action.fromJson)
needsExpireSnapshot ||= existsOptimize(actions)

runIcebergConversionForActions(
icebergTxn,
actions,
prevConvertedSnapshotOpt)
prevConvertedSnapshotOpt,
deltaVersion)
}
} finally {
actionsIter.close()
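The new deltaVersion counter in this hunk relies on the ordering of the commit files being converted. A tiny sketch of that assumption (illustrative only, not PR code): actionsToConvert yields one iterator per Delta commit file, in ascending version order starting at prevSnapshot.version + 1, so incrementing before each file recovers that file's commit version.

val prevSnapshotVersion = 100L                 // assumed starting point
val commitFileVersions = Seq(101L, 102L, 103L) // assumed contiguous commit range
var deltaVersion = prevSnapshotVersion
commitFileVersions.foreach { expected =>
  deltaVersion += 1
  require(deltaVersion == expected) // each batch is converted under this version
}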
@@ -411,7 +415,7 @@ class IcebergConverter(spark: SparkSession)
.grouped(actionBatchSize)
.foreach { actions =>
needsExpireSnapshot ||= existsOptimize(actions)
runIcebergConversionForActions(icebergTxn, actions, None)
runIcebergConversionForActions(icebergTxn, actions, None, snapshotToConvert.version)
}

// Always attempt to update table metadata (schema/properties) for REPLACE_TABLE
@@ -442,7 +446,7 @@ class IcebergConverter(spark: SparkSession)
}
})
}
expireSnapshotHelper.commit()
expireSnapshotHelper.commit(snapshotToConvert.version)
}

icebergTxn.commit()
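Here expireSnapshotHelper.commit(...) also receives the Delta version of the snapshot being converted. For context, this is the kind of Iceberg call such a helper typically wraps, shown as a hedged sketch; the 7-day retention window is an arbitrary example value, not the converter's actual policy.

// Illustrative sketch, not from this PR.
import java.util.concurrent.TimeUnit
import org.apache.iceberg.Transaction

def expireOldSnapshots(txn: Transaction): Unit = {
  val cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7)
  txn.expireSnapshots()      // org.apache.iceberg.ExpireSnapshots
    .expireOlderThan(cutoff) // drop snapshots older than the cutoff timestamp
    .commit()
}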
@@ -509,11 +513,14 @@ class IcebergConverter(spark: SparkSession)
/**
* Build an iceberg TransactionHelper from the provided txn, and commit the set of changes
* specified by the actionsToCommit.
*
* For Iceberg v3+, deltaVersion will be used to set the Iceberg snapshot sequence number.
*/
private[delta] def runIcebergConversionForActions(
icebergTxn: IcebergConversionTransaction,
actionsToCommit: Seq[Action],
prevSnapshotOpt: Option[Snapshot]): Unit = {
prevSnapshotOpt: Option[Snapshot],
deltaVersion: Long): Unit = {
prevSnapshotOpt match {
case None =>
// If we don't have a previous snapshot, that implies that the table is either being
@@ -525,7 +532,7 @@ class IcebergConverter(spark: SparkSession)
case _ => throw new IllegalStateException(s"Must provide only AddFiles when creating " +
s"or replacing an Iceberg Table.")
}
appendHelper.commit()
appendHelper.commit(deltaVersion)

case Some(_) =>
// We have to go through the seq of actions twice, once to figure out the TransactionHelper
@@ -575,7 +582,7 @@ class IcebergConverter(spark: SparkSession)
rewriteHelper.add(action.remove)
}
}
rewriteHelper.commit()
rewriteHelper.commit(deltaVersion)
} else if ((hasAdds && hasRemoves) || !allDeltaActionsCaptured) {
val overwriteHelper = icebergTxn.getOverwriteHelper
addsAndRemoves.foreach { action =>
@@ -585,7 +592,7 @@ class IcebergConverter(spark: SparkSession)
overwriteHelper.add(action.remove)
}
}
overwriteHelper.commit()
overwriteHelper.commit(deltaVersion)
} else if (hasAdds) {
if (!hasRemoves && !hasDataChange && allDeltaActionsCaptured) {
logInfo(log"Skip Iceberg conversion for commit that only has AddFiles " +
@@ -594,12 +601,12 @@ class IcebergConverter(spark: SparkSession)
} else {
val appendHelper = icebergTxn.getAppendOnlyHelper
addsAndRemoves.foreach(action => appendHelper.add(action.add))
appendHelper.commit()
appendHelper.commit(deltaVersion)
}
} else if (hasRemoves) {
val removeHelper = icebergTxn.getRemoveOnlyHelper
addsAndRemoves.foreach(action => removeHelper.add(action.remove))
removeHelper.commit()
removeHelper.commit(deltaVersion)
}
}
}
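The branches above pick the Iceberg operation based on which action types a Delta commit contains; parts of the first condition are elided by the diff. A condensed sketch of that selection logic follows (illustrative; the elided first branch is reconstructed as an assumption, and the names are made up for clarity).

// Illustrative sketch of the helper-selection logic, not PR code.
sealed trait IcebergOp
case object Rewrite   extends IcebergOp // adds + removes, no data change: compaction
case object Overwrite extends IcebergOp // adds + removes with data change, or actions not fully captured
case object SkipConv  extends IcebergOp // adds only, no data change, fully captured
case object Append    extends IcebergOp // adds only, otherwise
case object Delete    extends IcebergOp // removes only

def chooseOp(
    hasAdds: Boolean,
    hasRemoves: Boolean,
    hasDataChange: Boolean,
    allDeltaActionsCaptured: Boolean): IcebergOp = {
  if (hasAdds && hasRemoves && !hasDataChange && allDeltaActionsCaptured) Rewrite
  else if ((hasAdds && hasRemoves) || !allDeltaActionsCaptured) Overwrite
  else if (hasAdds && !hasDataChange && allDeltaActionsCaptured) SkipConv
  else if (hasAdds) Append
  else if (hasRemoves) Delete
  else SkipConv // nothing to convert
}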
@@ -171,7 +171,7 @@ case class CreateDeltaTableCommand(
}
}

val txn = startTxnForTableCreation(sparkSession, deltaLog, tableWithLocation)
var txn = startTxnForTableCreation(sparkSession, deltaLog, tableWithLocation)

OptimisticTransaction.withActive(txn) {
val result = query match {
@@ -185,7 +185,8 @@ case class CreateDeltaTableCommand(
commandMetrics = Some(metrics))
case Some(deltaWriter: WriteIntoDeltaLike) =>
checkPathEmpty(txn)
handleCreateTableAsSelect(sparkSession, txn, deltaLog, deltaWriter, tableWithLocation)
txn = handleCreateTableAsSelect(
sparkSession, txn, deltaLog, deltaWriter, tableWithLocation)
Nil
case Some(query) =>
checkPathEmpty(txn)
@@ -202,7 +203,8 @@ case class CreateDeltaTableCommand(
configuration = tableWithLocation.properties + ("comment" -> table.comment.orNull),
data = data,
Some(tableWithLocation))
handleCreateTableAsSelect(sparkSession, txn, deltaLog, deltaWriter, tableWithLocation)
txn = handleCreateTableAsSelect(
sparkSession, txn, deltaLog, deltaWriter, tableWithLocation)
Nil
case _ =>
handleCreateTable(sparkSession, txn, tableWithLocation, fs, hadoopConf)
@@ -258,13 +260,15 @@ case class CreateDeltaTableCommand(
* CREATE TABLE AS SELECT
* CREATE OR REPLACE TABLE AS SELECT
* .saveAsTable in DataframeWriter API
*
* @return the txn used to make Delta commit
*/
private def handleCreateTableAsSelect(
sparkSession: SparkSession,
txn: OptimisticTransaction,
deltaLog: DeltaLog,
deltaWriter: WriteIntoDeltaLike,
tableWithLocation: CatalogTable): Unit = {
tableWithLocation: CatalogTable): OptimisticTransaction = {
val isManagedTable = tableWithLocation.tableType == CatalogTableType.MANAGED
val options = new DeltaOptions(table.storage.properties, sparkSession.sessionState.conf)

@@ -334,6 +338,7 @@ case class CreateDeltaTableCommand(
val (taggedCommitData, op) = doDeltaWrite(updatedWriter, updatedWriter.data.schema.asNullable)
txn.commit(taggedCommitData.actions, op, tags = taggedCommitData.stringTags)
}
txnToReturn
}
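With this change, handleCreateTableAsSelect returns the transaction it actually used for the Delta commit, and the caller rebinds its local var txn to that value. A small sketch of why that matters when a helper may swap in a different transaction internally; the Txn class and field names here are made up for illustration and are not the PR's code.

// Illustrative sketch of the rebind pattern, not PR code.
final class Txn(val id: Int) { var committedVersion: Option[Long] = None }

// Helper that may abandon the caller's txn and commit under a fresh one.
def handleCtasSketch(initial: Txn, needsFreshTxn: Boolean): Txn = {
  val effective = if (needsFreshTxn) new Txn(initial.id + 1) else initial
  effective.committedVersion = Some(0L) // pretend this txn performed the commit
  effective                             // return the txn that really committed
}

def callerSketch(): Unit = {
  var txn = new Txn(0)
  txn = handleCtasSketch(txn, needsFreshTxn = true) // rebind, as the command does above
  assert(txn.committedVersion.isDefined) // post-commit logic sees the committing txn
}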

/**
@@ -135,6 +135,26 @@ trait UniversalFormatSuiteBase extends IcebergCompatUtilsBase
}
}

test("create new UniForm table via clone") {
[Review comment from the PR author on this test: "unintended. will remove"]
withTempTableAndDir { case (id, loc) =>
executeSql(s"""
|CREATE TABLE $id (ID INT) USING DELTA TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name')
| LOCATION $loc """.stripMargin)
executeSql(s"""
|INSERT INTO $id values (1) """.stripMargin)
withTempTableAndDir { case (cloneId, loc1) =>
executeSql(s"""
|CREATE TABLE $cloneId SHALLOW CLONE $id TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'iceberg',
| 'delta.enableIcebergCompatV$compatVersion' = 'true',
| 'delta.columnMapping.mode' = 'name'
|) LOCATION $loc1 """.stripMargin)
assertUniFormIcebergProtocolAndProperties(cloneId)
}
}
}

test("enable UniForm on existing table with IcebergCompat enabled") {
allReaderWriterVersions.foreach { case (r, w) =>
withTempTableAndDir { case (id, loc) =>