Commit 31bbb53

davidyuan1223 authored and yaooqinn committed
[KYUUBI apache#7100] [apache#7099] Ranger Support Check Iceberg Alter Table Command & Change Iceberg Test Use Jdbc Catalog
Parent Issue apache#7040 Support Check Iceberg Alter Table Command

### Why are the changes needed?

- [x] Alter Table Rename To
- [x] Alter Table Set Properties
- [x] Alter Table Unset Properties
- [x] Alter Table Add Column
- [x] Alter Table Rename Column
- [x] Alter Table Alter Column
- [x] Alter Table Drop Column

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#7100 from davidyuan1223/iceberg_alter_table_check.

Closes apache#7100

4be2210 [davidyuan] update
53eda10 [davidyuan] update

Authored-by: davidyuan <yuanfuyuan@mafengwo.com>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent aaac07f commit 31bbb53

5 files changed

Lines changed: 220 additions & 15 deletions


extensions/spark/kyuubi-spark-authz/pom.xml

Lines changed: 19 additions & 0 deletions
@@ -37,6 +37,25 @@
   </properties>
 
   <dependencies>
+
+    <dependency>
+      <groupId>com.mysql</groupId>
+      <artifactId>mysql-connector-j</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.dimafeng</groupId>
+      <artifactId>testcontainers-scala-mysql_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.dimafeng</groupId>
+      <artifactId>testcontainers-scala-scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kyuubi</groupId>
      <artifactId>kyuubi-util-scala_${scala.binary.version}</artifactId>
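
Note on the three new test-scoped dependencies: mysql-connector-j supplies the JDBC driver that both the Testcontainers client and the Iceberg JDBC catalog need at runtime, while the two com.dimafeng artifacts provide the Scala/ScalaTest wrapper around Testcontainers used by the new MysqlContainerEnv trait below. No versions appear here, so they are presumably managed by the project's dependencyManagement. A one-line sanity check that the driver is on the test classpath (illustrative only, not part of the patch):

    // Fails with ClassNotFoundException if mysql-connector-j is missing from the test classpath.
    Class.forName("com.mysql.cj.jdbc.Driver")
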
extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/MysqlContainerEnv.scala

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import com.dimafeng.testcontainers.MySQLContainer
+import org.testcontainers.utility.DockerImageName
+
+trait MysqlContainerEnv {
+
+  val containerDef: MySQLContainer = MySQLContainer.Def(
+    dockerImageName = DockerImageName.parse("mysql:5.7"),
+    databaseName = "hive_metastore",
+    username = "root",
+    password = "123456")
+    .createContainer()
+
+  def startEngine(): Unit = {
+    containerDef.start()
+  }
+
+  def stopEngine(): Unit = {
+    containerDef.stop()
+  }
+}
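
The trait only wraps the container lifecycle; the suite that mixes it in decides when to start it and how to hand its coordinates to Spark. A minimal sketch of a consumer, assuming the testcontainers-scala API above (the class name is hypothetical; the real wiring lands in RangerSparkExtensionSuite further down):

    import org.scalatest.BeforeAndAfterAll
    import org.scalatest.funsuite.AnyFunSuite

    import org.apache.kyuubi.plugin.spark.authz.MysqlContainerEnv

    // Hypothetical consumer: boot the container before anything builds a SparkSession,
    // capture its live JDBC coordinates, and tear it down once the suite finishes.
    class ExampleMysqlBackedSuite extends AnyFunSuite with BeforeAndAfterAll with MysqlContainerEnv {

      private var jdbcUrl: String = _

      override def beforeAll(): Unit = {
        startEngine()                   // starts the mysql:5.7 container
        jdbcUrl = containerDef.jdbcUrl  // e.g. jdbc:mysql://<host>:<mapped-port>/hive_metastore
        super.beforeAll()
      }

      override def afterAll(): Unit = {
        super.afterAll()
        stopEngine()                    // releases the container
      }
    }
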

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala

Lines changed: 24 additions & 8 deletions
@@ -40,24 +40,40 @@ trait SparkSessionProvider {
 
   protected val extraSparkConf: SparkConf = new SparkConf()
 
+  protected val useMysqlEnv: Boolean = false
+
+  def getMysqlJdbcUrl: String = ""
+
+  def getMysqlUsername: String = ""
+
+  def getMysqlPassword: String = ""
+
+  def getDriverClassName: String = ""
+
   protected lazy val spark: SparkSession = {
-    val metastore = {
-      val path = Utils.createTempDir(prefix = "hms")
-      Files.deleteIfExists(path)
-      path
-    }
-    val ret = SparkSession.builder()
+    val sessionBuilder = SparkSession.builder()
       .master("local")
       .config("spark.ui.enabled", "false")
-      .config("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
       .config("spark.sql.catalogImplementation", catalogImpl)
       .config(
         "spark.sql.warehouse.dir",
         Utils.createTempDir("spark-warehouse").toString)
       .config("spark.sql.extensions", sqlExtensions)
       .withExtensions(extension)
       .config(extraSparkConf)
-      .getOrCreate()
+
+    if (!useMysqlEnv) {
+      val metastore = {
+        val path = Utils.createTempDir(prefix = "hms")
+        Files.deleteIfExists(path)
+        path
+      }
+      sessionBuilder.config(
+        "javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=$metastore;create=true")
+    }
+
+    val ret = sessionBuilder.getOrCreate()
     if (catalogImpl == "hive") {
       // Ensure HiveExternalCatalog.client.userName is defaultTableOwner
       UserGroupInformation.createRemoteUser(defaultTableOwner).doAs(
extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala

Lines changed: 109 additions & 6 deletions
@@ -24,8 +24,8 @@ import scala.util.Try
 import org.apache.spark.sql.Row
 import org.scalatest.Outcome
 
-// scalastyle:off
 import org.apache.kyuubi.Utils
+// scalastyle:off
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
@@ -39,11 +39,12 @@ import org.apache.kyuubi.util.AssertionUtils._
  */
 @IcebergTest
 class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
-  override protected val catalogImpl: String = "hive"
+  override protected val useMysqlEnv: Boolean = true
+  override protected val catalogImpl: String = "in-memory"
   override protected val sqlExtensions: String =
     "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
 
-  val catalogV2 = "local"
+  val catalogV2 = "jdbc_catalog"
   val namespace1 = icebergNamespace
   val table1 = "table1"
   val outputTable1 = "outputTable1"
@@ -55,16 +56,20 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
   }
 
   override def beforeAll(): Unit = {
+    super.beforeAll()
     spark.conf.set(
       s"spark.sql.catalog.$catalogV2",
       "org.apache.iceberg.spark.SparkCatalog")
-    spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop")
+    spark.conf.set(
+      s"spark.sql.catalog.$catalogV2.type",
+      "jdbc")
+    spark.conf.set(s"spark.sql.catalog.$catalogV2.uri", getMysqlJdbcUrl)
+    spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.user", getMysqlUsername)
+    spark.conf.set(s"spark.sql.catalog.$catalogV2.jdbc.password", getMysqlPassword)
     spark.conf.set(
       s"spark.sql.catalog.$catalogV2.warehouse",
       Utils.createTempDir("iceberg-hadoop").toString)
 
-    super.beforeAll()
-
     doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace1"))
     doAs(
       admin,
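
Taken together, the conf.set calls above register catalogV2 ("jdbc_catalog") as an Iceberg SparkCatalog backed by Iceberg's JDBC catalog rather than the previous Hadoop catalog. A rough static equivalent, with placeholder JDBC coordinates standing in for the values the MySQL test container provides at runtime:

    import org.apache.spark.SparkConf

    // Illustrative only: in the suite, uri/user/password come from getMysqlJdbcUrl and friends,
    // and the warehouse is a fresh temp directory.
    val icebergJdbcCatalogConf = new SparkConf()
      .set("spark.sql.catalog.jdbc_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .set("spark.sql.catalog.jdbc_catalog.type", "jdbc")
      .set("spark.sql.catalog.jdbc_catalog.uri", "jdbc:mysql://localhost:3306/hive_metastore") // placeholder
      .set("spark.sql.catalog.jdbc_catalog.jdbc.user", "root")                                 // placeholder
      .set("spark.sql.catalog.jdbc_catalog.jdbc.password", "123456")                           // placeholder
      .set("spark.sql.catalog.jdbc_catalog.warehouse", "/tmp/iceberg-warehouse")                // placeholder
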
@@ -586,4 +591,102 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
       doAs(admin, sql(dropTagSql))
     }
   }
+
+  test("RENAME TABLE for Iceberg") {
+    val table = "tablex"
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(
+        admin,
+        sql(
+          s"CREATE TABLE $catalogV2.$namespace1.$table" +
+            s"(id int NOT NULL, name string, city string) USING iceberg"))
+      val renameSql = s"alter table $catalogV2.$namespace1.$table " +
+        s"rename to $namespace1.new_table"
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(renameSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(renameSql))
+    }
+  }
+
+  test("ALTER TABLE CHANGE PROPERTIES for Iceberg") {
+    val table = "tablex"
+    withCleanTmpResources(Seq((s"$catalogV2.$namespace1.$table", "table"))) {
+      doAs(
+        admin,
+        sql(
+          s"CREATE TABLE $catalogV2.$namespace1.$table" +
+            s"(id int NOT NULL, name string, city string) USING iceberg"))
+      val alterTableSetPropertiesSql =
+        s"""
+           |ALTER TABLE $catalogV2.$namespace1.$table
+           |SET TBLPROPERTIES (
+           |  'read.split.target-size' = '123456'
+           |)
+           |""".stripMargin
+      val alterTableUnsetPropertiesSql =
+        s"""
+           |ALTER TABLE $catalogV2.$namespace1.$table
+           |UNSET TBLPROPERTIES (
+           |  'read.split.target-size'
+           |  )
+           |""".stripMargin
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(alterTableSetPropertiesSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(alterTableSetPropertiesSql))
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(alterTableUnsetPropertiesSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(alterTableUnsetPropertiesSql))
+    }
+  }
+
+  test("ALTER TABLE CHANGE COLUMN for Iceberg") {
+    val table = "tablex"
+    withCleanTmpResources(Seq((s"$catalogV2.$namespace1.$table", "table"))) {
+      doAs(
+        admin,
+        sql(
+          s"CREATE TABLE $catalogV2.$namespace1.$table" +
+            s"(id int NOT NULL, name string, city string) USING iceberg"))
+      val alterTableAddColumnSql =
+        s"""
+           |ALTER TABLE $catalogV2.$namespace1.$table
+           |ADD COLUMN country int;
+           |""".stripMargin
+      val alterTableRenameColumnSql =
+        s"""
+           |ALTER TABLE $catalogV2.$namespace1.$table
+           |RENAME COLUMN country to country_code;
+           |""".stripMargin
+      val alterTableAlterColumnSql =
+        s"""
+           |ALTER TABLE $catalogV2.$namespace1.$table
+           |ALTER COLUMN country_code type bigint;
+           |""".stripMargin
+      val alterTableDropColumnSql =
+        s"""
+           |ALTER TABLE $catalogV2.$namespace1.$table
+           |DROP COLUMN country_code;
+           |""".stripMargin
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(alterTableAddColumnSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(alterTableAddColumnSql))
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(alterTableRenameColumnSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(alterTableRenameColumnSql))
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(alterTableAlterColumnSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(alterTableAlterColumnSql))
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(alterTableDropColumnSql))
+      }(s"does not have [alter] privilege on [$namespace1/tablex]")
+      doAs(admin, sql(alterTableDropColumnSql))
+    }
+  }
+
 }

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala

Lines changed: 29 additions & 1 deletion
@@ -38,20 +38,48 @@ import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.plugin.spark.authz.{AccessControlException, SparkSessionProvider}
+import org.apache.kyuubi.plugin.spark.authz.MysqlContainerEnv
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.rule.Authorization.KYUUBI_AUTHZ_TAG
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.util.AssertionUtils._
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 abstract class RangerSparkExtensionSuite extends AnyFunSuite
-  with SparkSessionProvider with BeforeAndAfterAll {
+  with SparkSessionProvider with BeforeAndAfterAll with MysqlContainerEnv {
   // scalastyle:on
   override protected val extension: SparkSessionExtensions => Unit = new RangerSparkExtension
 
+  var mysqlJdbcUrl = ""
+  var mysqlUsername = ""
+  var mysqlPassword = ""
+  var driverClassName = ""
+
+  override def getMysqlJdbcUrl: String = mysqlJdbcUrl
+
+  override def getMysqlUsername: String = mysqlUsername
+
+  override def getMysqlPassword: String = mysqlPassword
+
+  override def getDriverClassName: String = driverClassName
+
   override def afterAll(): Unit = {
     spark.stop()
     super.afterAll()
+    if (useMysqlEnv) {
+      stopEngine()
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    if (useMysqlEnv) {
+      startEngine()
+      this.mysqlJdbcUrl = containerDef.jdbcUrl
+      this.mysqlUsername = containerDef.username
+      this.mysqlPassword = containerDef.password
+      this.driverClassName = containerDef.driverClassName
+    }
+    super.beforeAll()
   }
 
   protected def errorMessage(
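
One detail worth calling out in the lifecycle above: the container must already be running and its coordinates captured before super.beforeAll(), because subclasses such as IcebergCatalogRangerSparkExtensionSuite read getMysqlJdbcUrl in their own beforeAll() while configuring the catalog on the lazily created SparkSession. A condensed view of the ordering, mirroring the code above:

    // beforeAll(): startEngine() -> capture containerDef.jdbcUrl/username/password
    //              -> super.beforeAll() -> subclass sets spark.sql.catalog.jdbc_catalog.*
    // afterAll():  spark.stop() -> super.afterAll() -> stopEngine()
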
