Skip to content

Commit 9823770

Browse files
committed
[Spark] Make deltaRestApi.enabled default to true
Signed-off-by: Yi Li <yi.li@databricks.com>
1 parent 84c9d70 commit 9823770

5 files changed

Lines changed: 35 additions & 15 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClient.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,19 @@ private[catalog] object AbstractDeltaCatalogClient extends Logging {
136136
"org.apache.spark.sql.delta.catalog.UCDeltaCatalogClientImpl"
137137

138138
/**
139-
* Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] when the catalog opted in via
140-
* `deltaRestApi.enabled`, else [[None]]. The concrete impl is loaded reflectively so
141-
* [[AbstractDeltaCatalog]] doesn't compile-depend on it. If opt-in is explicit but reflective
142-
* loading fails, throws [[IllegalStateException]] rather than silently degrading.
139+
* Returns a [[AbstractDeltaCatalogClient]] wrapped in [[Some]] unless the catalog has
140+
* opted out via `deltaRestApi.enabled=false`, in which case returns [[None]]. The flag
141+
* defaults to `true` when absent, so any UC catalog goes through the UC Delta API path
142+
* unless the operator explicitly disables it. The concrete impl is loaded reflectively so
143+
* [[AbstractDeltaCatalog]] doesn't compile-depend on it; if loading fails, throws
144+
* [[IllegalStateException]] rather than silently degrading.
143145
*/
144146
def fromCatalogOptionsIfEnabled(
145147
catalogName: String,
146148
options: CaseInsensitiveStringMap,
147149
fallbackLoadTableFunc: Identifier => Table): Option[AbstractDeltaCatalogClient] = {
148150
val key = UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY
149-
if (!options.getBoolean(key, false)) {
151+
if (!options.getBoolean(key, true)) {
150152
return None
151153
}
152154
val factory = try {

spark/src/main/scala/org/apache/spark/sql/delta/catalog/UCDeltaCatalogClientImpl.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,6 @@ object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with L
627627
// case-insensitive so defaults don't create duplicate keys.
628628
val merged = new java.util.HashMap[String, String](options.asCaseSensitiveMap())
629629
Seq(
630-
UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY -> "true",
631630
UCTokenBasedRestClientFactory.RENEW_CREDENTIAL_ENABLED_KEY -> "true",
632631
UCTokenBasedRestClientFactory.CRED_SCOPED_FS_ENABLED_KEY -> "false"
633632
).foreach { case (k, v) => if (!options.containsKey(k)) merged.put(k, v) }
@@ -655,8 +654,8 @@ object UCDeltaCatalogClientImpl extends AbstractDeltaCatalogClientFactory with L
655654
val hasLegacyToken = options.get(LegacyTokenKey) != null
656655
if (!hasAuthPrefix && !hasLegacyToken) {
657656
throw new IllegalArgumentException(
658-
s"auth configuration is required when 'deltaRestApi.enabled' is true " +
659-
s"(catalog '$catalogName'). Set either '${AuthPrefix}type' (with the corresponding " +
657+
s"auth configuration is required (catalog '$catalogName'). " +
658+
s"Set either '${AuthPrefix}type' (with the corresponding " +
660659
s"$AuthPrefix* keys) or the legacy '$LegacyTokenKey' option.")
661660
}
662661
}

spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/UCCommitCoordinatorBuilder.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ trait UCClientFactory {
280280
* Recognised ucConfig keys:
281281
* - `uri` (required) -- the UC server endpoint.
282282
* - `auth.*` / `token` (legacy) -- authentication parameters for [[TokenProvider]].
283-
* - `deltaRestApi.enabled` -- if `"true"`, uses [[UCDeltaTokenBasedRestClient]];
284-
* otherwise uses [[UCTokenBasedRestClient]].
283+
* - `deltaRestApi.enabled` -- defaults to `"true"`; uses [[UCDeltaTokenBasedRestClient]]
284+
* unless explicitly set to `"false"`, in which case [[UCTokenBasedRestClient]] is used.
285285
* - `appVersions.*` -- caller-supplied version entries merged with defaults; e.g.
286286
* `appVersions.Kernel -> "0.7.0"` adds a `"Kernel"` entry to the version map.
287287
*/
@@ -309,7 +309,7 @@ object UCTokenBasedRestClientFactory extends UCClientFactory {
309309
val tokenProvider = TokenProvider.create(authConfig.asJava)
310310

311311
val className =
312-
if (Option(ucConfig.get(DELTA_REST_API_ENABLED_KEY)).exists(_.equalsIgnoreCase("true"))) {
312+
if (ucConfig.getOrDefault(DELTA_REST_API_ENABLED_KEY, "true").toBoolean) {
313313
DELTA_UC_CLIENT_CLASS
314314
} else {
315315
DEFAULT_UC_CLIENT_CLASS

spark/src/test/scala/org/apache/spark/sql/delta/catalog/AbstractDeltaCatalogClientRoutingSuite.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,19 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm
5252

5353
test("deltaRestApi.enabled=false leaves deltaCatalogClient empty") {
5454
val catalog = new AbstractDeltaCatalog
55-
catalog.initialize("test_cat", options())
55+
catalog.initialize("test_cat", options("deltaRestApi.enabled" -> "false"))
5656
assert(catalog.deltaCatalogClient.isEmpty,
5757
"UC Delta API client should not be constructed when the catalog opts out")
5858
}
5959

60+
test("deltaRestApi.enabled defaults to true: requires uri when flag absent") {
61+
val catalog = new AbstractDeltaCatalog
62+
val e = intercept[IllegalArgumentException] {
63+
catalog.initialize("test_cat", options())
64+
}
65+
assert(e.getMessage.contains("'uri' is required"))
66+
}
67+
6068
test("deltaRestApi.enabled=true requires uri") {
6169
val catalog = new AbstractDeltaCatalog
6270
val e = intercept[IllegalArgumentException] {
@@ -97,7 +105,7 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm
97105

98106
test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns None when flag is off") {
99107
val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(
100-
"test_cat", options(), noFallback)
108+
"test_cat", options("deltaRestApi.enabled" -> "false"), noFallback)
101109
assert(result.isEmpty)
102110
}
103111

@@ -109,6 +117,16 @@ class AbstractDeltaCatalogClientRoutingSuite extends QueryTest with DeltaSQLComm
109117
assert(result.isDefined)
110118
}
111119

120+
test("AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled returns Some when flag absent " +
121+
"(default is on)") {
122+
val result = AbstractDeltaCatalogClient.fromCatalogOptionsIfEnabled(
123+
"test_cat",
124+
options("uri" -> "http://uc", "token" -> "tok"),
125+
noFallback)
126+
assert(result.isDefined,
127+
"absent deltaRestApi.enabled should default to true and construct the client")
128+
}
129+
112130
private val noFallback: Identifier => Table =
113131
_ => throw new UnsupportedOperationException("fallback not expected in this test")
114132

spark/unitycatalog/src/test/java/io/sparkuctest/UCDeltaTableIntegrationBaseTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,15 @@ private SparkConf configureSparkWithUnityCatalog(SparkConf conf) {
169169
conf.set("spark.sql.catalog." + catalogName, "io.unitycatalog.spark.UCSingleCatalog")
170170
.set("spark.sql.catalog." + catalogName + ".uri", uc.serverUri())
171171
.set("spark.sql.catalog." + catalogName + ".token", uc.serverToken());
172-
if (useDeltaRestApiForTests()) {
172+
if (!useDeltaRestApiForTests()) {
173+
// Default is true. Tests can opt out.
173174
conf =
174175
conf.set(
175176
"spark.sql.catalog."
176177
+ catalogName
177178
+ "."
178179
+ UCTokenBasedRestClientFactory.DELTA_REST_API_ENABLED_KEY(),
179-
"true");
180+
"false");
180181
}
181182
return conf;
182183
}

0 commit comments

Comments
 (0)