Skip to content

Commit d01a21b

Browse files
[KYUUBI #7305] Fix JDBC engine returning tables from all databases when a database is specified in the URL
When DBeaver (or any Hive-JDBC client) connects to the Kyuubi JDBC engine with a database path in the URL, DatabaseMetaData.getTables should return only the tables from that database. Previously, the Hive JDBC driver converted a null schemaPattern to "%" before sending the request, which the engine forwarded straight into a WHERE TABLE_SCHEMA LIKE '%' clause on the backend, returning tables from every database.

- Introduce JdbcDialect.setCurrentDatabase(connection, database) as an extension point; the default is a no-op returning false. MySQLDialect overrides it with USE `db` so MySQL/StarRocks/Doris switch the backend connection on session open.
- JdbcSessionImpl records the successfully applied database in effectiveDatabase and only keeps it when setCurrentDatabase returns true. "default" (the Hive JDBC driver's fallback when no database is specified in the URL) is tolerated on USE failure to stay compatible with backends that have no "default" database.
- JdbcOperationManager.newGetTablesOperation treats a blank schema or "%" as "use the session's effective database", routing through effectiveDatabase.orNull. Backends without an effective database (PostgreSQL, Oracle, etc.) keep the previous no-filter behavior.
- MySQLDialect now implements getCatalogsOperation and getSchemasOperation (previously featureNotSupported). getSchemasOperation aliases result columns to the JDBC-standard TABLE_CATALOG / TABLE_SCHEM so DatabaseMetaData.getSchemas works with clients such as DBeaver.

Closes #7305.
1 parent 3a529eb commit d01a21b

5 files changed

Lines changed: 183 additions & 3 deletions

File tree

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
8282

8383
def getSchemaHelper(): SchemaHelper
8484

85+
/**
86+
* Switch the current database on the backend connection.
87+
*
88+
* Called during session open when the client specified a database in the connection URL
89+
* (populated by the Hive JDBC driver into `USE_DATABASE`). Dialects that support an
90+
* in-session database context switch (e.g. MySQL's `USE db`) should override this and
91+
* return `true` on success; the default is a no-op that returns `false`, meaning the
92+
* backend has no notion of "current database" that applies to this session.
93+
*
94+
* The return value is recorded by `JdbcSessionImpl` (as `effectiveDatabase`) and later read by `JdbcOperationManager` to decide whether the requested
95+
* database can be treated as the effective schema filter for metadata operations.
96+
*/
97+
def setCurrentDatabase(connection: Connection, database: String): Boolean = false
98+
8599
def cancelStatement(jdbcStatement: Statement): Unit = {
86100
if (jdbcStatement != null) {
87101
jdbcStatement.cancel()

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/MySQLDialect.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,34 @@ class MySQLDialect extends JdbcDialect {
3636
statement
3737
}
3838

39+
/**
 * SQL text for the catalogs metadata operation.
 *
 * Reads `INFORMATION_SCHEMA.SCHEMATA` and groups by `CATALOG_NAME`, so each catalog
 * name appears once in the result.
 */
override def getCatalogsOperation(): String =
  "SELECT CATALOG_NAME FROM INFORMATION_SCHEMA.SCHEMATA GROUP BY CATALOG_NAME"
42+
43+
/**
 * Builds the query for the schemas metadata operation over `INFORMATION_SCHEMA.SCHEMATA`.
 *
 * The MySQL-native columns are aliased to the JDBC-standard names expected by
 * `DatabaseMetaData.getSchemas()`: `TABLE_CATALOG` and `TABLE_SCHEM`.
 *
 * @param catalog `LIKE` pattern matched against `CATALOG_NAME`; null or blank adds no filter
 * @param schema `LIKE` pattern matched against `SCHEMA_NAME`; null or blank adds no filter
 * @return the SQL text, carrying a `WHERE` clause only when at least one filter applies
 */
override def getSchemasOperation(catalog: String, schema: String): String = {
  // Renders `column LIKE '<pattern>'`, or nothing for a null/blank pattern. Single quotes
  // inside the pattern are doubled so the pattern cannot terminate the SQL string literal.
  // Backslashes are left untouched on purpose: they carry the LIKE escapes (`\_`, `\%`).
  def likeFilter(column: String, pattern: String): Option[String] =
    if (StringUtils.isNotBlank(pattern)) {
      Some(s"$column LIKE '${pattern.replace("'", "''")}'")
    } else {
      None
    }

  val select =
    """SELECT CATALOG_NAME AS TABLE_CATALOG, SCHEMA_NAME AS TABLE_SCHEM
      |FROM INFORMATION_SCHEMA.SCHEMATA
      |""".stripMargin

  val filters = Seq(
    likeFilter("CATALOG_NAME", catalog),
    likeFilter("SCHEMA_NAME", schema)).flatten

  if (filters.isEmpty) select else select + filters.mkString(" WHERE ", " AND ", "")
}
66+
3967
override def getTablesQuery(
4068
catalog: String,
4169
schema: String,
@@ -122,6 +150,16 @@ class MySQLDialect extends JdbcDialect {
122150
query.toString()
123151
}
124152

153+
/**
 * Switches the backend connection to `database` by executing a `USE` statement.
 *
 * The name is emitted as a backtick-quoted identifier; backticks inside the name are
 * doubled so the name cannot close the quoted identifier early.
 *
 * @param connection backend connection on which the statement is executed
 * @param database name of the database to switch to
 * @return `true` once the `USE` statement has executed; if it fails, the exception thrown
 *         by the driver propagates to the caller after the statement is closed
 */
override def setCurrentDatabase(connection: Connection, database: String): Boolean = {
  val quotedName = "`" + database.replace("`", "``") + "`"
  val stmt = connection.createStatement()
  try {
    stmt.execute(s"USE $quotedName")
    true
  } finally {
    // Always release the statement, including when USE fails.
    stmt.close()
  }
}
162+
125163
override def getTRowSetGenerator(): JdbcTRowSetGenerator = new MySQLTRowSetGenerator
126164

127165
override def getSchemaHelper(): SchemaHelper = {

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperationManager.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.apache.kyuubi.engine.jdbc.operation
1818

1919
import java.util
2020

21+
import org.apache.commons.lang3.StringUtils
22+
2123
import org.apache.kyuubi.KyuubiSQLException
2224
import org.apache.kyuubi.config.KyuubiConf
2325
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT}
@@ -91,7 +93,12 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
9193
schemaName: String,
9294
tableName: String,
9395
tableTypes: util.List[String]): Operation = {
94-
val query = dialect.getTablesQuery(catalogName, schemaName, tableName, tableTypes)
96+
val effectiveSchema = if (StringUtils.isBlank(schemaName) || schemaName == "%") {
97+
session.asInstanceOf[JdbcSessionImpl].effectiveDatabase.orNull
98+
} else {
99+
schemaName
100+
}
101+
val query = dialect.getTablesQuery(catalogName, effectiveSchema, tableName, tableTypes)
95102
val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
96103
val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(_.toInt)
97104
.getOrElse(session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/session/JdbcSessionImpl.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.jdbc.session
1919
import java.sql.{Connection, DatabaseMetaData}
2020

2121
import scala.util.{Failure, Success, Try}
22+
import scala.util.control.NonFatal
2223

2324
import org.apache.kyuubi.KyuubiSQLException
2425
import org.apache.kyuubi.config.KyuubiConf
2526
import org.apache.kyuubi.config.KyuubiConf._
2627
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
2728
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
29+
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialects
2830
import org.apache.kyuubi.engine.jdbc.util.KyuubiJdbcUtils
29-
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
31+
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_DATABASE}
3032
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
3133

3234
class JdbcSessionImpl(
@@ -45,6 +47,13 @@ class JdbcSessionImpl(
4547

4648
private var databaseMetaData: DatabaseMetaData = _
4749

50+
/**
51+
* The database that was successfully applied to the backend connection during session open.
52+
* `None` when no `USE_DATABASE` was requested, or when the request was for "default"
53+
* (the Hive JDBC driver's fallback) and the backend had no such database. Also `None` when the dialect's `setCurrentDatabase` returned `false`.
54+
*/
55+
private[jdbc] var effectiveDatabase: Option[String] = None
56+
4857
val sessionConf: KyuubiConf = normalizeConf
4958

5059
private def normalizeConf: KyuubiConf = {
@@ -67,6 +76,20 @@ class JdbcSessionImpl(
6776
sessionConf,
6877
sessionConnection,
6978
sessionConf.get(ENGINE_JDBC_SESSION_INITIALIZE_SQL))
79+
conf.get(USE_DATABASE).foreach { database =>
80+
try {
81+
if (JdbcDialects.get(sessionConf).setCurrentDatabase(sessionConnection, database)) {
82+
effectiveDatabase = Some(database)
83+
info(s"Switched to database: $database")
84+
}
85+
} catch {
86+
case NonFatal(e) if database == "default" =>
87+
// The Hive JDBC driver sends "default" when the user didn't specify a database
88+
// in the connection URL. Tolerate USE failure in that case so the session can
89+
// still open against backends that have no "default" database.
90+
warn(s"Failed to switch to database 'default', ignored.", e)
91+
}
92+
}
7093
super.open()
7194
info(s"The jdbc session is started.")
7295
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/MySQLOperationSuite.scala

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.kyuubi.engine.jdbc.mysql
1818

19-
import java.sql.ResultSet
19+
import java.sql.{DriverManager, ResultSet}
2020

2121
import scala.collection.mutable.ArrayBuffer
2222

@@ -250,4 +250,102 @@ abstract class MySQLOperationSuite extends WithMySQLEngine with HiveJDBCTestHelp
250250
statement.execute("drop database db2")
251251
}
252252
}
253+
254+
test("mysql - getTables scopes to database from connection URL (KYUUBI #7305)") {
  // Drains a getTables result set into (schema, table) pairs and closes it afterwards.
  def collectTables(rs: ResultSet): Seq[(String, String)] = {
    val pairs = ArrayBuffer[(String, String)]()
    try {
      while (rs.next()) {
        pairs += ((rs.getString(TABLE_SCHEMA), rs.getString(TABLE_NAME)))
      }
    } finally {
      rs.close()
    }
    pairs.toSeq
  }

  try {
    withJdbcStatement() { statement =>
      statement.execute("create database if not exists db7305a")
      statement.execute("create table db7305a.ta(id bigint)" +
        "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
      statement.execute("create database if not exists db7305b")
      statement.execute("create table db7305b.tb(id bigint)" +
        "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
    }

    // Re-target the suite's JDBC URL at db7305a so the session opens with that database.
    val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
    val scopedUrl = s"jdbc:hive2://$hostAndPort/db7305a;"
    val conn = DriverManager.getConnection(scopedUrl, user, password)
    try {
      val found = collectTables(conn.getMetaData.getTables(null, null, "t%", null))
      assert(found.contains(("db7305a", "ta")))
      assert(!found.exists { case (schema, _) => schema == "db7305b" })

      // Hive driver converts schemaPattern=null to "%", and the fix routes "%" back to
      // the effective database, so the behavior should match the call above.
      val foundPct = collectTables(conn.getMetaData.getTables(null, "%", "t%", null))
      assert(foundPct.contains(("db7305a", "ta")))
      assert(!foundPct.exists { case (schema, _) => schema == "db7305b" })
    } finally {
      conn.close()
    }
  } finally {
    // Runs even when an assertion fails, so a failed run cannot poison the next one.
    // DROP DATABASE removes the contained tables too; IF EXISTS tolerates partial setup.
    withJdbcStatement() { statement =>
      statement.execute("drop database if exists db7305a")
      statement.execute("drop database if exists db7305b")
    }
  }
}
296+
297+
test("mysql - getTables returns all tables when no database in URL") {
  try {
    withJdbcStatement() { statement =>
      statement.execute("create database if not exists db7305c")
      statement.execute("create table db7305c.tc(id bigint)" +
        "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
      statement.execute("create database if not exists db7305d")
      statement.execute("create table db7305d.td(id bigint)" +
        "ENGINE=InnoDB DEFAULT CHARSET=utf8;")
    }

    withJdbcStatement() { statement =>
      val tables = statement.getConnection.getMetaData.getTables(null, null, "t%", null)
      val found = ArrayBuffer[(String, String)]()
      try {
        while (tables.next()) {
          found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
        }
      } finally {
        tables.close()
      }
      // Without USE_DATABASE, the fix must not filter, so both dbs are visible.
      assert(found.contains(("db7305c", "tc")))
      assert(found.contains(("db7305d", "td")))
    }
  } finally {
    // Runs even when an assertion fails, so a failed run cannot poison the next one.
    // DROP DATABASE removes the contained tables too; IF EXISTS tolerates partial setup.
    withJdbcStatement() { statement =>
      statement.execute("drop database if exists db7305c")
      statement.execute("drop database if exists db7305d")
    }
  }
}
323+
324+
test("mysql - getSchemas returns all schemas (KYUUBI #7305)") {
  try {
    withJdbcStatement() { statement =>
      statement.execute("create database if not exists db7305e")
      statement.execute("create database if not exists db7305f")
    }

    withJdbcStatement() { statement =>
      val rs = statement.getConnection.getMetaData.getSchemas
      val schemas = ArrayBuffer[String]()
      try {
        // TABLE_SCHEM is the JDBC-standard column name for the schema in getSchemas().
        while (rs.next()) schemas += rs.getString("TABLE_SCHEM")
      } finally {
        rs.close()
      }
      assert(schemas.contains("db7305e"))
      assert(schemas.contains("db7305f"))
    }
  } finally {
    // Runs even when an assertion fails; IF EXISTS tolerates a partially completed setup.
    withJdbcStatement() { statement =>
      statement.execute("drop database if exists db7305e")
      statement.execute("drop database if exists db7305f")
    }
  }
}
341+
342+
test("mysql - session open fails when URL specifies a non-existent database") {
  // Same host and port as the suite's JDBC URL, but pointing at a database that is
  // never created by this suite.
  val endpoint = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;")(0)
  val missingDbUrl = s"jdbc:hive2://$endpoint/does_not_exist_db_7305;"

  // Where the error originates (engine or JDBC driver) is not pinned down here; the test
  // only requires that opening the connection raises a SQLException.
  val thrown = intercept[java.sql.SQLException] {
    val connection = DriverManager.getConnection(missingDbUrl, user, password)
    connection.close()
  }
  assert(thrown != null)
}
253351
}

0 commit comments

Comments
 (0)