Skip to content

Commit

Permalink
[3.2] DeltaCatalog.createTable should respect PROP_IS_MANAGED_LOCATION (
Browse files Browse the repository at this point in the history
#3663)

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
Even if a table has the location field, it should still be a managed
table if `PROP_IS_MANAGED_LOCATION` is present in the table properties.

Note: this case won't happen with Spark integration solely. It's only an
issue for third-party catalogs that delegate requests to `DeltaCatalog`,
such as Unity Catalog.

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->
new test
## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
no

Co-authored-by: Wenchen Fan <[email protected]>
  • Loading branch information
tdas and cloud-fan authored Sep 10, 2024
1 parent 8fef464 commit b18fe40
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,19 @@ class DeltaCatalog extends DelegatingCatalogExtension
.getOrElse(spark.sessionState.catalog.defaultTablePath(id))
val storage = DataSource.buildStorageFormatFromOptions(writeOptions)
.copy(locationUri = Option(loc))
val tableType =
if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
// PROP_IS_MANAGED_LOCATION indicates that the table location is not user-specified but
// system-generated. The table should be created as managed table in this case.
val isManagedLocation = Option(allTableProperties.get(TableCatalog.PROP_IS_MANAGED_LOCATION))
.exists(_.equalsIgnoreCase("true"))
// Note: Spark generates the table location for managed tables in
// `DeltaCatalog#delegate#createTable`, so `isManagedLocation` should never be true if
// Unity Catalog is not involved. For safety we also check `isUnityCatalog` here.
val respectManagedLoc = isUnityCatalog || org.apache.spark.util.Utils.isTesting
val tableType = if (location.isEmpty || (isManagedLocation && respectManagedLoc)) {
CatalogTableType.MANAGED
} else {
CatalogTableType.EXTERNAL
}
val commentOpt = Option(allTableProperties.get("comment"))


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,22 @@ class CustomCatalogSuite extends QueryTest with SharedSparkSession
}
}
}

test("custom catalog that generates location for managed tables") {
// Reset catalog manager so that the new `spark_catalog` implementation can apply.
spark.sessionState.catalogManager.reset()
withSQLConf("spark.sql.catalog.spark_catalog" -> classOf[DummySessionCatalog].getName) {
withTable("t") {
withTempPath { path =>
sql(s"CREATE TABLE t (id LONG) USING delta TBLPROPERTIES (fakeLoc='$path')")
val t = spark.sessionState.catalogManager.v2SessionCatalog.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array("default"), "t"))
// It should be a managed table.
assert(!t.properties().containsKey(TableCatalog.PROP_EXTERNAL))
}
}
}
}
}

class DummyCatalog extends TableCatalog {
Expand Down Expand Up @@ -396,9 +412,10 @@ class DummySessionCatalogInner extends DelegatingCatalogExtension {
}

// A dummy catalog that adds a layer between DeltaCatalog and the Spark SessionCatalog,
// to attach additional table storage properties after the table is loaded.
// to attach additional table storage properties after the table is loaded, and generates location
// for managed tables.
class DummySessionCatalog extends TableCatalog {
private var deltaCatalog: DelegatingCatalogExtension = null
private var deltaCatalog: DeltaCatalog = null

override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
val inner = new DummySessionCatalogInner()
Expand All @@ -421,7 +438,16 @@ class DummySessionCatalog extends TableCatalog {
schema: StructType,
partitions: Array[Transform],
properties: java.util.Map[String, String]): Table = {
deltaCatalog.createTable(ident, schema, partitions, properties)
if (!properties.containsKey(TableCatalog.PROP_EXTERNAL) &&
!properties.containsKey(TableCatalog.PROP_LOCATION)) {
val newProps = new java.util.HashMap[String, String]
newProps.putAll(properties)
newProps.put(TableCatalog.PROP_LOCATION, properties.get("fakeLoc"))
newProps.put(TableCatalog.PROP_IS_MANAGED_LOCATION, "true")
deltaCatalog.createTable(ident, schema, partitions, newProps)
} else {
deltaCatalog.createTable(ident, schema, partitions, properties)
}
}

override def alterTable(ident: Identifier, changes: TableChange*): Table = {
Expand Down

0 comments on commit b18fe40

Please sign in to comment.