@@ -82,6 +82,17 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {

def getSchemaHelper(): SchemaHelper

/**
* Mirrors the standard `Connection#setSchema(String)` JDBC API with a leading
* `conn` parameter, allowing dialects to customize the call. The default forwards
* to the standard API; dialects whose driver does not support it (e.g. Phoenix,
* Impala) should override this to a no-op. Callers may invoke it unconditionally;
* there is no need to check driver support first.
*/
def setSchema(conn: Connection, schema: String): Unit = {
conn.setSchema(schema)
}

def cancelStatement(jdbcStatement: Statement): Unit = {
if (jdbcStatement != null) {
jdbcStatement.cancel()
@@ -36,6 +36,34 @@ class MySQLDialect extends JdbcDialect {
statement
}

override def getCatalogsOperation(): String = {
"SELECT CATALOG_NAME FROM INFORMATION_SCHEMA.SCHEMATA GROUP BY CATALOG_NAME"
}

override def getSchemasOperation(catalog: String, schema: String): String = {
// Alias the MySQL-native columns to the JDBC-standard names expected by
// DatabaseMetaData.getSchemas(): TABLE_CATALOG and TABLE_SCHEM.
val query = new StringBuilder(
"""SELECT CATALOG_NAME AS TABLE_CATALOG, SCHEMA_NAME AS TABLE_SCHEM
|FROM INFORMATION_SCHEMA.SCHEMATA
|""".stripMargin)

val filters = ArrayBuffer[String]()
if (StringUtils.isNotBlank(catalog)) {
filters += s"CATALOG_NAME LIKE '$catalog'"
}
if (StringUtils.isNotBlank(schema)) {
filters += s"SCHEMA_NAME LIKE '$schema'"
}

if (filters.nonEmpty) {
query.append(" WHERE ")
query.append(filters.mkString(" AND "))
}

query.toString()
}

override def getTablesQuery(
catalog: String,
schema: String,
@@ -122,6 +150,17 @@ class MySQLDialect extends JdbcDialect {
query.toString()
}

override def setSchema(conn: Connection, schema: String): Unit = {
// The MySQL Connector/J `databaseTerm` connection property selects which of
// setCatalog / setSchema actually switches the session database; the other
// call returns without effect for that mode (verified by black-box testing
// and consistent with the reference manual). Invoking both ensures the
// database is switched regardless of how the user configured the URL.
// StarRocksDialect / DorisDialect inherit this via MySQLDialect.
conn.setCatalog(schema)
conn.setSchema(schema)
}

override def getTRowSetGenerator(): JdbcTRowSetGenerator = new MySQLTRowSetGenerator

override def getSchemaHelper(): SchemaHelper = {
@@ -18,6 +18,8 @@ package org.apache.kyuubi.engine.jdbc.operation

import java.util

import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_FETCH_SIZE, ENGINE_JDBC_OPERATION_INCREMENTAL_COLLECT}
@@ -91,7 +93,12 @@ class JdbcOperationManager(conf: KyuubiConf) extends OperationManager("JdbcOpera
schemaName: String,
tableName: String,
tableTypes: util.List[String]): Operation = {
val query = dialect.getTablesQuery(catalogName, schemaName, tableName, tableTypes)
val effectiveSchema = if (StringUtils.isBlank(schemaName) || schemaName == "%") {
session.asInstanceOf[JdbcSessionImpl].effectiveDatabase.orNull
} else {
schemaName
}
val query = dialect.getTablesQuery(catalogName, effectiveSchema, tableName, tableTypes)
Member

I don't recall the details of the initial code of this part, but the API looks problematic.

the Hive Thrift GetTablesOperation API is derived from the JDBC API, which is why they look almost identical. For a JDBC engine, such a call should map directly to the JDBC API instead of rendering SQL (if you take a look at JDBC driver implementations, you may find some impls are non-SQL-based); the current design complicates the whole thing.

Contributor Author (@wangzhigang1999, Apr 28, 2026)

Thanks for raising this — you're right, and I want to confirm I took it seriously rather than hand-wave it away. To make sure I understood, I ran the full set of DatabaseMetaData calls against three different drivers (mysql-connector-j 8.4.0, postgresql 42.7.2, clickhouse-jdbc 0.6.5) using a small Java probe and docker containers. Test fixtures were tailored to each backend (MySQL/PG had PK + FK + a function; ClickHouse used MergeTree which has no native PK/FK).

Cross-driver behavior of each API:

| API | MySQL (databaseTerm=CATALOG) | MySQL (databaseTerm=SCHEMA) | PostgreSQL | ClickHouse |
| --- | --- | --- | --- | --- |
| getCatalogs() | ✓ implemented | ⚠ empty (driver databaseTerm selection) | ✓ implemented | ✓ implemented |
| getSchemas() | ⚠ empty (driver databaseTerm selection) | ✓ implemented | ✓ implemented | ✓ empty — model has no schema layer |
| getTableTypes() | ✓ | ✓ | ✓ | ✓ |
| getTables(...) | ✓ | ✓ | ✓ | ✓ |
| getColumns(...) | ✓ | ✓ | ✓ | ✓ |
| getPrimaryKeys(...) | ✓ | ✓ | ✓ | ✓ empty — MergeTree has no PK |
| getImported / Exported / CrossReference | ✓ | ✓ | ✓ | ✓ empty — no FK in CK |
| getFunctions(...) | ✓ | ✓ | ✓ | ✓ empty (no fixture created) |
| getTypeInfo() | ✓ | ✓ | ✓ | ✓ |

Things that hold across all three drivers:

  • Every JDBC-standard metadata method is implemented — none threw SQLFeatureNotSupportedException; concepts that don't apply just return empty result sets. (The JDBC contract requires this; clients differ in how strictly they enforce it.)
  • Column names follow the JDBC spec (TABLE_CAT, TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, DATA_TYPE, KEY_SEQ, PK_NAME, ...) — which is exactly what GetTablesOperation / GetColumnsOperation / etc. on the Hive Thrift side expect, since the Thrift API was derived from the JDBC API in the first place.
  • Driver-specific quirks are minor and well-defined — the databaseTerm selection is unique to mysql-connector-j; PG returns column labels in lowercase while MySQL/CK use uppercase (JDBC spec says case-insensitive, but engine-side string comparisons would need to match; a tolerant lookup is sketched right after this list).
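
For illustration only (this helper is mine, not engine code), a label lookup that tolerates either case could look like this:

```scala
// Illustration: resolve a metadata column by label regardless of the case the
// driver reports (PostgreSQL lowercases labels, MySQL/ClickHouse uppercase them).
import java.sql.{ResultSet, SQLException}

def getStringIgnoreCase(rs: ResultSet, label: String): String = {
  val md = rs.getMetaData
  val idx = (1 to md.getColumnCount)
    .find(i => md.getColumnLabel(i).equalsIgnoreCase(label))
    .getOrElse(throw new SQLException(s"column not found: $label"))
  rs.getString(idx)
}
```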

So the JDBC standard API really does cover every operation the dialect models today, across very different DBMS families. Concretely, this means the JdbcDialect.getXxxQuery family of methods could be deleted in favor of the engine calling connection.getMetaData.getXxx(...) directly — and dialects whose driver implements metadata via a non-SQL path (Avatica, etc.) would automatically be used the way they were designed to.
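
To make that concrete, a minimal sketch of the proposed shape (illustrative only; the method name and plumbing are mine, not a working patch):

```scala
// Sketch: the operation layer calls the JDBC metadata API directly instead of
// asking the dialect to render SQL; non-SQL-based drivers then answer natively.
import java.sql.{Connection, ResultSet}

def getTablesDirect(
    conn: Connection,
    catalog: String,
    schemaPattern: String,
    tableNamePattern: String,
    tableTypes: Array[String]): ResultSet =
  conn.getMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes)
```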

The few wrinkles I hit are all driver-side and resolvable in a few lines of engine-level adapter code, not by reaching for SQL:

  • getCatalogs / getSchemas are selected by mysql-connector-j's databaseTerm property (only one returns rows in a given mode) — handled by checking the property and routing, or by calling both and taking whichever is non-empty (sketched after this list).
  • DATA_TYPE is int in the JDBC spec but the current SQL returns a string — java.sql.Types-to-name mapping is one helper.
  • The current SQL returns extra MySQL-specific columns (ENGINE, TABLE_ROWS, ...) but the Hive Thrift GetTables result schema is fixed to JDBC-standard columns, so those extras are already dropped during serialization today and aren't reaching clients — no real loss.
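
A sketch of that call-both adapter (illustrative; the helper name is mine, and it assumes nothing beyond the standard DatabaseMetaData API):

```scala
// Sketch: list databases regardless of mysql-connector-j's databaseTerm mode by
// draining both catalog and schema metadata and keeping whichever has rows.
import java.sql.{Connection, ResultSet}
import scala.collection.mutable.ArrayBuffer

def listDatabases(conn: Connection): Seq[String] = {
  def drain(rs: ResultSet, column: String): Seq[String] = {
    val out = ArrayBuffer[String]()
    try { while (rs.next()) out += rs.getString(column) } finally rs.close()
    out.toSeq
  }
  val md = conn.getMetaData
  val catalogs = drain(md.getCatalogs, "TABLE_CAT")
  if (catalogs.nonEmpty) catalogs else drain(md.getSchemas, "TABLE_SCHEM")
}
```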

I'd like to keep this PR scoped to the URL-database bug since that's what it set out to fix, and migrating the dialect API to call DatabaseMetaData directly will touch every dialect, the operation manager, and every metadata test — not something I'd want to fold into a bug fix. Happy to open a follow-up issue tracking the migration if you agree with that split. WDYT?

Member

> migrating the dialect API to call DatabaseMetaData directly will touch every dialect, the operation manager, and every metadata test

if we do that first, do we still need this patch?

Member (@pan3793, Apr 28, 2026)

> No SQLFeatureNotSupportedException anywhere

Actually, the JDBC API Javadocs define the behavior in a very clear way: a method should not throw SQLFeatureNotSupportedException unless its Javadoc says it can. Some tools, like DBeaver, may tolerate it when the JDBC driver vendor does not respect the API contract, but tools like DataGrip may not.

Member

BTW, we MUST avoid copying the code from mysql-connector-j, it's GPL-licensed ... try to read the docs or use black box testing to tackle the mysql-connector-j-related issue. (this will be a horrible experience ...)

Contributor Author

> migrating the dialect API to call DatabaseMetaData directly will touch every dialect, the operation manager, and every metadata test
>
> if we do that first, do we still need this patch?

Yes — the bug is the Hive JDBC client converting null schemaPattern to "%", and JDBC spec says "%" means "don't narrow by schema", so calling DatabaseMetaData.getTables(...) directly reproduces it. Verified on mysql-connector-j 8.4.0 (connection opened against jdbc:mysql://.../dbA):

conn.getCatalog() == "dbA"
getTables(null, "%",  "%", null) → tables from dbA AND dbB   ← bug reproduces
getTables("dbA", null, "%", null) → tables from dbA only     ← only narrows when explicit
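
For completeness, a runnable form of that probe (assumes a local MySQL with databases dbA and dbB and mysql-connector-j on the classpath; the URL and credentials are placeholders):

```scala
// Reproduction sketch: mirrors what the Hive client sends (catalog = null,
// schemaPattern = "%"); the result is not narrowed to dbA, so the bug shows.
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dbA", "user", "pass")
try {
  println(conn.getCatalog) // prints "dbA"
  val rs = conn.getMetaData.getTables(null, "%", "%", null)
  while (rs.next()) {
    println(rs.getString("TABLE_CAT") + "." + rs.getString("TABLE_NAME"))
  }
} finally conn.close()
```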

So the engine still has to substitute the URL-scoped database into the call when the client sends null/"%". That routing logic (effectiveDatabase + the if-blank-then-route block in JdbcOperationManager) survives the refactor unchanged — only the line that builds a SQL string flips to conn.getMetaData.getTables(...). The setSchema-on-session-open machinery is also independent.

What the refactor does subsume: the two getCatalogsOperation / getSchemasOperation SQL implementations I added to MySQLDialect — those go away once the engine calls getCatalogs() / getSchemas() directly.

Happy either way on ordering — bug fix first then refactor, or refactor first and a smaller patch on top.

Contributor Author

> BTW, we MUST avoid copying the code from mysql-connector-j, it's GPL-licensed ... try to read the docs or use black box testing to tackle the mysql-connector-j-related issue. (this will be a horrible experience ...)

Agreed, and thanks for catching this early. I've rewritten all my prior PR comments and the in-tree comment in MySQLDialect.setSchema to remove any reference / paraphrase of mysql-connector-j source — keeping only the documented databaseTerm property and black-box test observations. Also amended the FOLLOWUP commit message similarly. Will follow this rule going forward.

val normalizedConf = session.asInstanceOf[JdbcSessionImpl].normalizedConf
val fetchSize = normalizedConf.get(ENGINE_JDBC_FETCH_SIZE.key).map(_.toInt)
.getOrElse(session.sessionManager.getConf.get(ENGINE_JDBC_FETCH_SIZE))
@@ -25,8 +25,9 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.jdbc.connection.ConnectionProvider
import org.apache.kyuubi.engine.jdbc.dialect.JdbcDialects
import org.apache.kyuubi.engine.jdbc.util.KyuubiJdbcUtils
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager, USE_DATABASE}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}

class JdbcSessionImpl(
@@ -45,6 +46,16 @@

private var databaseMetaData: DatabaseMetaData = _

/**
* The database the client requested via `USE_DATABASE` at session open. Used by
* `JdbcOperationManager` as the metadata schema filter. `None` when no `USE_DATABASE`
* was requested, or when the request was the literal `"default"`. The Hive JDBC
* driver sends `"default"` as a protocol stub when the user did not specify a
* database in the URL, so it cannot be distinguished from a genuine request and
* is treated as "no scope".
*/
private[jdbc] var effectiveDatabase: Option[String] = None

val sessionConf: KyuubiConf = normalizeConf

private def normalizeConf: KyuubiConf = {
@@ -67,6 +78,14 @@
sessionConf,
sessionConnection,
sessionConf.get(ENGINE_JDBC_SESSION_INITIALIZE_SQL))
// The Hive JDBC driver sends `"default"` as USE_DATABASE when the user did not
// specify a database in the URL: a protocol stub indistinguishable from a real
// request. Filter it here so we don't push a non-existent / unintended schema
// filter into metadata operations on backends without a `default` database.
conf.get(USE_DATABASE).filter(_ != "default").foreach { database =>
JdbcDialects.get(sessionConf).setSchema(sessionConnection, database)
effectiveDatabase = Some(database)
}
super.open()
info(s"The jdbc session is started.")
}
@@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.mysql

import java.sql.ResultSet
import java.sql.{DriverManager, ResultSet}

import scala.collection.mutable.ArrayBuffer

@@ -250,4 +250,102 @@ abstract class MySQLOperationSuite extends WithMySQLEngine with HiveJDBCTestHelp
statement.execute("drop database db2")
}
}

test("mysql - getTables scopes to database from connection URL (KYUUBI #7305)") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305a")
statement.execute("create table db7305a.ta(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
statement.execute("create database if not exists db7305b")
statement.execute("create table db7305b.tb(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
}

val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
val scopedUrl = s"jdbc:hive2://$hostAndPort/db7305a;"
val conn = DriverManager.getConnection(scopedUrl, user, password)
try {
val tables = conn.getMetaData.getTables(null, null, "t%", null)
val found = ArrayBuffer[(String, String)]()
while (tables.next()) {
found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
}
assert(found.contains(("db7305a", "ta")))
assert(!found.exists { case (schema, _) => schema == "db7305b" })

// Hive driver converts schemaPattern=null to "%", and the fix routes "%" back to
// the effective database, so the behavior should match the call above.
val tablesPct = conn.getMetaData.getTables(null, "%", "t%", null)
val foundPct = ArrayBuffer[(String, String)]()
while (tablesPct.next()) {
foundPct += ((tablesPct.getString(TABLE_SCHEMA), tablesPct.getString(TABLE_NAME)))
}
assert(foundPct.contains(("db7305a", "ta")))
assert(!foundPct.exists { case (schema, _) => schema == "db7305b" })
} finally {
conn.close()
}

withJdbcStatement() { statement =>
statement.execute("drop table db7305a.ta")
statement.execute("drop database db7305a")
statement.execute("drop table db7305b.tb")
statement.execute("drop database db7305b")
}
}

test("mysql - getTables returns all tables when no database in URL") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305c")
statement.execute("create table db7305c.tc(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
statement.execute("create database if not exists db7305d")
statement.execute("create table db7305d.td(id bigint)" +
"ENGINE=InnoDB DEFAULT CHARSET=utf8;")
}

withJdbcStatement() { statement =>
val tables = statement.getConnection.getMetaData.getTables(null, null, "t%", null)
val found = ArrayBuffer[(String, String)]()
while (tables.next()) {
found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
}
// Without USE_DATABASE, the fix must not filter, so both dbs are visible.
assert(found.contains(("db7305c", "tc")))
assert(found.contains(("db7305d", "td")))

statement.execute("drop table db7305c.tc")
statement.execute("drop database db7305c")
statement.execute("drop table db7305d.td")
statement.execute("drop database db7305d")
}
}

test("mysql - getSchemas returns all schemas (KYUUBI #7305)") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305e")
statement.execute("create database if not exists db7305f")
}

withJdbcStatement() { statement =>
val rs = statement.getConnection.getMetaData.getSchemas
val schemas = ArrayBuffer[String]()
while (rs.next()) schemas += rs.getString("TABLE_SCHEM")
assert(schemas.contains("db7305e"))
assert(schemas.contains("db7305f"))

statement.execute("drop database db7305e")
statement.execute("drop database db7305f")
}
}

test("mysql - session open fails when URL specifies a non-existent database") {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
val badUrl = s"jdbc:hive2://$hostAndPort/does_not_exist_db_7305;"
// The error may surface from the engine or the JDBC driver; intercept only asserts one is raised.
intercept[java.sql.SQLException] {
DriverManager.getConnection(badUrl, user, password).close()
}
}
}
@@ -16,7 +16,7 @@
*/
package org.apache.kyuubi.engine.jdbc.starrocks

import java.sql.ResultSet
import java.sql.{DriverManager, ResultSet}

import scala.collection.mutable.ArrayBuffer

@@ -258,4 +258,86 @@ abstract class StarRocksOperationSuite extends WithStarRocksEngine with HiveJDBC
statement.execute("drop database db2")
}
}

test("starrocks - getTables scopes to database from connection URL (KYUUBI #7305)") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305a")
statement.execute("create table db7305a.ta(id bigint)" +
"ENGINE=OLAP DISTRIBUTED BY HASH(`id`) BUCKETS 32 " +
"PROPERTIES ('replication_num' = '1')")
statement.execute("create database if not exists db7305b")
statement.execute("create table db7305b.tb(id bigint)" +
"ENGINE=OLAP DISTRIBUTED BY HASH(`id`) BUCKETS 32 " +
"PROPERTIES ('replication_num' = '1')")
}

val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
val scopedUrl = s"jdbc:hive2://$hostAndPort/db7305a;"
val conn = DriverManager.getConnection(scopedUrl, user, password)
try {
val tables = conn.getMetaData.getTables(null, null, "t%", null)
val found = ArrayBuffer[(String, String)]()
while (tables.next()) {
found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
}
assert(found.contains(("db7305a", "ta")))
assert(!found.exists { case (schema, _) => schema == "db7305b" })

// Hive driver converts schemaPattern=null to "%", and the fix routes "%" back to
// the effective database, so the behavior should match the call above.
val tablesPct = conn.getMetaData.getTables(null, "%", "t%", null)
val foundPct = ArrayBuffer[(String, String)]()
while (tablesPct.next()) {
foundPct += ((tablesPct.getString(TABLE_SCHEMA), tablesPct.getString(TABLE_NAME)))
}
assert(foundPct.contains(("db7305a", "ta")))
assert(!foundPct.exists { case (schema, _) => schema == "db7305b" })
} finally {
conn.close()
}

withJdbcStatement() { statement =>
statement.execute("drop table db7305a.ta")
statement.execute("drop database db7305a")
statement.execute("drop table db7305b.tb")
statement.execute("drop database db7305b")
}
}

test("starrocks - getTables returns all tables when no database in URL") {
withJdbcStatement() { statement =>
statement.execute("create database if not exists db7305c")
statement.execute("create table db7305c.tc(id bigint)" +
"ENGINE=OLAP DISTRIBUTED BY HASH(`id`) BUCKETS 32 " +
"PROPERTIES ('replication_num' = '1')")
statement.execute("create database if not exists db7305d")
statement.execute("create table db7305d.td(id bigint)" +
"ENGINE=OLAP DISTRIBUTED BY HASH(`id`) BUCKETS 32 " +
"PROPERTIES ('replication_num' = '1')")
}

withJdbcStatement() { statement =>
val tables = statement.getConnection.getMetaData.getTables(null, null, "t%", null)
val found = ArrayBuffer[(String, String)]()
while (tables.next()) {
found += ((tables.getString(TABLE_SCHEMA), tables.getString(TABLE_NAME)))
}
// Without USE_DATABASE, the fix must not filter, so both dbs are visible.
assert(found.contains(("db7305c", "tc")))
assert(found.contains(("db7305d", "td")))

statement.execute("drop table db7305c.tc")
statement.execute("drop database db7305c")
statement.execute("drop table db7305d.td")
statement.execute("drop database db7305d")
}
}

test("starrocks - session open fails when URL specifies a non-existent database") {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head
val badUrl = s"jdbc:hive2://$hostAndPort/does_not_exist_db_7305;"
intercept[java.sql.SQLException] {
DriverManager.getConnection(badUrl, user, password).close()
}
}
}