@@ -569,69 +569,71 @@ class DeltaColumnRenameSuite extends QueryTest
  }

  testColumnMapping("rename column with special characters and data skipping stats") { mode =>
withTable("t1") {
spark.sql(
s"""
|CREATE TABLE t1 (c int, d string)
|USING DELTA
|TBLPROPERTIES (
| '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = '$mode',
| '${DeltaConfigs.DATA_SKIPPING_STATS_COLUMNS.key}' = 'c,d'
|)
|""".stripMargin)
spark.sql("INSERT INTO t1 VALUES (1, 'value1'), (2, 'value2'), (3, 'value3')")

// Verify stats are collected before rename
val deltaLog = DeltaLog.forTable(spark, spark.sessionState.catalog.getTableMetadata(
spark.sessionState.sqlParser.parseTableIdentifier("t1")))
val statsBefore = deltaLog.update().allFiles.collect().head.stats
assert(statsBefore != null && statsBefore.contains("numRecords"))

// Rename column c to a name with special characters
spark.sql("ALTER TABLE t1 RENAME COLUMN c TO `c#2`")

// Verify the rename worked
checkAnswer(
spark.table("t1"),
Seq(Row(1, "value1"), Row(2, "value2"), Row(3, "value3")))

// Verify we can query using the new column name
checkAnswer(
spark.sql("SELECT `c#2` FROM t1 WHERE `c#2` > 1"),
Seq(Row(2), Row(3)))

// Insert data after rename to ensure stats collection still works
spark.sql("INSERT INTO t1 VALUES (4, 'value4'), (5, 'value5')")

checkAnswer(
spark.table("t1"),
Seq(
Row(1, "value1"),
Row(2, "value2"),
Row(3, "value3"),
Row(4, "value4"),
Row(5, "value5")))

// Verify stats are still being collected after rename
val statsAfter = deltaLog.update().allFiles.collect().last.stats
assert(statsAfter != null && statsAfter.contains("numRecords"))

// Verify the rename history includes the escaped column name
val renameHistoryDf = sql("DESCRIBE HISTORY t1")
.where("operation = 'RENAME COLUMN'")
.select("operationParameters")
+    withSQLConf(DeltaSQLConf.DELTA_RENAME_COLUMN_ESCAPE_NAME.key -> "true") {
+      withTable("t1") {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1 (c int, d string)
+             |USING DELTA
+             |TBLPROPERTIES (
+             | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = '$mode',
+             | '${DeltaConfigs.DATA_SKIPPING_STATS_COLUMNS.key}' = 'c,d'
+             |)
+             |""".stripMargin)
+        spark.sql("INSERT INTO t1 VALUES (1, 'value1'), (2, 'value2'), (3, 'value3')")
+
+        // Verify stats are collected before rename
+        val deltaLog = DeltaLog.forTable(spark, spark.sessionState.catalog.getTableMetadata(
+          spark.sessionState.sqlParser.parseTableIdentifier("t1")))
+        val statsBefore = deltaLog.update().allFiles.collect().head.stats
+        assert(statsBefore != null && statsBefore.contains("numRecords"))
+
+        // Rename column c to a name with special characters
+        spark.sql("ALTER TABLE t1 RENAME COLUMN c TO `c#2`")
+
+        // Verify the rename worked
+        checkAnswer(
+          spark.table("t1"),
+          Seq(Row(1, "value1"), Row(2, "value2"), Row(3, "value3")))
+
+        // Verify we can query using the new column name
+        checkAnswer(
+          spark.sql("SELECT `c#2` FROM t1 WHERE `c#2` > 1"),
+          Seq(Row(2), Row(3)))
+
+        // Insert data after rename to ensure stats collection still works
+        spark.sql("INSERT INTO t1 VALUES (4, 'value4'), (5, 'value5')")
+
+        checkAnswer(
+          spark.table("t1"),
+          Seq(
+            Row(1, "value1"),
+            Row(2, "value2"),
+            Row(3, "value3"),
+            Row(4, "value4"),
+            Row(5, "value5")))
+
+        // Verify stats are still being collected after rename
+        val statsAfter = deltaLog.update().allFiles.collect().last.stats
+        assert(statsAfter != null && statsAfter.contains("numRecords"))
+
+        // Verify the rename history includes the escaped column name
+        val renameHistoryDf = sql("DESCRIBE HISTORY t1")
+          .where("operation = 'RENAME COLUMN'")
+          .select("operationParameters")

-      val operationParams = renameHistoryDf.head().getMap[String, String](0)
-      assert(operationParams("oldColumnPath") == "c")
-      assert(operationParams("newColumnPath").contains("c#2"))
+        val operationParams = renameHistoryDf.head().getMap[String, String](0)
+        assert(operationParams("oldColumnPath") == "c")
+        assert(operationParams("newColumnPath").contains("c#2"))

-      // Rename c#2 back to c before renaming column d
-      spark.sql("ALTER TABLE t1 RENAME COLUMN `c#2` TO c")
+        // Rename c#2 back to c before renaming column d
+        spark.sql("ALTER TABLE t1 RENAME COLUMN `c#2` TO c")

-      // Verify rename back worked
-      checkAnswer(
-        spark.sql("SELECT c FROM t1 WHERE c = 3"),
-        Seq(Row(3)))
+        // Verify rename back worked
+        checkAnswer(
+          spark.sql("SELECT c FROM t1 WHERE c = 3"),
+          Seq(Row(3)))
-    }
+      }
+    }
  }