Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
Expand All @@ -40,7 +41,7 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan
import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, TimestampType}
import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.unsafe.types.UTF8String

Expand Down Expand Up @@ -1759,6 +1760,263 @@ class DataSourceV2DataFrameSuite
}
}

// Temp views with stored plans: scenarios from the DSv2 table refresh design doc.
// Each test creates a DSv2 table with initial data, builds a temp view with a filter
// (to demonstrate that the stored plan is non-trivial), and then verifies the view
// behavior after various table modifications (session or external).

test("temp view with stored plan reflects session write") {
  val tableName = "testcat.ns1.ns2.tbl"
  withTable(tableName) {
    // Seed the table with one row passing and one row failing the view's filter.
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    // The temp view stores an analyzed plan that carries the filter predicate.
    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // A write issued through the same session must be visible via the stored plan.
    sql(s"INSERT INTO $tableName VALUES (2, 200)")

    checkAnswer(spark.table("v"), Row(1, 100) :: Row(2, 200) :: Nil)
  }
}

test("temp view with stored plan reflects external write") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Simulate an out-of-session writer appending (2, 200) directly via the catalog API.
    val rowSchema = StructType.fromDDL("id INT, salary INT")
    val externalTable = catalog("testcat")
      .loadTable(tableIdent, util.Set.of(TableWritePrivilege.INSERT))
      .asInstanceOf[InMemoryBaseTable]
    externalTable.withData(
      Array(new BufferedRows(Seq.empty, rowSchema).withRow(InternalRow(2, 200))))

    // The stored plan re-reads the table, so the external row shows up.
    checkAnswer(spark.table("v"), Row(1, 100) :: Row(2, 200) :: Nil)
  }
}

test("temp view with stored plan preserves schema after session ADD COLUMN") {
  val tableName = "testcat.ns1.ns2.tbl"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Widen the table schema in-session, then write a row using the new shape.
    sql(s"ALTER TABLE $tableName ADD COLUMN new_column INT")
    sql(s"INSERT INTO $tableName VALUES (2, 200, -1)")

    // The stored plan keeps the original two-column projection and its filter.
    checkAnswer(spark.table("v"), Row(1, 100) :: Row(2, 200) :: Nil)
  }
}

test("temp view with stored plan preserves schema after external ADD COLUMN") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Evolve the schema behind Spark's back through the catalog API.
    val addColumnChange = TableChange.addColumn(Array("new_column"), IntegerType, true)
    catalog("testcat").alterTable(tableIdent, addColumnChange)

    // An external writer then appends a row shaped like the widened schema.
    val widenedSchema = StructType.fromDDL("id INT, salary INT, new_column INT")
    val externalTable = catalog("testcat")
      .loadTable(tableIdent, util.Set.of(TableWritePrivilege.INSERT))
      .asInstanceOf[InMemoryBaseTable]
    externalTable.withData(
      Array(new BufferedRows(Seq.empty, widenedSchema).withRow(InternalRow(2, 200, -1))))

    // The view still projects only the original two columns and applies the filter.
    checkAnswer(spark.table("v"), Row(1, 100) :: Row(2, 200) :: Nil)
  }
}

test("temp view with stored plan detects external column removal") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Drop a column the stored plan depends on, bypassing the session (catalog API).
    val dropColumnChange = TableChange.deleteColumn(Array("salary"), false)
    catalog("testcat").alterTable(tableIdent, dropColumnChange)

    // Resolving the view must now fail with an incompatible-schema error.
    checkError(
      exception = intercept[AnalysisException] { spark.table("v").collect() },
      condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
      parameters = Map(
        "viewName" -> "`v`",
        "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
        "colType" -> "data",
        "errors" -> "- `salary` INT has been removed"))
  }
}

test("temp view with stored plan resolves to externally recreated table") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    val idBeforeRecreate = catalog("testcat").loadTable(tableIdent).id

    // Drop and recreate the table behind Spark's back with an identical schema.
    catalog("testcat").dropTable(tableIdent)
    val recreatedInfo = new TableInfo.Builder()
      .withColumns(Array(
        Column.create("id", IntegerType),
        Column.create("salary", IntegerType)))
      .build()
    catalog("testcat").createTable(tableIdent, recreatedInfo)

    // Sanity check: the catalog really produced a different table instance.
    val idAfterRecreate = catalog("testcat").loadTable(tableIdent).id
    assert(idBeforeRecreate != idAfterRecreate)

    // The view now resolves against the fresh (empty) table.
    checkAnswer(spark.table("v"), Seq.empty)

    // New writes to the recreated table flow through the view as usual.
    sql(s"INSERT INTO $tableName VALUES (2, 200)")
    checkAnswer(spark.table("v"), Row(2, 200) :: Nil)
  }
}

test("temp view with stored plan after session drop and re-add column same type") {
  val tableName = "testcat.ns1.ns2.tbl"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Remove the column and immediately re-add it with the identical name and type.
    sql(s"ALTER TABLE $tableName DROP COLUMN salary")
    sql(s"ALTER TABLE $tableName ADD COLUMN salary INT")

    // Schema validation succeeds (names and types match the stored plan), and the
    // in-memory table keeps its row data through the ALTER sequence.
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)
  }
}

test("temp view with stored plan after external drop and re-add column same type") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Drop and re-add the column in a single external alterTable call, same type.
    val dropColumnChange = TableChange.deleteColumn(Array("salary"), false)
    val addColumnChange = TableChange.addColumn(Array("salary"), IntegerType, true)
    catalog("testcat").alterTable(tableIdent, dropColumnChange, addColumnChange)

    // Names and types still line up with the stored plan, so the view keeps working.
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)
  }
}

test("temp view with stored plan detects session column type change") {
  val tableName = "testcat.ns1.ns2.tbl"
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Re-add the column under the same name but with an incompatible type.
    sql(s"ALTER TABLE $tableName DROP COLUMN salary")
    sql(s"ALTER TABLE $tableName ADD COLUMN salary STRING")

    // The type mismatch against the stored plan must surface as an analysis error.
    checkError(
      exception = intercept[AnalysisException] { spark.table("v").collect() },
      condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
      parameters = Map(
        "viewName" -> "`v`",
        "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
        "colType" -> "data",
        "errors" -> "- `salary` type has changed from INT to STRING"))
  }
}

test("temp view with stored plan detects external column type change") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Externally swap the column's type (drop + re-add as STRING) via the catalog API.
    val dropColumnChange = TableChange.deleteColumn(Array("salary"), false)
    val addColumnChange = TableChange.addColumn(Array("salary"), StringType, true)
    catalog("testcat").alterTable(tableIdent, dropColumnChange, addColumnChange)

    // The stored plan still expects INT, so resolving the view must fail.
    checkError(
      exception = intercept[AnalysisException] { spark.table("v").collect() },
      condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
      parameters = Map(
        "viewName" -> "`v`",
        "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
        "colType" -> "data",
        "errors" -> "- `salary` type has changed from INT to STRING"))
  }
}

test("temp view with stored plan detects type widening") {
  val tableName = "testcat.ns1.ns2.tbl"
  val tableIdent = Identifier.of(Array("ns1", "ns2"), "tbl")
  withTable(tableName) {
    sql(s"CREATE TABLE $tableName (id INT, salary INT) USING foo")
    sql(s"INSERT INTO $tableName VALUES (1, 100), (10, 1000)")

    spark.table(tableName).filter("salary < 999").createOrReplaceTempView("v")
    checkAnswer(spark.table("v"), Row(1, 100) :: Nil)

    // Even a lossless widening (INT -> BIGINT) applied via the catalog API
    // diverges from the stored plan's expected type.
    val widenChange = TableChange.updateColumnType(Array("salary"), LongType)
    catalog("testcat").alterTable(tableIdent, widenChange)

    checkError(
      exception = intercept[AnalysisException] { spark.table("v").collect() },
      condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
      parameters = Map(
        "viewName" -> "`v`",
        "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
        "colType" -> "data",
        "errors" -> "- `salary` type has changed from INT to BIGINT"))
  }
}

test("cached DSv2 table DataFrame is refreshed and reused after insert") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
Expand Down