Skip to content

Commit f667dd6

Browse files
committed
[Spark] Make deltaRestApi.enabled default to true
Signed-off-by: Yi Li <yi.li@databricks.com>
1 parent 8677f80 commit f667dd6

7 files changed

Lines changed: 57 additions & 19 deletions

File tree

project/scripts/setup_unitycatalog_main.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ set -euo pipefail
5757
# The pin. Bump both lines together if UC's version.sbt changed at the new SHA. build.sbt's
5858
# `unityCatalogVersion` is obtained by running this script with `--print-version`, so these two
5959
# values are the single source of truth.
60-
UC_PIN_SHA=ae0a40caa8b2b0627e88b93cb84fe6be255575db
60+
UC_PIN_SHA=9196fa45948adbebb9276140d5d16c60ec7ae7a3
6161
UC_BASE_VERSION=0.5.0-SNAPSHOT
6262
# ---------------------------------------------------------------------------------------------
6363

spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalog.scala

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,38 @@ class AbstractDeltaCatalog extends DelegatingCatalogExtension
8686

8787
val spark = SparkSession.active
8888

89+
// Captured from `initialize` so `deltaCatalogClient` can be built lazily (see its docs). Using
90+
// the `initialize` name directly rather than `name()`, which delegates to the (possibly unset)
91+
// delegate catalog.
92+
private var catalogName: String = _
93+
private var catalogOptions: CaseInsensitiveStringMap = _
94+
8995
/**
9096
* When defined, table operations are routed through this client instead of through the
9197
* [[org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension]] delegate that
9298
* `AbstractDeltaCatalog` normally relies on. This lets the catalog inject custom
9399
* interactions (e.g. talking to a REST endpoint, catalog-specific property handling,
94100
* storage-credential vending) rather than going through the Spark
95101
* [[org.apache.spark.sql.connector.catalog.TableCatalog]] API.
102+
*
103+
* Evaluated lazily rather than in `initialize` because the decision depends on
104+
* [[isUnityCatalog]], which reads the [[DelegatingCatalogExtension]] delegate. Spark's
105+
* `CatalogManager` calls `initialize` *before* `setDelegateCatalog` for the session catalog,
106+
* so the delegate isn't available yet during `initialize`; it is by the time any table
107+
* operation (the only reader of this client) runs.
96108
*/
97-
private[catalog] var deltaCatalogClient: Option[AbstractDeltaCatalogClient] = None
109+
private[catalog] lazy val deltaCatalogClient: Option[AbstractDeltaCatalogClient] =
110+
if (isUnityCatalog) {
111+
AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(
112+
catalogName, catalogOptions, super.loadTable)
113+
} else {
114+
None
115+
}
98116

99117
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
100118
super.initialize(name, options)
101-
deltaCatalogClient =
102-
AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(name, options, super.loadTable)
119+
catalogName = name
120+
catalogOptions = options
103121
}
104122

105123
private lazy val isUnityCatalog: Boolean = {

spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,19 @@ private[catalog] object AbstractDeltaCatalogClient extends Logging {
136136
"org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl"
137137

138138
/**
139-
* Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] when the catalog opted in via
140-
* `deltaRestApi.enabled`, else [[None]]. The concrete impl is loaded reflectively so
141-
* [[AbstractDeltaCatalog]] doesn't compile-depend on it. If opt-in is explicit but reflective
142-
* loading fails, throws [[IllegalStateException]] rather than silently degrading.
139+
* Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] unless the catalog has
140+
* opted out via `deltaRestApi.enabled=false`, in which case returns [[None]]. The flag
141+
* defaults to `true` when absent, so any UC catalog goes through the UC Delta API path
142+
* unless the operator explicitly disables it. The concrete impl is loaded reflectively so
143+
* [[AbstractDeltaCatalog]] doesn't compile-depend on it; if loading fails, throws
144+
* [[IllegalStateException]] rather than silently degrading.
143145
*/
144146
def fromCatalogOptionsIfEnabled(
145147
catalogName: String,
146148
options: CaseInsensitiveStringMap,
147149
fallbackLoadTableFunc: Identifier => Table): Option[AbstractDeltaCatalogClient] = {
148150
val key = UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY
149-
if (!options.getBoolean(key, false)) {
151+
if (!options.getBoolean(key, true)) {
150152
return None
151153
}
152154
val factory = try {

spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,6 @@ object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with L
637637
// case-insensitive so defaults don't create duplicate keys.
638638
val merged = new java.util.HashMap[String, String](options.asCaseSensitiveMap())
639639
Seq(
640-
UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY -> "true",
641640
UCTokenBasedRestClientFactory.RENEW_CREDENTIAL_ENABLED_KEY -> "true",
642641
UCTokenBasedRestClientFactory.CRED_SCOPED_FS_ENABLED_KEY -> "false"
643642
).foreach { case (k, v) => if (!options.containsKey(k)) merged.put(k, v) }
@@ -665,8 +664,8 @@ object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with L
665664
val hasLegacyToken = options.get(LegacyTokenKey) != null
666665
if (!hasAuthPrefix && !hasLegacyToken) {
667666
throw new IllegalArgumentException(
668-
s"auth configuration is required when 'deltaRestApi.enabled' is true " +
669-
s"(catalog '$catalogName'). Set either '${AuthPrefix}type' (with the corresponding " +
667+
s"auth configuration is required (catalog '$catalogName'). " +
668+
s"Set either '${AuthPrefix}type' (with the corresponding " +
670669
s"$AuthPrefix* keys) or the legacy '$LegacyTokenKey' option.")
671670
}
672671
}

spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ trait UCClientFactory {
280280
* Recognised ucConfig keys:
281281
* - `uri` (required) -- the UC server endpoint.
282282
* - `auth.*` / `token` (legacy) -- authentication parameters for [[TokenProvider]].
283-
* - `deltaRestApi.enabled` -- if `"true"`, uses [[UCDeltaTokenBasedRestClient]];
284-
* otherwise uses [[UCTokenBasedRestClient]].
283+
* - `deltaRestApi.enabled` -- defaults to `"true"`; uses [[UCDeltaTokenBasedRestClient]]
284+
* unless explicitly set to `"false"`, in which case [[UCTokenBasedRestClient]] is used.
285285
* - `appVersions.*` -- caller-supplied version entries merged with defaults; e.g.
286286
* `appVersions.Kernel -> "0.7.0"` adds a `"Kernel"` entry to the version map.
287287
*/
@@ -309,7 +309,7 @@ object UCTokenBasedRestClientFactory extends UCClientFactory {
309309
val tokenProvider = TokenProvider.create(authConfig.asJava)
310310

311311
val className =
312-
if (Option(ucConfig.get(DELTA_REST_API_ENABLED_KEY)).exists(_.equalsIgnoreCase("true"))) {
312+
if (ucConfig.getOrDefault(DELTA_REST_API_ENABLED_KEY, "true").toBoolean) {
313313
DELTA_UC_CLIENT_CLASS
314314
} else {
315315
DEFAULT_UC_CLIENT_CLASS

spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,19 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm
5252

5353
test("deltaRestApi.enabled=false leaves deltaCatalogClient empty") {
5454
val catalog = new AbstractDeltaCatalog
55-
catalog.initialize("test_cat", options())
55+
catalog.initialize("test_cat", options("deltaRestApi.enabled" -> "false"))
5656
assert(catalog.deltaCatalogClient.isEmpty,
5757
"UC Delta API client should not be constructed when the catalog opts out")
5858
}
5959

60+
test("deltaRestApi.enabled defaults to true: requires uri when flag absent") {
61+
val catalog = new AbstractDeltaCatalog
62+
val e = intercept[IllegalArgumentException] {
63+
catalog.initialize("test_cat", options())
64+
}
65+
assert(e.getMessage.contains("'uri' is required"))
66+
}
67+
6068
test("deltaRestApi.enabled=true requires uri") {
6169
val catalog = new AbstractDeltaCatalog
6270
val e = intercept[IllegalArgumentException] {
@@ -97,7 +105,7 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm
97105

98106
test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns None when flag is off") {
99107
val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(
100-
"test_cat", options(), noFallback)
108+
"test_cat", options("deltaRestApi.enabled" -> "false"), noFallback)
101109
assert(result.isEmpty)
102110
}
103111

@@ -109,6 +117,16 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm
109117
assert(result.isDefined)
110118
}
111119

120+
test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns Some when flag absent " +
121+
"(default is on)") {
122+
val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(
123+
"test_cat",
124+
options("uri" -> "http://uc", "token" -> "tok"),
125+
noFallback)
126+
assert(result.isDefined,
127+
"absent deltaRestApi.enabled should default to true and construct the client")
128+
}
129+
112130
private val noFallback: Identifier => Table =
113131
_ => throw new UnsupportedOperationException("fallback not expected in this test")
114132

spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,15 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) {
169169
conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog")
170170
.set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri())
171171
.set("spark.sql.catalog." + catalogName + ".token", uc.serverToken());
172-
if (useDeltaRestApiForTests()) {
172+
if (!useDeltaRestApiForTests()) {
173+
// Default is true. Tests can opt out.
173174
conf =
174175
conf.set(
175176
"spark.sql.catalog."
176177
+ catalogName
177178
+ "."
178179
+ UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY(),
179-
"true");
180+
"false");
180181
}
181182
return conf;
182183
}

0 commit comments

Comments
 (0)