Skip to content

Commit b3eb469

Browse files
committed
[Spark] Advertise SUPPORT_COLUMN_DEFAULT_VALUE catalog capability
DeltaCatalog now overrides capabilities() to advertise TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE on top of the delegate's capabilities. Without this, Spark's analyzer (validateCatalogForDefaultValue) rejects column DEFAULT clauses on tables created through a non-session Delta catalog such as Unity Catalog; the session-catalog path is unaffected because Spark's V2SessionCatalog already reports this capability. Bumps the pinned Unity Catalog SHA used by the UC integration tests to 7ad03012 (unitycatalog#1589), where UCSingleCatalog forwards the delegate's capabilities, so the end-to-end path is exercised. Tests: a unit test asserting the capability is added on top of a delegate that reports none, and an end-to-end test creating a Delta table with DEFAULT values across string, int, bigint, boolean, double, decimal, date, array, map, and struct columns.
1 parent 3f10aae commit b3eb469

3 files changed

Lines changed: 52 additions & 4 deletions

File tree

project/scripts/setup_unitycatalog_main.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ set -euo pipefail
5757
# The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's
5858
# `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two
5959
# values are the single source of truth.
60-
UC_PIN_SHA=e1f6e52acc39b925fd6a42180d400ca4e0a3895f
60+
UC_PIN_SHA=7ad030127959453b9010285f4b814dd78b947070
6161
UC_BASE_VERSION=0.5.0-SNAPSHOT
6262
# ---------------------------------------------------------------------------------------------
6363

spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
5555
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition}
5656
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
5757
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, QualifiedColType, QualifiedColTypeShims, SyncIdentity}
58-
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCapability, TableCatalog, TableChange, V1Table}
58+
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCapability, TableCatalog, TableCatalogCapability, TableChange, V1Table}
5959
import org.apache.spark.sql.connector.catalog.TableCapability._
6060
import org.apache.spark.sql.connector.catalog.TableChange._
6161
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Literal, NamedReference, Transform}
@@ -102,6 +102,12 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
102102
AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(name, options, super.loadTable)
103103
}
104104

105+
/**
 * Reports everything `super.capabilities()` reports, plus
 * [[TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE]].
 *
 * Spark only accepts column DEFAULT clauses on tables created through a non-session V2 catalog
 * when that catalog reports this capability, so it is added unconditionally here.
 *
 * @return a newly allocated set containing the parent's capabilities and
 *         SUPPORT_COLUMN_DEFAULT_VALUE.
 */
override def capabilities(): util.Set[TableCatalogCapability] = {
106+
// Copy into a fresh HashSet so the add below operates on our own set rather than on the
// set instance returned by the parent.
val capabilities = new util.HashSet[TableCatalogCapability](super.capabilities())
107+
capabilities.add(TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE)
108+
capabilities
109+
}
110+
105111
private lazy val isUnityCatalog: Boolean = {
106112
val delegateField = classOf[DelegatingCatalogExtension].getDeclaredField("delegate")
107113
delegateField.setAccessible(true)

spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableCreationTests.scala

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
2828
import org.apache.spark.sql.delta.actions.Metadata
2929
import org.apache.spark.sql.delta.actions.Protocol
3030
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
31+
import org.apache.spark.sql.delta.catalog.DeltaCatalog
3132
import org.apache.spark.sql.delta.sources.DeltaSQLConf
32-
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest}
33+
import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, DeltaSQLCommandTest, DummyCatalog}
3334
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
3435
import org.apache.commons.io.FileUtils
3536
import org.apache.hadoop.fs.Path
@@ -41,7 +42,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
4142
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils, SessionCatalog}
4243
import org.apache.spark.sql.catalyst.parser.ParseException
4344
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils
44-
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
45+
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog, TableCatalogCapability}
4546
import org.apache.spark.sql.functions.col
4647
import org.apache.spark.sql.internal.SQLConf
4748
import org.apache.spark.sql.test.SharedSparkSession
@@ -2715,6 +2716,47 @@ class DeltaTableCreationSuite
27152716
}
27162717
}
27172718
}
2719+
2720+
test("Default column values: Delta catalog advertises SUPPORT_COLUMN_DEFAULT_VALUE") {
2721+
// Spark only accepts column DEFAULT clauses on tables created through a (non-session) V2
2722+
// catalog when the catalog reports SUPPORT_COLUMN_DEFAULT_VALUE. The override has to add the
2723+
// capability on top of whatever the delegate reports, so wrap a delegate that reports none.
2724+
val deltaCatalog = new DeltaCatalog()
2725+
deltaCatalog.setDelegateCatalog(new DummyCatalog())
2726+
// NOTE(review): this only proves the override adds the capability if DummyCatalog really
// reports no capabilities of its own - confirm against DummyCatalog's definition.
assert(deltaCatalog.capabilities().contains(
2727+
TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE))
2728+
}
2729+
2730+
// End-to-end check: create a Delta table whose columns declare DEFAULT values of scalar
// (string, int, bigint, boolean, double, decimal, date) and nested (array, map, struct) types,
// insert a row that supplies only `id`, and verify every other column reads back its DEFAULT.
test("Default column values: defaults of different column types are materialized") {
2731+
val tbl = "default_column_types"
2732+
withTable(tbl) {
2733+
// `id` is the only column declared without a DEFAULT; the table is created with the
// 'delta.feature.allowColumnDefaults' table property set to 'supported'.
sql(
2734+
s"""CREATE TABLE $tbl (
2735+
| id INT,
2736+
| str_col STRING DEFAULT 'hello',
2737+
| int_col INT DEFAULT 42,
2738+
| long_col BIGINT DEFAULT 1234567890123,
2739+
| bool_col BOOLEAN DEFAULT true,
2740+
| double_col DOUBLE DEFAULT 3.14,
2741+
| decimal_col DECIMAL(5, 2) DEFAULT 9.99,
2742+
| date_col DATE DEFAULT DATE'2024-01-01',
2743+
| arr_col ARRAY<INT> DEFAULT ARRAY(1, 2, 3),
2744+
| map_col MAP<STRING, INT> DEFAULT MAP('a', 1, 'b', 2),
2745+
| struct_col STRUCT<x: INT, y: STRING> DEFAULT NAMED_STRUCT('x', 7, 'y', 'foo')
2746+
|)
2747+
|USING DELTA
2748+
|TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
2749+
2750+
// Provide only the column without a default; the rest fall back to their DEFAULT values.
2751+
sql(s"INSERT INTO $tbl (id) VALUES (1)")
2752+
2753+
// The expected Row lists values in column declaration order; the decimal and date columns
// are compared as java.math.BigDecimal and java.sql.Date, and the struct as a nested Row.
checkAnswer(
2754+
sql(s"SELECT * FROM $tbl"),
2755+
Row(1, "hello", 42, 1234567890123L, true, 3.14,
2756+
new java.math.BigDecimal("9.99"), java.sql.Date.valueOf("2024-01-01"),
2757+
Seq(1, 2, 3), Map("a" -> 1, "b" -> 2), Row(7, "foo")))
2758+
}
2759+
}
27182760
}
27192761

27202762
trait DeltaTableCreationColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin {

0 commit comments

Comments
 (0)