From e56df8aa517e38f320a4addd24c81fc5abca4209 Mon Sep 17 00:00:00 2001 From: TCeason Date: Mon, 28 Apr 2025 17:59:47 +0800 Subject: [PATCH 1/2] refactor(query): iceberg catalog support information_schema/system database --- Cargo.lock | 1 + .../management/src/procedure/procedure_mgr.rs | 3 +- .../src/catalogs/default/database_catalog.rs | 2 +- .../src/catalogs/default/immutable_catalog.rs | 18 +- .../src/catalogs/iceberg/iceberg_catalog.rs | 609 ++++++++++++++++++ src/query/service/src/catalogs/iceberg/mod.rs | 17 + src/query/service/src/catalogs/mod.rs | 3 + .../information_schema_database.rs | 16 +- .../src/databases/system/system_database.rs | 91 +-- src/query/service/src/global_services.rs | 3 +- .../tests/it/catalogs/immutable_catalogs.rs | 2 +- .../it/databases/system/system_database.rs | 4 +- src/query/service/tests/it/storages/system.rs | 4 +- .../sql/src/planner/binder/ddl/catalog.rs | 2 +- .../sql/src/planner/binder/ddl/database.rs | 16 +- src/query/sql/src/planner/binder/ddl/table.rs | 26 +- src/query/storages/iceberg/src/catalog.rs | 14 +- src/query/storages/iceberg/src/database.rs | 6 +- src/query/storages/iceberg/src/lib.rs | 4 +- .../storages/information_schema/Cargo.toml | 1 + .../information_schema/src/columns_table.rs | 20 +- .../src/key_column_usage_table.rs | 13 +- .../information_schema/src/keywords_table.rs | 13 +- .../information_schema/src/schemata_table.rs | 20 +- .../src/statistics_table.rs | 13 +- .../information_schema/src/tables_table.rs | 20 +- .../information_schema/src/views_table.rs | 20 +- .../storages/system/src/databases_table.rs | 33 +- src/query/storages/system/src/lib.rs | 1 + src/query/storages/system/src/locks_table.rs | 24 +- .../storages/system/src/streams_table.rs | 399 ++++++------ src/query/storages/system/src/tables_table.rs | 59 +- src/query/storages/system/src/util.rs | 25 + .../suites/tpch_iceberg/utils.test | 6 + .../00_rest/00_0000_create_and_show.result | 2 + 35 files changed, 1157 insertions(+), 353 deletions(-) 
create mode 100644 src/query/service/src/catalogs/iceberg/iceberg_catalog.rs create mode 100644 src/query/service/src/catalogs/iceberg/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 1273c81935324..3ba4de576a5d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4273,6 +4273,7 @@ dependencies = [ "databend-common-ast", "databend-common-catalog", "databend-common-meta-app", + "databend-common-storages-system", "databend-common-storages-view", ] diff --git a/src/query/management/src/procedure/procedure_mgr.rs b/src/query/management/src/procedure/procedure_mgr.rs index 546d2ef1f6ac6..df9ea2dc15b25 100644 --- a/src/query/management/src/procedure/procedure_mgr.rs +++ b/src/query/management/src/procedure/procedure_mgr.rs @@ -103,7 +103,7 @@ impl ProcedureMgr { Ok(dropped) } - //#[fastrace::trace] + #[fastrace::trace] pub async fn get_procedure( &self, req: &GetProcedureReq, @@ -121,6 +121,7 @@ impl ProcedureMgr { procedure_meta: seq_meta.data, })) } + #[fastrace::trace] pub async fn list_procedures( &self, diff --git a/src/query/service/src/catalogs/default/database_catalog.rs b/src/query/service/src/catalogs/default/database_catalog.rs index fcb68ba79c212..7cf794755c367 100644 --- a/src/query/service/src/catalogs/default/database_catalog.rs +++ b/src/query/service/src/catalogs/default/database_catalog.rs @@ -141,7 +141,7 @@ impl Debug for DatabaseCatalog { impl DatabaseCatalog { #[async_backtrace::framed] pub async fn try_create_with_config(conf: InnerConfig) -> Result { - let immutable_catalog = ImmutableCatalog::try_create_with_config(&conf).await?; + let immutable_catalog = ImmutableCatalog::try_create_with_config(Some(&conf), None)?; let mutable_catalog = MutableCatalog::try_create_with_config(conf).await?; let session_catalog = SessionCatalog::create(mutable_catalog, SessionState::default()); let table_function_factory = TableFunctionFactory::create(); diff --git a/src/query/service/src/catalogs/default/immutable_catalog.rs 
b/src/query/service/src/catalogs/default/immutable_catalog.rs index 3f7ef2bc68610..7be7778b513d2 100644 --- a/src/query/service/src/catalogs/default/immutable_catalog.rs +++ b/src/query/service/src/catalogs/default/immutable_catalog.rs @@ -122,14 +122,22 @@ impl Debug for ImmutableCatalog { impl ImmutableCatalog { #[async_backtrace::framed] - pub async fn try_create_with_config(conf: &InnerConfig) -> Result { + pub fn try_create_with_config( + conf: Option<&InnerConfig>, + catalog_name: Option<&String>, + ) -> Result { // The global db meta. let mut sys_db_meta = InMemoryMetas::create(SYS_DB_ID_BEGIN, SYS_TBL_ID_BEGIN); sys_db_meta.init_db("system"); sys_db_meta.init_db("information_schema"); - let sys_db = SystemDatabase::create(&mut sys_db_meta, conf); - let info_schema_db = InformationSchemaDatabase::create(&mut sys_db_meta); + let catalog_name = if let Some(ctl_name) = catalog_name { + ctl_name + } else { + "default" + }; + let sys_db = SystemDatabase::create(&mut sys_db_meta, conf, catalog_name); + let info_schema_db = InformationSchemaDatabase::create(&mut sys_db_meta, catalog_name); Ok(Self { info_schema_db: Arc::new(info_schema_db), @@ -472,8 +480,6 @@ impl Catalog for ImmutableCatalog { )) } - // Table index - #[async_backtrace::framed] async fn create_index(&self, _req: CreateIndexReq) -> Result { unimplemented!() @@ -512,8 +518,6 @@ impl Catalog for ImmutableCatalog { unimplemented!() } - // Virtual column - #[async_backtrace::framed] async fn create_virtual_column(&self, _req: CreateVirtualColumnReq) -> Result<()> { unimplemented!() diff --git a/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs b/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs new file mode 100644 index 0000000000000..a35c691f4ebaa --- /dev/null +++ b/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs @@ -0,0 +1,609 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in 
compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use databend_common_ast::ast::Engine; +use databend_common_catalog::catalog::Catalog; +use databend_common_catalog::catalog::CatalogCreator; +use databend_common_catalog::catalog::StorageDescription; +use databend_common_catalog::database::Database; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; +use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CreateDatabaseReply; +use databend_common_meta_app::schema::CreateDatabaseReq; +use databend_common_meta_app::schema::CreateDictionaryReply; +use databend_common_meta_app::schema::CreateDictionaryReq; +use databend_common_meta_app::schema::CreateIndexReply; +use databend_common_meta_app::schema::CreateIndexReq; +use databend_common_meta_app::schema::CreateLockRevReply; +use databend_common_meta_app::schema::CreateLockRevReq; +use databend_common_meta_app::schema::CreateSequenceReply; +use databend_common_meta_app::schema::CreateSequenceReq; +use databend_common_meta_app::schema::CreateTableIndexReq; +use databend_common_meta_app::schema::CreateTableReply; +use databend_common_meta_app::schema::CreateTableReq; +use databend_common_meta_app::schema::CreateVirtualColumnReq; +use databend_common_meta_app::schema::DeleteLockRevReq; +use 
databend_common_meta_app::schema::DictionaryMeta; +use databend_common_meta_app::schema::DropDatabaseReply; +use databend_common_meta_app::schema::DropDatabaseReq; +use databend_common_meta_app::schema::DropIndexReq; +use databend_common_meta_app::schema::DropSequenceReply; +use databend_common_meta_app::schema::DropSequenceReq; +use databend_common_meta_app::schema::DropTableByIdReq; +use databend_common_meta_app::schema::DropTableIndexReq; +use databend_common_meta_app::schema::DropTableReply; +use databend_common_meta_app::schema::DropVirtualColumnReq; +use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetDictionaryReply; +use databend_common_meta_app::schema::GetIndexReply; +use databend_common_meta_app::schema::GetIndexReq; +use databend_common_meta_app::schema::GetSequenceNextValueReply; +use databend_common_meta_app::schema::GetSequenceNextValueReq; +use databend_common_meta_app::schema::GetSequenceReply; +use databend_common_meta_app::schema::GetSequenceReq; +use databend_common_meta_app::schema::GetTableCopiedFileReply; +use databend_common_meta_app::schema::GetTableCopiedFileReq; +use databend_common_meta_app::schema::ListDictionaryReq; +use databend_common_meta_app::schema::ListLockRevReq; +use databend_common_meta_app::schema::ListLocksReq; +use databend_common_meta_app::schema::ListVirtualColumnsReq; +use databend_common_meta_app::schema::LockInfo; +use databend_common_meta_app::schema::LockMeta; +use databend_common_meta_app::schema::RenameDatabaseReply; +use databend_common_meta_app::schema::RenameDatabaseReq; +use databend_common_meta_app::schema::RenameDictionaryReq; +use databend_common_meta_app::schema::RenameTableReply; +use databend_common_meta_app::schema::RenameTableReq; +use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; +use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; +use databend_common_meta_app::schema::TableInfo; +use 
databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::schema::TruncateTableReply; +use databend_common_meta_app::schema::TruncateTableReq; +use databend_common_meta_app::schema::UndropDatabaseReply; +use databend_common_meta_app::schema::UndropDatabaseReq; +use databend_common_meta_app::schema::UndropTableReq; +use databend_common_meta_app::schema::UpdateDictionaryReply; +use databend_common_meta_app::schema::UpdateDictionaryReq; +use databend_common_meta_app::schema::UpdateIndexReply; +use databend_common_meta_app::schema::UpdateIndexReq; +use databend_common_meta_app::schema::UpdateVirtualColumnReq; +use databend_common_meta_app::schema::UpsertTableOptionReply; +use databend_common_meta_app::schema::UpsertTableOptionReq; +use databend_common_meta_app::schema::VirtualColumnMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::KeyWithTenant; +use databend_common_meta_types::MetaId; +use databend_common_meta_types::SeqV; +use databend_common_storages_iceberg::IcebergMutableCatalog; +use log::info; + +use crate::catalogs::default::ImmutableCatalog; +use crate::storages::Table; + +#[derive(Debug)] +pub struct IcebergCreator; + +impl CatalogCreator for IcebergCreator { + fn try_create(&self, info: Arc) -> Result> { + let catalog_name = &info.name_ident.catalog_name; + let res = IcebergCatalog { + immutable_catalog: Arc::new(ImmutableCatalog::try_create_with_config( + None, + Some(catalog_name), + )?), + iceberg_catalog: Arc::new(IcebergMutableCatalog::try_create(info)?), + }; + Ok(Arc::new(res)) + } +} + +/// Combine two catalogs together +/// - read/search like operations are always performed at +/// upper layer first, and bottom layer later(if necessary) +/// - metadata are written to the bottom layer +#[derive(Clone)] +pub struct IcebergCatalog { + /// the upper layer, read only + immutable_catalog: Arc, + /// bottom layer, writing goes here + iceberg_catalog: Arc, +} + +impl Debug for IcebergCatalog { + fn 
fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("IcebergCatalog").finish_non_exhaustive() + } +} + +#[async_trait::async_trait] +impl Catalog for IcebergCatalog { + fn name(&self) -> String { + self.iceberg_catalog.info().name_ident.catalog_name.clone() + } + + fn info(&self) -> Arc { + self.iceberg_catalog.info().clone() + } + + fn disable_table_info_refresh(self: Arc) -> Result> { + Ok(self) + } + + #[async_backtrace::framed] + async fn get_database(&self, tenant: &Tenant, db_name: &str) -> Result> { + let r = self.immutable_catalog.get_database(tenant, db_name).await; + match r { + Err(e) => { + if e.code() == ErrorCode::UNKNOWN_DATABASE { + self.iceberg_catalog.get_database(tenant, db_name).await + } else { + Err(e) + } + } + Ok(db) => Ok(db), + } + } + + #[async_backtrace::framed] + async fn list_databases_history(&self, _tenant: &Tenant) -> Result>> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_databases(&self, tenant: &Tenant) -> Result>> { + let mut dbs = self.immutable_catalog.list_databases(tenant).await?; + let mut other = self.iceberg_catalog.list_databases(tenant).await?; + dbs.append(&mut other); + Ok(dbs) + } + + #[async_backtrace::framed] + async fn create_database(&self, req: CreateDatabaseReq) -> Result { + info!("Create database from req:{:?}", req); + + if self + .immutable_catalog + .exists_database(req.name_ident.tenant(), req.name_ident.database_name()) + .await? + { + return Err(ErrorCode::DatabaseAlreadyExists(format!( + "{} database exists", + req.name_ident.database_name() + ))); + } + // create db in BOTTOM layer only + self.iceberg_catalog.create_database(req).await + } + + #[async_backtrace::framed] + async fn drop_database(&self, req: DropDatabaseReq) -> Result { + info!("Drop database from req:{:?}", req); + + // drop db in BOTTOM layer only + if self + .immutable_catalog + .exists_database(req.name_ident.tenant(), req.name_ident.database_name()) + .await? 
+ { + return self.immutable_catalog.drop_database(req).await; + } + self.iceberg_catalog.drop_database(req).await + } + + #[async_backtrace::framed] + async fn undrop_database(&self, _req: UndropDatabaseReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn create_index(&self, _req: CreateIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn drop_index(&self, _req: DropIndexReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn get_index(&self, _req: GetIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn update_index(&self, _req: UpdateIndexReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn create_virtual_column(&self, _req: CreateVirtualColumnReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn update_virtual_column(&self, _req: UpdateVirtualColumnReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn drop_virtual_column(&self, _req: DropVirtualColumnReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_virtual_columns( + &self, + _req: ListVirtualColumnsReq, + ) -> Result> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn rename_database(&self, _req: RenameDatabaseReq) -> Result { + unimplemented!() + } + + fn get_table_by_info(&self, table_info: &TableInfo) -> Result> { + let res = self.immutable_catalog.get_table_by_info(table_info); + match res { + Ok(t) => Ok(t), + Err(e) => { + if e.code() == ErrorCode::UNKNOWN_TABLE { + self.iceberg_catalog.get_table_by_info(table_info) + } else { + Err(e) + } + } + } + } + + #[async_backtrace::framed] + async fn get_table_meta_by_id(&self, _table_id: MetaId) -> Result>> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn mget_table_names_by_ids( + &self, + _tenant: &Tenant, + _table_ids: &[MetaId], + _get_dropped_table: bool, + ) -> 
Result>> { + Err(ErrorCode::Unimplemented( + "Cannot get tables name by ids in ICEBERG catalog", + )) + } + + #[async_backtrace::framed] + async fn get_db_name_by_id(&self, _db_id: MetaId) -> Result { + Err(ErrorCode::Unimplemented( + "Cannot get db name by id in ICEBERG catalog", + )) + } + + async fn mget_databases( + &self, + _tenant: &Tenant, + _db_names: &[DatabaseNameIdent], + ) -> Result>> { + Err(ErrorCode::Unimplemented( + "Cannot mget databases in ICEBERG catalog", + )) + } + + #[async_backtrace::framed] + async fn mget_database_names_by_ids( + &self, + _tenant: &Tenant, + _db_ids: &[MetaId], + ) -> Result>> { + Err(ErrorCode::Unimplemented( + "Cannot get dbs name by ids in ICEBERG catalog", + )) + } + + #[async_backtrace::framed] + async fn get_table_name_by_id(&self, _table_id: MetaId) -> Result> { + Err(ErrorCode::Unimplemented( + "Cannot get tables name by ids in ICEBERG catalog", + )) + } + + fn support_partition(&self) -> bool { + true + } + + fn is_external(&self) -> bool { + true + } + + fn get_table_engines(&self) -> Vec { + vec![] + } + + fn default_table_engine(&self) -> Engine { + Engine::Iceberg + } + + #[async_backtrace::framed] + async fn get_table( + &self, + tenant: &Tenant, + db_name: &str, + table_name: &str, + ) -> Result> { + let res = self + .immutable_catalog + .get_table(tenant, db_name, table_name) + .await; + match res { + Ok(v) => Ok(v), + Err(e) => { + if e.code() == ErrorCode::UNKNOWN_DATABASE { + self.iceberg_catalog + .get_table(tenant, db_name, table_name) + .await + } else { + Err(e) + } + } + } + } + + #[async_backtrace::framed] + async fn get_table_history( + &self, + _tenant: &Tenant, + _db_name: &str, + _table_name: &str, + ) -> Result>> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_tables(&self, tenant: &Tenant, db_name: &str) -> Result>> { + let r = self.immutable_catalog.list_tables(tenant, db_name).await; + match r { + Ok(x) => Ok(x), + Err(e) => { + if e.code() == 
ErrorCode::UNKNOWN_DATABASE { + self.iceberg_catalog.list_tables(tenant, db_name).await + } else { + Err(e) + } + } + } + } + + #[async_backtrace::framed] + async fn list_tables_names(&self, tenant: &Tenant, db_name: &str) -> Result> { + let r = self + .immutable_catalog + .list_tables_names(tenant, db_name) + .await; + match r { + Ok(x) => Ok(x), + Err(e) => { + if e.code() == ErrorCode::UNKNOWN_DATABASE { + self.iceberg_catalog + .list_tables_names(tenant, db_name) + .await + } else { + Err(e) + } + } + } + } + + #[async_backtrace::framed] + async fn list_tables_history( + &self, + _tenant: &Tenant, + _db_name: &str, + ) -> Result>> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn create_table(&self, req: CreateTableReq) -> Result { + info!("Create table from req:{:?}", req); + + if self + .immutable_catalog + .exists_database(req.tenant(), req.db_name()) + .await? + { + return self.immutable_catalog.create_table(req).await; + } + self.iceberg_catalog.create_table(req).await + } + + #[async_backtrace::framed] + async fn drop_table_by_id(&self, req: DropTableByIdReq) -> Result { + let res = self.iceberg_catalog.drop_table_by_id(req).await?; + Ok(res) + } + + #[async_backtrace::framed] + async fn undrop_table(&self, _req: UndropTableReq) -> Result<()> { + unimplemented!() + } + + // Table index + + #[async_backtrace::framed] + async fn rename_table(&self, req: RenameTableReq) -> Result { + info!("Rename table from req:{:?}", req); + + if self + .immutable_catalog + .exists_database(req.tenant(), req.db_name()) + .await? + || self + .immutable_catalog + .exists_database(req.tenant(), &req.new_db_name) + .await? 
+ { + return Err(ErrorCode::Unimplemented( + "Cannot rename table from(to) system databases", + )); + } + + self.iceberg_catalog.rename_table(req).await + } + + #[async_backtrace::framed] + async fn upsert_table_option( + &self, + _tenant: &Tenant, + _db_name: &str, + _req: UpsertTableOptionReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn set_table_column_mask_policy( + &self, + _req: SetTableColumnMaskPolicyReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn create_table_index(&self, _req: CreateTableIndexReq) -> Result<()> { + unimplemented!() + } + + // Virtual column + + #[async_backtrace::framed] + async fn drop_table_index(&self, _req: DropTableIndexReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn get_table_copied_file_info( + &self, + _tenant: &Tenant, + _db_name: &str, + _req: GetTableCopiedFileReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn truncate_table( + &self, + _table_info: &TableInfo, + _req: TruncateTableReq, + ) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_lock_revisions(&self, _req: ListLockRevReq) -> Result> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn create_lock_revision(&self, _req: CreateLockRevReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn extend_lock_revision(&self, _req: ExtendLockRevReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn delete_lock_revision(&self, _req: DeleteLockRevReq) -> Result<()> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_locks(&self, _req: ListLocksReq) -> Result> { + unimplemented!() + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn create_sequence(&self, _req: CreateSequenceReq) -> Result { + unimplemented!() + } + async fn get_sequence(&self, _req: GetSequenceReq) -> Result { + unimplemented!() + } + + 
async fn get_sequence_next_value( + &self, + _req: GetSequenceNextValueReq, + ) -> Result { + unimplemented!() + } + + async fn drop_sequence(&self, _req: DropSequenceReq) -> Result { + unimplemented!() + } + + /// Dictionary + #[async_backtrace::framed] + async fn create_dictionary(&self, _req: CreateDictionaryReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn update_dictionary(&self, _req: UpdateDictionaryReq) -> Result { + unimplemented!() + } + + #[async_backtrace::framed] + async fn drop_dictionary( + &self, + _dict_ident: DictionaryNameIdent, + ) -> Result>> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn get_dictionary( + &self, + _req: DictionaryNameIdent, + ) -> Result> { + unimplemented!() + } + + #[async_backtrace::framed] + async fn list_dictionaries( + &self, + _req: ListDictionaryReq, + ) -> Result> { + unimplemented!() + } + + async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { + unimplemented!() + } +} diff --git a/src/query/service/src/catalogs/iceberg/mod.rs b/src/query/service/src/catalogs/iceberg/mod.rs new file mode 100644 index 0000000000000..8bd9ae7c18086 --- /dev/null +++ b/src/query/service/src/catalogs/iceberg/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod iceberg_catalog; + +pub use iceberg_catalog::IcebergCreator; diff --git a/src/query/service/src/catalogs/mod.rs b/src/query/service/src/catalogs/mod.rs index 7064bfb752e6e..6a7d96b6b9378 100644 --- a/src/query/service/src/catalogs/mod.rs +++ b/src/query/service/src/catalogs/mod.rs @@ -13,7 +13,10 @@ // limitations under the License. pub mod default; +mod iceberg; + pub use databend_common_catalog::catalog::Catalog; pub use databend_common_storages_hive as hive; pub use default::table_memory_meta::InMemoryMetas; pub use default::DatabaseCatalog; +pub use iceberg::IcebergCreator; diff --git a/src/query/service/src/databases/information_schema/information_schema_database.rs b/src/query/service/src/databases/information_schema/information_schema_database.rs index fef701e3a3c2a..13b81ad6cb561 100644 --- a/src/query/service/src/databases/information_schema/information_schema_database.rs +++ b/src/query/service/src/databases/information_schema/information_schema_database.rs @@ -38,15 +38,15 @@ pub struct InformationSchemaDatabase { } impl InformationSchemaDatabase { - pub fn create(sys_db_meta: &mut InMemoryMetas) -> Self { + pub fn create(sys_db_meta: &mut InMemoryMetas, ctl_name: &str) -> Self { let table_list: Vec> = vec![ - ColumnsTable::create(sys_db_meta.next_table_id()), - TablesTable::create(sys_db_meta.next_table_id()), - KeywordsTable::create(sys_db_meta.next_table_id()), - ViewsTable::create(sys_db_meta.next_table_id()), - SchemataTable::create(sys_db_meta.next_table_id()), - StatisticsTable::create(sys_db_meta.next_table_id()), - KeyColumnUsageTable::create(sys_db_meta.next_table_id()), + ColumnsTable::create(sys_db_meta.next_table_id(), ctl_name), + TablesTable::create(sys_db_meta.next_table_id(), ctl_name), + KeywordsTable::create(sys_db_meta.next_table_id(), ctl_name), + ViewsTable::create(sys_db_meta.next_table_id(), ctl_name), + SchemataTable::create(sys_db_meta.next_table_id(), ctl_name), + StatisticsTable::create(sys_db_meta.next_table_id(), 
ctl_name), + KeyColumnUsageTable::create(sys_db_meta.next_table_id(), ctl_name), ]; let db = "information_schema"; diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs index 855ea388eafda..fce306cc6dc1a 100644 --- a/src/query/service/src/databases/system/system_database.rs +++ b/src/query/service/src/databases/system/system_database.rs @@ -79,7 +79,6 @@ use databend_common_version::VERGEN_CARGO_FEATURES; use crate::catalogs::InMemoryMetas; use crate::databases::Database; -use crate::storages::Table; #[derive(Clone)] pub struct SystemDatabase { @@ -98,8 +97,12 @@ impl SystemDatabase { map } - pub fn create(sys_db_meta: &mut InMemoryMetas, config: &InnerConfig) -> Self { - let table_list: Vec> = vec![ + pub fn create( + sys_db_meta: &mut InMemoryMetas, + config: Option<&InnerConfig>, + ctl_name: &str, + ) -> Self { + let mut table_list = vec![ OneTable::create(sys_db_meta.next_table_id()), FunctionsTable::create(sys_db_meta.next_table_id()), ContributorsTable::create(sys_db_meta.next_table_id(), DATABEND_COMMIT_AUTHORS), @@ -110,13 +113,11 @@ impl SystemDatabase { DATABEND_CREDITS_LICENSES, ), SettingsTable::create(sys_db_meta.next_table_id()), - TablesTableWithoutHistory::create(sys_db_meta.next_table_id()), - TablesTableWithHistory::create(sys_db_meta.next_table_id()), + TablesTableWithoutHistory::create(sys_db_meta.next_table_id(), ctl_name), ClustersTable::create(sys_db_meta.next_table_id()), - DatabasesTableWithHistory::create(sys_db_meta.next_table_id()), - DatabasesTableWithoutHistory::create(sys_db_meta.next_table_id()), - FullStreamsTable::create(sys_db_meta.next_table_id()), - TerseStreamsTable::create(sys_db_meta.next_table_id()), + DatabasesTableWithoutHistory::create(sys_db_meta.next_table_id(), ctl_name), + FullStreamsTable::create(sys_db_meta.next_table_id(), ctl_name), + TerseStreamsTable::create(sys_db_meta.next_table_id(), ctl_name), 
ProcessesTable::create(sys_db_meta.next_table_id()), ConfigsTable::create(sys_db_meta.next_table_id()), MetricsTable::create(sys_db_meta.next_table_id()), @@ -126,49 +127,61 @@ impl SystemDatabase { MallocStatsTotalsTable::create(sys_db_meta.next_table_id()), ColumnsTable::create(sys_db_meta.next_table_id()), UsersTable::create(sys_db_meta.next_table_id()), - Arc::new(QueryLogTable::create( - sys_db_meta.next_table_id(), - config.query.max_query_log_size, - )), - Arc::new(ClusteringHistoryTable::create( - sys_db_meta.next_table_id(), - config.query.max_query_log_size, - )), EnginesTable::create(sys_db_meta.next_table_id()), RolesTable::create(sys_db_meta.next_table_id()), StagesTable::create(sys_db_meta.next_table_id()), - BuildOptionsTable::create( - sys_db_meta.next_table_id(), - VERGEN_CARGO_FEATURES, - DATABEND_CARGO_CFG_TARGET_FEATURE, - ), CatalogsTable::create(sys_db_meta.next_table_id()), - QueryCacheTable::create(sys_db_meta.next_table_id()), - TableFunctionsTable::create(sys_db_meta.next_table_id()), - CachesTable::create(sys_db_meta.next_table_id()), - IndexesTable::create(sys_db_meta.next_table_id()), - BacktraceTable::create(sys_db_meta.next_table_id()), - TempFilesTable::create(sys_db_meta.next_table_id()), - TasksTable::create(sys_db_meta.next_table_id()), - TaskHistoryTable::create(sys_db_meta.next_table_id()), - QueriesProfilingTable::create(sys_db_meta.next_table_id()), - LocksTable::create(sys_db_meta.next_table_id()), VirtualColumnsTable::create(sys_db_meta.next_table_id()), PasswordPoliciesTable::create(sys_db_meta.next_table_id()), UserFunctionsTable::create(sys_db_meta.next_table_id()), - NotificationsTable::create(sys_db_meta.next_table_id()), - NotificationHistoryTable::create(sys_db_meta.next_table_id()), - ViewsTableWithHistory::create(sys_db_meta.next_table_id()), - ViewsTableWithoutHistory::create(sys_db_meta.next_table_id()), - TemporaryTablesTable::create(sys_db_meta.next_table_id()), + 
ViewsTableWithoutHistory::create(sys_db_meta.next_table_id(), ctl_name), ProceduresTable::create(sys_db_meta.next_table_id()), - DictionariesTable::create(sys_db_meta.next_table_id()), ]; + let disable_system_table_load; + + if let Some(config) = config { + table_list.extend(vec![ + TablesTableWithHistory::create(sys_db_meta.next_table_id(), ctl_name), + DatabasesTableWithHistory::create(sys_db_meta.next_table_id(), ctl_name), + BuildOptionsTable::create( + sys_db_meta.next_table_id(), + VERGEN_CARGO_FEATURES, + DATABEND_CARGO_CFG_TARGET_FEATURE, + ), + QueryCacheTable::create(sys_db_meta.next_table_id()), + TableFunctionsTable::create(sys_db_meta.next_table_id()), + CachesTable::create(sys_db_meta.next_table_id()), + IndexesTable::create(sys_db_meta.next_table_id()), + BacktraceTable::create(sys_db_meta.next_table_id()), + TempFilesTable::create(sys_db_meta.next_table_id()), + TasksTable::create(sys_db_meta.next_table_id()), + TaskHistoryTable::create(sys_db_meta.next_table_id()), + QueriesProfilingTable::create(sys_db_meta.next_table_id()), + LocksTable::create(sys_db_meta.next_table_id(), ctl_name), + NotificationsTable::create(sys_db_meta.next_table_id()), + NotificationHistoryTable::create(sys_db_meta.next_table_id()), + ViewsTableWithHistory::create(sys_db_meta.next_table_id(), ctl_name), + TemporaryTablesTable::create(sys_db_meta.next_table_id()), + DictionariesTable::create(sys_db_meta.next_table_id()), + Arc::new(QueryLogTable::create( + sys_db_meta.next_table_id(), + config.query.max_query_log_size, + )), + Arc::new(ClusteringHistoryTable::create( + sys_db_meta.next_table_id(), + config.query.max_query_log_size, + )), + ]); + disable_system_table_load = config.query.disable_system_table_load; + } else { + disable_system_table_load = false; + } + let disable_tables = Self::disable_system_tables(); for tbl in table_list.into_iter() { // Not load the disable system tables. 
- if config.query.disable_system_table_load { + if disable_system_table_load { let name = tbl.name(); if !disable_tables.contains_key(name) { sys_db_meta.insert("system", tbl); diff --git a/src/query/service/src/global_services.rs b/src/query/service/src/global_services.rs index 2ad647946e8d3..b534bf2a94e9c 100644 --- a/src/query/service/src/global_services.rs +++ b/src/query/service/src/global_services.rs @@ -29,7 +29,6 @@ use databend_common_meta_app::schema::CatalogType; use databend_common_storage::DataOperator; use databend_common_storage::ShareTableConfig; use databend_common_storages_hive::HiveCreator; -use databend_common_storages_iceberg::IcebergCreator; use databend_common_storages_system::ProfilesLogQueue; use databend_common_tracing::GlobalLogger; use databend_common_users::builtin::BuiltIn; @@ -43,6 +42,7 @@ use crate::auth::AuthMgr; use crate::builtin::BuiltinUDFs; use crate::builtin::BuiltinUsers; use crate::catalogs::DatabaseCatalog; +use crate::catalogs::IcebergCreator; use crate::clusters::ClusterDiscovery; use crate::locks::LockManager; use crate::persistent_log::GlobalPersistentLog; @@ -100,7 +100,6 @@ impl GlobalServices { // Maybe we can do some refactor to simplify the logic here. { // Init default catalog. 
- let default_catalog = DatabaseCatalog::try_create_with_config(config.clone()).await?; let catalog_creator: Vec<(CatalogType, Arc)> = vec![ diff --git a/src/query/service/tests/it/catalogs/immutable_catalogs.rs b/src/query/service/tests/it/catalogs/immutable_catalogs.rs index 365a3b3a83bb9..3703fd12efa02 100644 --- a/src/query/service/tests/it/catalogs/immutable_catalogs.rs +++ b/src/query/service/tests/it/catalogs/immutable_catalogs.rs @@ -31,7 +31,7 @@ async fn test_immutable_catalogs_database() -> Result<()> { let tenant = Tenant::new_literal(tenant_name); let conf = databend_query::test_kits::ConfigBuilder::create().config(); - let catalog = ImmutableCatalog::try_create_with_config(&conf).await?; + let catalog = ImmutableCatalog::try_create_with_config(Some(&conf), None)?; // get system database let database = catalog.get_database(&tenant, "system").await?; diff --git a/src/query/service/tests/it/databases/system/system_database.rs b/src/query/service/tests/it/databases/system/system_database.rs index fcb5cc50f0fd5..c88c39b20f394 100644 --- a/src/query/service/tests/it/databases/system/system_database.rs +++ b/src/query/service/tests/it/databases/system/system_database.rs @@ -27,7 +27,7 @@ fn test_disable_system_table() -> Result<()> { { let mut sys_db_meta = InMemoryMetas::create(SYS_DB_ID_BEGIN, SYS_TBL_ID_BEGIN); sys_db_meta.init_db("system"); - let _ = SystemDatabase::create(&mut sys_db_meta, &conf); + let _ = SystemDatabase::create(&mut sys_db_meta, Some(&conf), "default"); let t1 = sys_db_meta.get_by_name("system", "clusters"); assert!(t1.is_ok()); } @@ -38,7 +38,7 @@ fn test_disable_system_table() -> Result<()> { let mut sys_db_meta = InMemoryMetas::create(SYS_DB_ID_BEGIN, SYS_TBL_ID_BEGIN); sys_db_meta.init_db("system"); - let _ = SystemDatabase::create(&mut sys_db_meta, &conf); + let _ = SystemDatabase::create(&mut sys_db_meta, Some(&conf), "default"); let t1 = sys_db_meta.get_by_name("system", "clusters"); assert!(t1.is_err()); } diff --git 
a/src/query/service/tests/it/storages/system.rs b/src/query/service/tests/it/storages/system.rs index 6c4999956e326..8126bf5f6c790 100644 --- a/src/query/service/tests/it/storages/system.rs +++ b/src/query/service/tests/it/storages/system.rs @@ -274,7 +274,7 @@ async fn test_databases_table() -> Result<()> { let fixture = TestFixture::setup_with_config(&config).await?; let ctx = fixture.new_query_ctx().await?; - let table = DatabasesTableWithoutHistory::create(1); + let table = DatabasesTableWithoutHistory::create(1, "default"); let mut mint = Mint::new("tests/it/storages/testdata"); let file = &mut mint.new_goldenfile("databases_table.txt").unwrap(); @@ -290,7 +290,7 @@ async fn test_databases_history_table() -> Result<()> { let fixture = TestFixture::setup_with_config(&config).await?; let ctx = fixture.new_query_ctx().await?; - let table = DatabasesTableWithHistory::create(1); + let table = DatabasesTableWithHistory::create(1, "default"); let mut mint = Mint::new("tests/it/storages/testdata"); let file = &mut mint diff --git a/src/query/sql/src/planner/binder/ddl/catalog.rs b/src/query/sql/src/planner/binder/ddl/catalog.rs index 2384dfeebbe93..8525b7064cdc5 100644 --- a/src/query/sql/src/planner/binder/ddl/catalog.rs +++ b/src/query/sql/src/planner/binder/ddl/catalog.rs @@ -59,7 +59,7 @@ impl Binder { ) -> Result { let ShowCatalogsStmt { limit } = stmt; let mut query = String::new(); - let default_catalog = self.ctx.get_default_catalog()?.name(); + let default_catalog = self.ctx.get_current_catalog(); write!( query, "SELECT name AS Catalogs FROM {}.system.catalogs", diff --git a/src/query/sql/src/planner/binder/ddl/database.rs b/src/query/sql/src/planner/binder/ddl/database.rs index cacccb96dbb55..be5bac8cff3d6 100644 --- a/src/query/sql/src/planner/binder/ddl/database.rs +++ b/src/query/sql/src/planner/binder/ddl/database.rs @@ -26,6 +26,7 @@ use databend_common_ast::ast::ShowDatabasesStmt; use databend_common_ast::ast::ShowDropDatabasesStmt; use 
databend_common_ast::ast::ShowLimit; use databend_common_ast::ast::UndropDatabaseStmt; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::DataField; @@ -59,16 +60,21 @@ impl Binder { limit, } = stmt; - let default_catalog = self.ctx.get_default_catalog()?.name(); - let mut select_builder = - SelectBuilder::from(&format!("{}.system.databases", default_catalog)); - let ctl = if let Some(ctl) = catalog { + if let Err(err) = self.ctx.get_catalog(ctl.to_string().as_str()).await { + return Err(ErrorCode::UnknownCatalog(format!( + "Get catalog {} with error: {}", + ctl, err + ))); + } normalize_identifier(ctl, &self.name_resolution_ctx).name } else { self.ctx.get_current_catalog().to_string() }; + let mut select_builder = + SelectBuilder::from(&format!("{}.system.databases", ctl.to_lowercase())); + select_builder.with_filter(format!("catalog = '{ctl}'")); if *full { @@ -135,7 +141,7 @@ impl Binder { None => (), } let query = select_builder.build(); - debug!("show databases rewrite to: {:?}", query); + debug!("show drop databases rewrite to: {:?}", query); self.bind_rewrite_to_query(bind_context, query.as_str(), RewriteKind::ShowDropDatabases) .await diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 7d2d632be1da5..300031cb84276 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -171,13 +171,22 @@ impl Binder { with_history, } = stmt; - let default_catalog = self.ctx.get_default_catalog()?.name(); + let catalog_name = match catalog { + None => self.ctx.get_current_catalog(), + Some(ident) => { + // check in check_database_exist + normalize_identifier(ident, &self.name_resolution_ctx).name + } + }; let database = self.check_database_exist(catalog, database).await?; let mut select_builder = if stmt.with_history { - 
SelectBuilder::from(&format!("{}.system.tables_with_history", default_catalog)) + SelectBuilder::from(&format!( + "{}.system.tables_with_history", + catalog_name.to_lowercase() + )) } else { - SelectBuilder::from(&format!("{}.system.tables", default_catalog)) + SelectBuilder::from(&format!("{}.system.tables", catalog_name.to_lowercase())) }; if *full { @@ -214,15 +223,6 @@ impl Binder { select_builder.with_filter(format!("database = '{database}'")); select_builder.with_filter("table_type = 'BASE TABLE'".to_string()); - let catalog_name = match catalog { - None => self.ctx.get_current_catalog(), - Some(ident) => { - let catalog = normalize_identifier(ident, &self.name_resolution_ctx).name; - self.ctx.get_catalog(&catalog).await?; - catalog - } - }; - select_builder.with_filter(format!("catalog = '{catalog_name}'")); let query = match limit { None => select_builder.build(), @@ -309,7 +309,7 @@ impl Binder { ) -> Result { let ShowTablesStatusStmt { database, limit } = stmt; - let default_catalog = self.ctx.get_default_catalog()?.name(); + let default_catalog = self.ctx.get_current_catalog(); let database = self.check_database_exist(&None, database).await?; let select_cols = "name AS Name, engine AS Engine, 0 AS Version, \ diff --git a/src/query/storages/iceberg/src/catalog.rs b/src/query/storages/iceberg/src/catalog.rs index 68172d1aef574..d64500f7a4060 100644 --- a/src/query/storages/iceberg/src/catalog.rs +++ b/src/query/storages/iceberg/src/catalog.rs @@ -118,11 +118,11 @@ use crate::IcebergTable; pub const ICEBERG_CATALOG: &str = "iceberg"; #[derive(Debug)] -pub struct IcebergCreator; +pub struct IcebergMutableCreator; -impl CatalogCreator for IcebergCreator { +impl CatalogCreator for IcebergMutableCreator { fn try_create(&self, info: Arc) -> Result> { - let catalog: Arc = Arc::new(IcebergCatalog::try_create(info)?); + let catalog: Arc = Arc::new(IcebergMutableCatalog::try_create(info)?); Ok(catalog) } } @@ -135,7 +135,7 @@ impl CatalogCreator for IcebergCreator { 
/// - Table metadata are saved in external Iceberg storage #[derive(Clone, Educe)] #[educe(Debug)] -pub struct IcebergCatalog { +pub struct IcebergMutableCatalog { /// info of this iceberg table. info: Arc, @@ -143,7 +143,7 @@ pub struct IcebergCatalog { ctl: Arc, } -impl IcebergCatalog { +impl IcebergMutableCatalog { /// create a new iceberg catalog from the endpoint_address #[fastrace::trace] pub fn try_create(info: Arc) -> Result { @@ -228,7 +228,7 @@ impl IcebergCatalog { } #[async_trait] -impl Catalog for IcebergCatalog { +impl Catalog for IcebergMutableCatalog { fn name(&self) -> String { self.info.name_ident.catalog_name.clone() } @@ -386,7 +386,7 @@ impl Catalog for IcebergCatalog { _get_dropped_table: bool, ) -> Result>> { Err(ErrorCode::Unimplemented( - "Cannot get tables name by ids in HIVE catalog", + "Cannot get tables name by ids in ICEBERG catalog", )) } diff --git a/src/query/storages/iceberg/src/database.rs b/src/query/storages/iceberg/src/database.rs index 51415f76fbc19..39ede8a4952e7 100644 --- a/src/query/storages/iceberg/src/database.rs +++ b/src/query/storages/iceberg/src/database.rs @@ -55,21 +55,21 @@ use iceberg::TableCreation; use iceberg::TableIdent; use crate::cache; -use crate::IcebergCatalog; +use crate::IcebergMutableCatalog; const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; #[derive(Clone, Educe)] #[educe(Debug)] pub struct IcebergDatabase { - ctl: IcebergCatalog, + ctl: IcebergMutableCatalog, info: DatabaseInfo, ident: iceberg::NamespaceIdent, } impl IcebergDatabase { - pub fn create(ctl: IcebergCatalog, name: &str) -> Self { + pub fn create(ctl: IcebergMutableCatalog, name: &str) -> Self { let ident = iceberg::NamespaceIdent::new(name.to_string()); let info = DatabaseInfo { database_id: DatabaseId::new(0), diff --git a/src/query/storages/iceberg/src/lib.rs b/src/query/storages/iceberg/src/lib.rs index e50ea43dcc999..af62a28b26421 100644 --- a/src/query/storages/iceberg/src/lib.rs +++ 
b/src/query/storages/iceberg/src/lib.rs @@ -29,8 +29,8 @@ mod statistics; pub mod table; mod table_source; -pub use catalog::IcebergCatalog; -pub use catalog::IcebergCreator; +pub use catalog::IcebergMutableCatalog; +pub use catalog::IcebergMutableCreator; pub use catalog::ICEBERG_CATALOG; pub use iceberg_inspect::IcebergInspectTable; pub use table::IcebergTable; diff --git a/src/query/storages/information_schema/Cargo.toml b/src/query/storages/information_schema/Cargo.toml index 7ec8b53c0b144..099aff99974a1 100644 --- a/src/query/storages/information_schema/Cargo.toml +++ b/src/query/storages/information_schema/Cargo.toml @@ -10,6 +10,7 @@ edition = { workspace = true } databend-common-ast = { workspace = true } databend-common-catalog = { workspace = true } databend-common-meta-app = { workspace = true } +databend-common-storages-system = { workspace = true } databend-common-storages-view = { workspace = true } [lints] diff --git a/src/query/storages/information_schema/src/columns_table.rs b/src/query/storages/information_schema/src/columns_table.rs index be293fc3ba1b8..ef1c2581c9eb0 100644 --- a/src/query/storages/information_schema/src/columns_table.rs +++ b/src/query/storages/information_schema/src/columns_table.rs @@ -16,17 +16,22 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; pub struct ColumnsTable {} impl ColumnsTable { - pub fn create(table_id: u64) -> Arc { - let query = "SELECT + pub fn create(table_id: u64, ctl_name: &str) -> Arc { + 
let query = format!( + "SELECT database AS table_catalog, database AS table_schema, table AS table_name, @@ -59,12 +64,14 @@ impl ColumnsTable { NULL AS privileges, default_expression as default, NULL AS extra - FROM default.system.columns;"; + FROM {}.system.columns;", + ctl_name + ); let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query.to_string()); let table_info = TableInfo { - desc: "'default'.'information_schema'.'columns'".to_string(), + desc: "'information_schema'.'columns'".to_string(), name: "columns".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -72,6 +79,11 @@ impl ColumnsTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/information_schema/src/key_column_usage_table.rs b/src/query/storages/information_schema/src/key_column_usage_table.rs index 9e38b277141c8..9ef46f3cce88a 100644 --- a/src/query/storages/information_schema/src/key_column_usage_table.rs +++ b/src/query/storages/information_schema/src/key_column_usage_table.rs @@ -16,16 +16,20 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; pub struct KeyColumnUsageTable {} impl KeyColumnUsageTable { - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> 
Arc { let query = "SELECT \ NULL as constraint_catalog, \ NULL as constraint_schema, \ @@ -44,7 +48,7 @@ impl KeyColumnUsageTable { let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query); let table_info = TableInfo { - desc: "'default'.'information_schema'.'key_column_usage'".to_string(), + desc: "'information_schema'.'key_column_usage'".to_string(), name: "key_column_usage".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -52,6 +56,11 @@ impl KeyColumnUsageTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/information_schema/src/keywords_table.rs b/src/query/storages/information_schema/src/keywords_table.rs index 7f9fc6b678fbd..faf54cc3bac56 100644 --- a/src/query/storages/information_schema/src/keywords_table.rs +++ b/src/query/storages/information_schema/src/keywords_table.rs @@ -17,16 +17,20 @@ use std::sync::Arc; use databend_common_ast::parser::token::all_reserved_keywords; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; pub struct KeywordsTable {} impl KeywordsTable { - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> Arc { let all_keywords_vec = all_reserved_keywords(); let all_keywords = all_keywords_vec.join(", "); let query = "SELECT '".to_owned() + 
&all_keywords + "' AS KEYWORDS, 1 AS RESERVED"; @@ -34,7 +38,7 @@ impl KeywordsTable { let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query); let table_info = TableInfo { - desc: "'default'.'information_schema'.'keywords'".to_string(), + desc: "'information_schema'.'keywords'".to_string(), name: "keywords".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -42,6 +46,11 @@ impl KeywordsTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/information_schema/src/schemata_table.rs b/src/query/storages/information_schema/src/schemata_table.rs index c43fb70ff9064..f3f570cb39ab3 100644 --- a/src/query/storages/information_schema/src/schemata_table.rs +++ b/src/query/storages/information_schema/src/schemata_table.rs @@ -16,17 +16,22 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; pub struct SchemataTable {} impl SchemataTable { - pub fn create(table_id: u64) -> Arc { - let query = "SELECT + pub fn create(table_id: u64, ctl_name: &str) -> Arc { + let query = format!( + "SELECT name AS catalog_name, name AS schema_name, 'default' AS schema_owner, @@ -35,12 +40,14 @@ impl SchemataTable { NULL AS default_character_set_name, NULL AS default_collation_name, NULL AS 
sql_path - FROM default.system.databases;"; + FROM {}.system.databases;", + ctl_name + ); let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query.to_string()); let table_info = TableInfo { - desc: "'default'.'information_schema'.'schemata'".to_string(), + desc: "'information_schema'.'schemata'".to_string(), name: "schemata".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -48,6 +55,11 @@ impl SchemataTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/information_schema/src/statistics_table.rs b/src/query/storages/information_schema/src/statistics_table.rs index e8eb02026e57f..a23866a57fce1 100644 --- a/src/query/storages/information_schema/src/statistics_table.rs +++ b/src/query/storages/information_schema/src/statistics_table.rs @@ -16,16 +16,20 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; pub struct StatisticsTable {} impl StatisticsTable { - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> Arc { let query = "SELECT \ NULL as table_catalog, \ NULL as table_schema, \ @@ -48,7 +52,7 @@ impl StatisticsTable { let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query); let table_info = TableInfo { - 
desc: "'default'.'information_schema'.'statistics'".to_string(), + desc: "'information_schema'.'statistics'".to_string(), name: "statistics".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -56,6 +60,11 @@ impl StatisticsTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/information_schema/src/tables_table.rs b/src/query/storages/information_schema/src/tables_table.rs index c7446b6f91f9e..2ffa51e38ade2 100644 --- a/src/query/storages/information_schema/src/tables_table.rs +++ b/src/query/storages/information_schema/src/tables_table.rs @@ -16,9 +16,13 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; @@ -51,8 +55,9 @@ impl TablesTable { // | CREATE_OPTIONS | varchar(256) | YES | | NULL | | // | TABLE_COMMENT | text | YES | | NULL | | // +-----------------+--------------------------------------------------------------------+------+-----+---------+-------+ - pub fn create(table_id: u64) -> Arc { - let query = "SELECT + pub fn create(table_id: u64, ctl_name: &str) -> Arc { + let query = format!( + "SELECT database AS table_catalog, database AS table_schema, name AS table_name, @@ -67,12 +72,14 @@ impl TablesTable { NULL AS table_collation, NULL AS data_free, comment AS 
table_comment - FROM default.system.tables ORDER BY table_schema;"; + FROM {}.system.tables ORDER BY table_schema;", + ctl_name + ); let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query.to_string()); let table_info = TableInfo { - desc: "'default'.'information_schema'.'tables'".to_string(), + desc: "'information_schema'.'tables'".to_string(), name: "tables".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -80,6 +87,11 @@ impl TablesTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/information_schema/src/views_table.rs b/src/query/storages/information_schema/src/views_table.rs index 24b8a9eaee8c7..56a1c11ae079e 100644 --- a/src/query/storages/information_schema/src/views_table.rs +++ b/src/query/storages/information_schema/src/views_table.rs @@ -16,17 +16,22 @@ use std::collections::BTreeMap; use std::sync::Arc; use databend_common_catalog::table::Table; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; +use databend_common_storages_system::generate_catalog_meta; use databend_common_storages_view::view_table::ViewTable; use databend_common_storages_view::view_table::QUERY; pub struct ViewsTable {} impl ViewsTable { - pub fn create(table_id: u64) -> Arc { - let query = "SELECT + pub fn create(table_id: u64, ctl_name: &str) -> Arc { + let query = format!( + "SELECT catalog AS table_catalog, database AS table_schema, name AS table_name, @@ -37,12 +42,14 @@ impl ViewsTable { 0 AS is_trigger_updatable, 0 AS 
is_trigger_deletable, 0 AS is_trigger_insertable_into - FROM default.system.views"; + FROM {}.system.views", + ctl_name + ); let mut options = BTreeMap::new(); options.insert(QUERY.to_string(), query.to_string()); let table_info = TableInfo { - desc: "default.'information_schema'.'views'".to_string(), + desc: "'information_schema'.'views'".to_string(), name: "views".to_string(), ident: TableIdent::new(table_id, 0), meta: TableMeta { @@ -50,6 +57,11 @@ impl ViewsTable { engine: "VIEW".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/system/src/databases_table.rs b/src/query/storages/system/src/databases_table.rs index 255cc2074ecbd..adc489bb2ccb2 100644 --- a/src/query/storages/system/src/databases_table.rs +++ b/src/query/storages/system/src/databases_table.rs @@ -34,6 +34,8 @@ use databend_common_expression::TableSchemaRefExt; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; @@ -44,6 +46,7 @@ use log::warn; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; use crate::util::find_eq_filter; +use crate::util::generate_catalog_meta; pub type DatabasesTableWithHistory = DatabasesTable; pub type DatabasesTableWithoutHistory = DatabasesTable; @@ -123,17 +126,22 @@ where DatabasesTable: HistoryAware let catalogs = if let Some(filter_catalog_name) = filter_catalog_name { let mut res = vec![]; - let ctl = 
ctx.get_catalog(&filter_catalog_name).await?; - res.push((filter_catalog_name, ctl)); + if filter_catalog_name == self.get_table_info().catalog() { + let ctl = ctx.get_catalog(&filter_catalog_name).await?; + res.push((filter_catalog_name, ctl)); + } + // If empty return empty result res } else { - let catalogs = CatalogManager::instance(); - catalogs - .list_catalogs(&tenant, ctx.session_state()) - .await? - .iter() - .map(|e| (e.name(), e.clone())) - .collect() + let catalog_mgr = CatalogManager::instance(); + let ctl = catalog_mgr + .get_catalog( + tenant.tenant_name(), + self.get_table_info().catalog(), + ctx.session_state(), + ) + .await?; + vec![(ctl.name(), ctl)] }; let user_api = UserApiProvider::instance(); @@ -286,7 +294,7 @@ where DatabasesTable: HistoryAware impl DatabasesTable where DatabasesTable: HistoryAware { - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> Arc { let schema = TableSchemaRefExt::create(vec![ TableField::new("catalog", TableDataType::String), TableField::new("name", TableDataType::String), @@ -311,6 +319,11 @@ where DatabasesTable: HistoryAware engine: "SystemDatabases".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs index 4e7fabf732c73..4e8c0b4d4bacb 100644 --- a/src/query/storages/system/src/lib.rs +++ b/src/query/storages/system/src/lib.rs @@ -131,4 +131,5 @@ pub use temp_files_table::TempFilesTable; pub use temporary_tables_table::TemporaryTablesTable; pub use user_functions_table::UserFunctionsTable; pub use users_table::UsersTable; +pub use util::generate_catalog_meta; pub use virtual_columns_table::VirtualColumnsTable; diff --git a/src/query/storages/system/src/locks_table.rs 
b/src/query/storages/system/src/locks_table.rs index c5d2fcdb5ab58..f58480d8a2df2 100644 --- a/src/query/storages/system/src/locks_table.rs +++ b/src/query/storages/system/src/locks_table.rs @@ -31,14 +31,18 @@ use databend_common_expression::TableField; use databend_common_expression::TableSchemaRef; use databend_common_expression::TableSchemaRefExt; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::ListLocksReq; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; use crate::util::find_eq_filter; +use crate::util::generate_catalog_meta; pub struct LocksTable { table_info: TableInfo, @@ -60,10 +64,15 @@ impl AsyncSystemTable for LocksTable { ) -> Result { let tenant = ctx.get_tenant(); let catalog_mgr = CatalogManager::instance(); - let ctls = catalog_mgr - .list_catalogs(&tenant, ctx.session_state()) - .await?; - + let ctls = vec![ + catalog_mgr + .get_catalog( + tenant.tenant_name(), + self.table_info.catalog(), + ctx.session_state(), + ) + .await?, + ]; let mut lock_table_id = Vec::new(); let mut lock_revision = Vec::new(); let mut lock_type = Vec::new(); @@ -155,7 +164,7 @@ impl LocksTable { ]) } - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> Arc { let table_info = TableInfo { desc: "'system'.'locks'".to_string(), name: "locks".to_string(), @@ -165,6 +174,11 @@ impl LocksTable { engine: "SystemLocks".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; 
AsyncOneBlockSystemTable::create(LocksTable { table_info }) diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs index 7808d9ea5a439..7238f0d753759 100644 --- a/src/query/storages/system/src/streams_table.rs +++ b/src/query/storages/system/src/streams_table.rs @@ -37,9 +37,12 @@ use databend_common_expression::TableSchemaRef; use databend_common_expression::TableSchemaRefExt; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_meta_app::principal::OwnershipObject; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::tenant::Tenant; use databend_common_storages_fuse::operations::acquire_task_permit; use databend_common_storages_fuse::FuseTable; use databend_common_storages_stream::stream_table::StreamTable; @@ -49,6 +52,7 @@ use log::warn; use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; use crate::util::find_eq_filter; +use crate::util::generate_catalog_meta; pub type FullStreamsTable = StreamsTable; pub type TerseStreamsTable = StreamsTable; @@ -74,12 +78,15 @@ impl AsyncSystemTable for StreamsTable { let tenant = ctx.get_tenant(); let catalog_mgr = CatalogManager::instance(); - let ctls = catalog_mgr - .list_catalogs(&tenant, ctx.session_state()) - .await? 
- .iter() - .map(|e| (e.name(), e.clone())) - .collect::>(); + let ctl = catalog_mgr + .get_catalog( + tenant.tenant_name(), + self.table_info.catalog(), + ctx.session_state(), + ) + .await?; + let ctl_name = ctl.name(); + let visibility_checker = ctx.get_visibility_checker(false).await?; let user_api = UserApiProvider::instance(); @@ -102,222 +109,217 @@ impl AsyncSystemTable for StreamsTable { let io_request_semaphore = Arc::new(Semaphore::new(max_threads)); let runtime = GlobalIORuntime::instance(); - for (ctl_name, ctl) in ctls.iter() { - let mut dbs = Vec::new(); - if let Some(push_downs) = &push_downs { - let mut db_name = Vec::new(); - if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) { - let expr = filter.as_expr(&BUILTIN_FUNCTIONS); - find_eq_filter(&expr, &mut |col_name, scalar| { - if col_name == "database" { - if let Scalar::String(database) = scalar { - if !db_name.contains(database) { - db_name.push(database.clone()); - } + let mut dbs = Vec::new(); + if let Some(push_downs) = &push_downs { + let mut db_name = Vec::new(); + if let Some(filter) = push_downs.filters.as_ref().map(|f| &f.filter) { + let expr = filter.as_expr(&BUILTIN_FUNCTIONS); + find_eq_filter(&expr, &mut |col_name, scalar| { + if col_name == "database" { + if let Scalar::String(database) = scalar { + if !db_name.contains(database) { + db_name.push(database.clone()); } } - Ok(()) - }); - for db in db_name { - match ctl.get_database(&tenant, db.as_str()).await { - Ok(database) => dbs.push(database), - Err(err) => { - let msg = format!("Failed to get database: {}, {}", db, err); - warn!("{}", msg); - ctx.push_warning(msg); - } + } + Ok(()) + }); + for db in db_name { + match ctl.get_database(&tenant, db.as_str()).await { + Ok(database) => dbs.push(database), + Err(err) => { + let msg = format!("Failed to get database: {}, {}", db, err); + warn!("{}", msg); + ctx.push_warning(msg); } } } } + } - if dbs.is_empty() { - dbs = match ctl.list_databases(&tenant).await { - 
Ok(dbs) => dbs, - Err(err) => { - let msg = - format!("List databases failed on catalog {}: {}", ctl.name(), err); - warn!("{}", msg); - ctx.push_warning(msg); + if dbs.is_empty() { + dbs = match ctl.list_databases(&tenant).await { + Ok(dbs) => dbs, + Err(err) => { + let msg = format!("List databases failed on catalog {}: {}", ctl.name(), err); + warn!("{}", msg); + ctx.push_warning(msg); - vec![] - } + vec![] } } + } + + let final_dbs = dbs + .into_iter() + .filter(|db| { + visibility_checker.check_database_visibility( + &ctl_name, + db.name(), + db.get_db_info().database_id.db_id, + ) + }) + .collect::>(); - let final_dbs = dbs - .into_iter() - .filter(|db| { - visibility_checker.check_database_visibility( - ctl_name, - db.name(), - db.get_db_info().database_id.db_id, - ) - }) - .collect::>(); - - let ownership = if T { - user_api.list_ownerships(&tenant).await.unwrap_or_default() - } else { - HashMap::new() + let ownership = if T { + user_api.list_ownerships(&tenant).await.unwrap_or_default() + } else { + HashMap::new() + }; + + let mut source_db_id_set = HashSet::new(); + let mut source_tb_id_set = HashSet::new(); + let mut source_db_tb_ids = vec![]; + for db in final_dbs { + let db_id = db.get_db_info().database_id.db_id; + let db_name = db.name(); + let tables = match ctl.list_tables(&tenant, db_name).await { + Ok(tables) => tables, + Err(err) => { + // Swallow the errors related with sharing. Listing tables in a shared database + // is easy to get errors with invalid configs, but system.streams is better not + // to be affected by it. 
+ let msg = format!("Failed to list tables in database: {}, {}", db_name, err); + warn!("{}", msg); + ctx.push_warning(msg); + + continue; + } }; - let mut source_db_id_set = HashSet::new(); - let mut source_tb_id_set = HashSet::new(); - let mut source_db_tb_ids = vec![]; - for db in final_dbs { - let db_id = db.get_db_info().database_id.db_id; - let db_name = db.name(); - let tables = match ctl.list_tables(&tenant, db_name).await { - Ok(tables) => tables, - Err(err) => { - // Swallow the errors related with sharing. Listing tables in a shared database - // is easy to get errors with invalid configs, but system.streams is better not - // to be affected by it. - let msg = - format!("Failed to list tables in database: {}, {}", db_name, err); - warn!("{}", msg); - ctx.push_warning(msg); + let mut handlers = Vec::new(); + for table in tables { + // If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible + // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check. + let t_id = table.get_id(); + if visibility_checker.check_table_visibility( + &ctl_name, + db.name(), + table.name(), + db_id, + t_id, + ) && table.is_stream() + { + let stream_info = table.get_table_info(); + let stream_table = StreamTable::try_from_table(table.as_ref())?; - continue; + let source_db_id = stream_table.source_database_id(ctl.as_ref()).await.ok(); + if let Some(source_db_id) = source_db_id { + source_db_id_set.insert(source_db_id); + } + let source_tb_id = stream_table.source_table_id().ok(); + if let Some(source_tb_id) = source_tb_id { + source_tb_id_set.insert(source_tb_id); } - }; - - let mut handlers = Vec::new(); - for table in tables { - // If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible - // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check. 
- let t_id = table.get_id(); - if visibility_checker.check_table_visibility( - ctl_name, - db.name(), - table.name(), - db_id, - t_id, - ) && table.is_stream() - { - let stream_info = table.get_table_info(); - let stream_table = StreamTable::try_from_table(table.as_ref())?; - - let source_db_id = stream_table.source_database_id(ctl.as_ref()).await.ok(); - if let Some(source_db_id) = source_db_id { - source_db_id_set.insert(source_db_id); + match (source_db_id, source_tb_id) { + (Some(source_db_id), Some(source_tb_id)) => { + source_db_tb_ids.push(Some((source_db_id, source_tb_id))); } - let source_tb_id = stream_table.source_table_id().ok(); - if let Some(source_tb_id) = source_tb_id { - source_tb_id_set.insert(source_tb_id); + (_, _) => { + source_db_tb_ids.push(None); } - match (source_db_id, source_tb_id) { - (Some(source_db_id), Some(source_tb_id)) => { - source_db_tb_ids.push(Some((source_db_id, source_tb_id))); - } - (_, _) => { - source_db_tb_ids.push(None); - } + } + catalogs.push(ctl_name.as_str()); + databases.push(db_name.to_owned()); + names.push(stream_table.name().to_string()); + mode.push(stream_table.mode().to_string()); + + if T { + stream_id.push(stream_info.ident.table_id); + created_on.push(stream_info.meta.created_on.timestamp_micros()); + updated_on.push(stream_info.meta.updated_on.timestamp_micros()); + + if ownership.is_empty() { + owner.push(None); + } else { + owner.push( + ownership + .get(&OwnershipObject::Table { + catalog_name: ctl_name.to_string(), + db_id, + table_id: t_id, + }) + .map(|role| role.to_string()), + ); } - catalogs.push(ctl_name.as_str()); - databases.push(db_name.to_owned()); - names.push(stream_table.name().to_string()); - mode.push(stream_table.mode().to_string()); - - if T { - stream_id.push(stream_info.ident.table_id); - created_on.push(stream_info.meta.created_on.timestamp_micros()); - updated_on.push(stream_info.meta.updated_on.timestamp_micros()); - - if ownership.is_empty() { - owner.push(None); - } else { - 
owner.push( - ownership - .get(&OwnershipObject::Table { - catalog_name: ctl_name.to_string(), - db_id, - table_id: t_id, - }) - .map(|role| role.to_string()), - ); - } - comment.push(stream_info.meta.comment.clone()); - - table_version.push(stream_table.offset().ok()); - table_id.push(source_tb_id); - snapshot_location.push(stream_table.snapshot_loc()); - - let permit = acquire_task_permit(io_request_semaphore.clone()).await?; - let ctx = ctx.clone(); - let table = table.clone(); - let handler = runtime.spawn(async move { - let mut reason = "".to_string(); - // safe unwrap. - let stream_table = - StreamTable::try_from_table(table.as_ref()).unwrap(); - match stream_table.source_table(ctx).await { - Ok(source) => { - // safe unwrap, has been checked in source_table. - let fuse_table = - FuseTable::try_from_table(source.as_ref()).unwrap(); - if let Some(location) = stream_table.snapshot_loc() { - reason = fuse_table - .changes_read_offset_snapshot(&location) - .await - .err() - .map_or("".to_string(), |e| e.display_text()); - } - } - Err(e) => { - reason = e.display_text(); + comment.push(stream_info.meta.comment.clone()); + + table_version.push(stream_table.offset().ok()); + table_id.push(source_tb_id); + snapshot_location.push(stream_table.snapshot_loc()); + + let permit = acquire_task_permit(io_request_semaphore.clone()).await?; + let ctx = ctx.clone(); + let table = table.clone(); + let handler = runtime.spawn(async move { + let mut reason = "".to_string(); + // safe unwrap. + let stream_table = StreamTable::try_from_table(table.as_ref()).unwrap(); + match stream_table.source_table(ctx).await { + Ok(source) => { + // safe unwrap, has been checked in source_table. 
+ let fuse_table = + FuseTable::try_from_table(source.as_ref()).unwrap(); + if let Some(location) = stream_table.snapshot_loc() { + reason = fuse_table + .changes_read_offset_snapshot(&location) + .await + .err() + .map_or("".to_string(), |e| e.display_text()); } } - drop(permit); - reason - }); - handlers.push(handler); - } + Err(e) => { + reason = e.display_text(); + } + } + drop(permit); + reason + }); + handlers.push(handler); } } - - let mut joint = futures::future::try_join_all(handlers) - .await - .unwrap_or_default(); - invalid_reason.append(&mut joint); } - let mut source_db_ids = source_db_id_set.into_iter().collect::>(); - source_db_ids.sort(); - let source_db_names = ctl - .mget_database_names_by_ids(&tenant, &source_db_ids) - .await?; - let source_db_map = source_db_ids - .into_iter() - .zip(source_db_names.into_iter()) - .filter(|(_, db_name)| db_name.is_some()) - .map(|(db_id, db_name)| (db_id, db_name.unwrap())) - .collect::>(); - - let mut source_tb_ids = source_tb_id_set.into_iter().collect::>(); - source_tb_ids.sort(); - let source_tb_names = ctl - .mget_table_names_by_ids(&tenant, &source_tb_ids, false) - .await?; - let source_tb_map = source_tb_ids - .into_iter() - .zip(source_tb_names.into_iter()) - .filter(|(_, tb_name)| tb_name.is_some()) - .map(|(tb_id, tb_name)| (tb_id, tb_name.unwrap())) - .collect::>(); - - for source_db_tb_id in source_db_tb_ids.into_iter() { - if let Some((db_id, tb_id)) = source_db_tb_id { - if let Some(db) = source_db_map.get(&db_id) { - if let Some(tb) = source_tb_map.get(&tb_id) { - table_name.push(Some(format!("{db}.{tb}"))); - continue; - } + let mut joint = futures::future::try_join_all(handlers) + .await + .unwrap_or_default(); + invalid_reason.append(&mut joint); + } + + let mut source_db_ids = source_db_id_set.into_iter().collect::>(); + source_db_ids.sort(); + let source_db_names = ctl + .mget_database_names_by_ids(&tenant, &source_db_ids) + .await?; + let source_db_map = source_db_ids + .into_iter() + 
.zip(source_db_names.into_iter()) + .filter(|(_, db_name)| db_name.is_some()) + .map(|(db_id, db_name)| (db_id, db_name.unwrap())) + .collect::>(); + + let mut source_tb_ids = source_tb_id_set.into_iter().collect::>(); + source_tb_ids.sort(); + let source_tb_names = ctl + .mget_table_names_by_ids(&tenant, &source_tb_ids, false) + .await?; + let source_tb_map = source_tb_ids + .into_iter() + .zip(source_tb_names.into_iter()) + .filter(|(_, tb_name)| tb_name.is_some()) + .map(|(tb_id, tb_name)| (tb_id, tb_name.unwrap())) + .collect::>(); + + for source_db_tb_id in source_db_tb_ids.into_iter() { + if let Some((db_id, tb_id)) = source_db_tb_id { + if let Some(db) = source_db_map.get(&db_id) { + if let Some(tb) = source_tb_map.get(&tb_id) { + table_name.push(Some(format!("{db}.{tb}"))); + continue; } } - table_name.push(None); } + table_name.push(None); } if T { @@ -401,7 +403,7 @@ impl StreamsTable { } } - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> Arc { let name = if T { "streams" } else { "streams_terse" }; let table_info = TableInfo { desc: format!("'system'.'{name}'"), @@ -412,6 +414,11 @@ impl StreamsTable { engine: "SystemStreams".to_string(), ..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; AsyncOneBlockSystemTable::create(StreamsTable:: { table_info }) diff --git a/src/query/storages/system/src/tables_table.rs b/src/query/storages/system/src/tables_table.rs index 56daa4d1d9aa1..e808a27c00681 100644 --- a/src/query/storages/system/src/tables_table.rs +++ b/src/query/storages/system/src/tables_table.rs @@ -43,6 +43,8 @@ use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_management::RoleApi; use databend_common_meta_app::principal::OwnershipObject; use 
databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; +use databend_common_meta_app::schema::CatalogInfo; +use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::CatalogType; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; @@ -58,6 +60,7 @@ use crate::table::AsyncOneBlockSystemTable; use crate::table::AsyncSystemTable; use crate::util::find_eq_filter; use crate::util::find_eq_or_filter; +use crate::util::generate_catalog_meta; pub struct TablesTable { table_info: TableInfo, @@ -160,21 +163,22 @@ where TablesTable: HistoryAware ) -> Result { let tenant = ctx.get_tenant(); let catalog_mgr = CatalogManager::instance(); - let catalogs: Vec> = catalog_mgr - .list_catalogs(&tenant, ctx.session_state()) - .await? - .into_iter() - .map(|cat| cat.disable_table_info_refresh()) - .collect::>>()?; + let catalog = catalog_mgr + .get_catalog( + tenant.tenant_name(), + self.get_table_info().catalog(), + ctx.session_state(), + ) + .await?; // Optimization target: Fast path for known iceberg catalog SHOW TABLES if let Some((catalog_name, db_name)) = - self.is_external_show_tables_query(&push_downs, &catalogs) + self.is_external_show_tables_query(&push_downs, &catalog) { self.show_tables_from_external_catalog(ctx, catalog_name, db_name) .await } else { - self.get_full_data_from_catalogs(ctx, push_downs, catalogs) + self.get_full_data_from_catalogs(ctx, push_downs, catalog) .await } } @@ -282,13 +286,10 @@ where TablesTable: HistoryAware &self, ctx: Arc, push_downs: Option, - catalogs: Vec>, + catalog_impl: Arc, ) -> Result { let tenant = ctx.get_tenant(); - let mut ctls: Vec<(String, Arc)> = - catalogs.iter().map(|e| (e.name(), e.clone())).collect(); - let mut catalogs = vec![]; let mut databases = vec![]; let mut databases_ids = vec![]; @@ -390,15 +391,20 @@ where TablesTable: HistoryAware } } - ctls = if !catalog_name.is_empty() && !invalid_optimize { + let ctl_name = 
catalog_impl.name(); + + let ctls = if !catalog_name.is_empty() && !invalid_optimize { let mut res = vec![]; for name in &catalog_name { - let ctl = ctx.get_catalog(name).await?; - res.push((name.to_string(), ctl)); + if *name == ctl_name { + let ctl = ctx.get_catalog(name).await?; + res.push((name.to_string(), ctl)); + } } + // If empty return empty result res } else { - ctls + vec![(ctl_name, catalog_impl)] }; let visibility_checker = ctx.get_visibility_checker(false).await?; @@ -963,7 +969,7 @@ where TablesTable: HistoryAware fn is_external_show_tables_query( &self, push_downs: &Option, - catalogs: &[Arc], + catalog: &Arc, ) -> Option<(String, String)> { if !WITH_HISTORY && WITHOUT_VIEW { let mut database_name = None; @@ -1012,11 +1018,9 @@ where TablesTable: HistoryAware // Check iceberg catalog existence if let Some(catalog_name) = catalog_name { if let Some(database_name) = database_name { - for catalog in catalogs { - if catalog.name() == catalog_name { - if let CatalogType::Iceberg = catalog.info().catalog_type() { - return Some((catalog_name, database_name)); - } + if catalog.name() == catalog_name { + if let CatalogType::Iceberg = catalog.info().catalog_type() { + return Some((catalog_name, database_name)); } } } @@ -1035,9 +1039,7 @@ where TablesTable: HistoryAware ) -> Result { let tenant = ctx.get_tenant(); let catalog = ctx.get_catalog(&catalog_name).await?; - let db = catalog.get_database(&tenant, &db_name).await?; - let all_table_names = db.list_tables_names().await?; - + let all_table_names = catalog.list_tables_names(&tenant, &db_name).await?; let rows = all_table_names.len(); Self::generate_tables_block( vec![catalog_name; rows], @@ -1066,7 +1068,7 @@ where TablesTable: HistoryAware ) } - pub fn create(table_id: u64) -> Arc { + pub fn create(table_id: u64, ctl_name: &str) -> Arc { let name = Self::TABLE_NAME; let table_info = TableInfo { desc: format!("'system'.'{name}'"), @@ -1078,6 +1080,11 @@ where TablesTable: HistoryAware 
..Default::default() }, + catalog_info: Arc::new(CatalogInfo { + name_ident: CatalogNameIdent::new(Tenant::new_literal("dummy"), ctl_name).into(), + meta: generate_catalog_meta(ctl_name), + ..Default::default() + }), ..Default::default() }; diff --git a/src/query/storages/system/src/util.rs b/src/query/storages/system/src/util.rs index 6aff7cc74ac3a..c737388bfdffe 100644 --- a/src/query/storages/system/src/util.rs +++ b/src/query/storages/system/src/util.rs @@ -12,9 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; use databend_common_exception::Result; use databend_common_expression::expr::*; use databend_common_expression::Scalar; +use databend_common_meta_app::schema::CatalogMeta; +use databend_common_meta_app::schema::CatalogOption; +use databend_common_meta_app::schema::IcebergCatalogOption; +use databend_common_meta_app::schema::IcebergRestCatalogOption; + +pub fn generate_catalog_meta(ctl_name: &str) -> CatalogMeta { + if ctl_name.to_lowercase() == CATALOG_DEFAULT { + CatalogMeta { + catalog_option: CatalogOption::Default, + created_on: Default::default(), + } + } else { + CatalogMeta { + catalog_option: CatalogOption::Iceberg(IcebergCatalogOption::Rest( + IcebergRestCatalogOption { + uri: "".to_string(), + warehouse: "".to_string(), + props: Default::default(), + }, + )), + created_on: Default::default(), + } + } +} pub fn find_eq_filter(expr: &Expr, visitor: &mut impl FnMut(&str, &Scalar) -> Result<()>) { match expr { diff --git a/tests/sqllogictests/suites/tpch_iceberg/utils.test b/tests/sqllogictests/suites/tpch_iceberg/utils.test index e806e1371c129..093d3e78490b7 100644 --- a/tests/sqllogictests/suites/tpch_iceberg/utils.test +++ b/tests/sqllogictests/suites/tpch_iceberg/utils.test @@ -23,6 +23,8 @@ ctl query T show databases from ctl; ---- +information_schema +system tpch statement error 1003 @@ -83,6 +85,8 @@ show databases; 
---- abc ef123 +information_schema +system tpch statement ok @@ -92,6 +96,8 @@ query T rowsort show databases from ctl; ---- abc +information_schema +system tpch statement ok diff --git a/tests/suites/3_stateful_iceberg/00_rest/00_0000_create_and_show.result b/tests/suites/3_stateful_iceberg/00_rest/00_0000_create_and_show.result index f8dd78f5eb9c6..ef8581c820a74 100644 --- a/tests/suites/3_stateful_iceberg/00_rest/00_0000_create_and_show.result +++ b/tests/suites/3_stateful_iceberg/00_rest/00_0000_create_and_show.result @@ -2,5 +2,7 @@ iceberg_ctl iceberg ADDRESS http://127.0.0.1:8181 WAREHOUSE s3://icebergdata/demo +information_schema +system default iceberg_ctl From 485018dc872619fd9c27341688fc7aa356011b61 Mon Sep 17 00:00:00 2001 From: TCeason Date: Tue, 29 Apr 2025 11:40:03 +0800 Subject: [PATCH 2/2] try optimize external sys table query, no need to check privilege --- .../storages/system/src/databases_table.rs | 36 +-- src/query/storages/system/src/tables_table.rs | 208 +++++++++++------- 2 files changed, 152 insertions(+), 92 deletions(-) diff --git a/src/query/storages/system/src/databases_table.rs b/src/query/storages/system/src/databases_table.rs index adc489bb2ccb2..0513b6a8f8490 100644 --- a/src/query/storages/system/src/databases_table.rs +++ b/src/query/storages/system/src/databases_table.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use databend_common_catalog::catalog::Catalog; -use databend_common_catalog::catalog::CatalogManager; use databend_common_catalog::database::Database; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::table::Table; @@ -124,23 +123,23 @@ where DatabasesTable: HistoryAware } } + let ctl = ctx.get_catalog(self.get_table_info().catalog()).await?; + let visibility_checker = if ctl.is_external() { + None + } else { + Some(ctx.get_visibility_checker(false).await?) 
+        };
+        let catalog_dbs = visibility_checker
+            .as_ref()
+            .and_then(|c| c.get_visibility_database());
         let catalogs = if let Some(filter_catalog_name) = filter_catalog_name {
             let mut res = vec![];
             if filter_catalog_name == self.get_table_info().catalog() {
-                let ctl = ctx.get_catalog(&filter_catalog_name).await?;
                 res.push((filter_catalog_name, ctl));
             }
             // If empty return empty result
             res
         } else {
-            let catalog_mgr = CatalogManager::instance();
-            let ctl = catalog_mgr
-                .get_catalog(
-                    tenant.tenant_name(),
-                    self.get_table_info().catalog(),
-                    ctx.session_state(),
-                )
-                .await?;
             vec![(ctl.name(), ctl)]
         };
@@ -151,8 +150,6 @@ where DatabasesTable: HistoryAware
         let mut owners: Vec<Option<String>> = vec![];
         let mut dropped_on: Vec<Option<i64>> = vec![];
-        let visibility_checker = ctx.get_visibility_checker(false).await?;
-        let catalog_dbs = visibility_checker.get_visibility_database();
         // None means has global level privileges
         if let Some(catalog_dbs) = catalog_dbs {
             if WITH_HISTORY {
@@ -252,11 +249,16 @@ where DatabasesTable: HistoryAware
         let final_dbs = databases
             .into_iter()
             .filter(|db| {
-                visibility_checker.check_database_visibility(
-                    &ctl_name,
-                    db.name(),
-                    db.get_db_info().database_id.db_id,
-                )
+                visibility_checker
+                    .as_ref()
+                    .map(|c| {
+                        c.check_database_visibility(
+                            &ctl_name,
+                            db.name(),
+                            db.get_db_info().database_id.db_id,
+                        )
+                    })
+                    .unwrap_or(true)
             })
             .collect::<Vec<_>>();
diff --git a/src/query/storages/system/src/tables_table.rs b/src/query/storages/system/src/tables_table.rs
index e808a27c00681..24b774655a477 100644
--- a/src/query/storages/system/src/tables_table.rs
+++ b/src/query/storages/system/src/tables_table.rs
@@ -175,6 +175,7 @@ where TablesTable: HistoryAware
         if let Some((catalog_name, db_name)) =
             self.is_external_show_tables_query(&push_downs, &catalog)
         {
+            // Fast path: SHOW TABLES against an external (e.g. Iceberg) catalog.
             self.show_tables_from_external_catalog(ctx, catalog_name, db_name)
                 .await
         } else {
@@ -290,6 +291,12 @@ where TablesTable: HistoryAware
     ) -> Result<DataBlock> {
         let
tenant = ctx.get_tenant(); + let visibility_checker = if catalog_impl.is_external() { + None + } else { + Some(ctx.get_visibility_checker(false).await?) + }; + let mut catalogs = vec![]; let mut databases = vec![]; let mut databases_ids = vec![]; @@ -407,7 +414,6 @@ where TablesTable: HistoryAware vec![(ctl_name, catalog_impl)] }; - let visibility_checker = ctx.get_visibility_checker(false).await?; // from system.tables where database = 'db' and name = 'name' // from system.tables where database = 'db' and table_id = 123 if db_name.len() == 1 @@ -447,35 +453,51 @@ where TablesTable: HistoryAware Ok(t) => { let db_id = db.get_db_info().database_id.db_id; let table_id = t.get_id(); - let role = user_api - .role_api(&tenant) - .get_ownership(&OwnershipObject::Table { - catalog_name: ctl_name.to_string(), + if let Some(visibility_checker) = &visibility_checker { + let role = user_api + .role_api(&tenant) + .get_ownership(&OwnershipObject::Table { + catalog_name: ctl_name.to_string(), + db_id, + table_id, + }) + .await? + .map(|o| o.role); + if visibility_checker.check_table_visibility( + ctl_name, + db.name(), + table_name, db_id, - table_id, - }) - .await? 
- .map(|o| o.role); - if visibility_checker.check_table_visibility( - ctl_name, - db.name(), - table_name, - db_id, - t.get_id(), - ) { - catalogs.push(ctl_name.to_string()); - databases.push(db.name().to_owned()); - databases_ids.push(db.get_db_info().database_id.db_id); - database_tables.push(t); - owner.push(role); - } else if let Some(role) = role { - let roles = ctx.get_all_effective_roles().await?; - if roles.iter().any(|r| r.name == role) { - catalogs.push(ctl_name.to_string()); - databases.push(db.name().to_owned()); - databases_ids.push(db.get_db_info().database_id.db_id); - database_tables.push(t); - owner.push(Some(role)); + t.get_id(), + ) { + push_table_info( + &mut catalogs, + &mut databases, + &mut databases_ids, + &mut database_tables, + &mut owner, + ctl_name, + db.name(), + db.get_db_info().database_id.db_id, + t, + role, + ); + } else if let Some(role) = role { + let roles = ctx.get_all_effective_roles().await?; + if roles.iter().any(|r| r.name == role) { + push_table_info( + &mut catalogs, + &mut databases, + &mut databases_ids, + &mut database_tables, + &mut owner, + ctl_name, + db.name(), + db.get_db_info().database_id.db_id, + t, + Some(role), + ); + } } } } @@ -495,7 +517,9 @@ where TablesTable: HistoryAware } } } else { - let catalog_dbs = visibility_checker.get_visibility_database(); + let catalog_dbs = visibility_checker + .as_ref() + .and_then(|c| c.get_visibility_database()); for (ctl_name, ctl) in ctls.iter() { let default_catalog = ctl.info().catalog_type() == CatalogType::Default; @@ -516,18 +540,21 @@ where TablesTable: HistoryAware } } - match ctl - .mget_table_names_by_ids(&tenant, &tables_ids, WITH_HISTORY) - .await - { - Ok(tables) => { - for table in tables.into_iter().flatten() { - tables_names.insert(table.clone()); + if visibility_checker.is_some() { + match ctl + .mget_table_names_by_ids(&tenant, &tables_ids, WITH_HISTORY) + .await + { + Ok(tables) => { + for table in tables.into_iter().flatten() { + 
tables_names.insert(table.clone()); + } + } + Err(err) => { + let msg = + format!("Failed to get tables: {}, {}", ctl.name(), err); + warn!("{}", msg); } - } - Err(err) => { - let msg = format!("Failed to get tables: {}, {}", ctl.name(), err); - warn!("{}", msg); } } } @@ -592,22 +619,27 @@ where TablesTable: HistoryAware .clone() .into_iter() .filter(|db| { - visibility_checker.check_database_visibility( - ctl_name, - db.name(), - db.get_db_info().database_id.db_id, - ) + visibility_checker + .as_ref() + .map(|c| { + c.check_database_visibility( + ctl_name, + db.name(), + db.get_db_info().database_id.db_id, + ) + }) + .unwrap_or(true) }) .collect::>(); // Now we get the final dbs, need to clear dbs vec. dbs.clear(); - let ownership = if get_ownership && default_catalog { + let ownership = if get_ownership && visibility_checker.is_some() { user_api.list_ownerships(&tenant).await.unwrap_or_default() } else { HashMap::new() }; - let mock_table = !default_catalog && only_get_name; + let mock_table = ctl.is_external() && only_get_name; for db in final_dbs { let db_id = db.get_db_info().database_id.db_id; let db_name = db.name(); @@ -691,17 +723,18 @@ where TablesTable: HistoryAware for table in tables { let table_id = table.get_id(); - let check_table_visibility = if default_catalog { - visibility_checker.check_table_visibility( - ctl_name, - db_name, - table.name(), - db_id, - table_id, - ) - } else { - true - }; + let check_table_visibility = visibility_checker + .as_ref() + .map(|c| { + c.check_table_visibility( + ctl_name, + db_name, + table.name(), + db_id, + table_id, + ) + }) + .unwrap_or(true); // If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check. 
             if (table.get_table_info().engine() == "VIEW" || WITHOUT_VIEW)
@@ -710,23 +743,29 @@
                         {
                             // system.tables store view name but not store view query
                             // decrease information_schema.tables union.
-                            catalogs.push(ctl_name.to_string());
-                            databases.push(db_name.to_owned());
-                            databases_ids.push(db.get_db_info().database_id.db_id);
-                            database_tables.push(table);
-                            if ownership.is_empty() {
-                                owner.push(None);
+                            let role = if ownership.is_empty() {
+                                None
                             } else {
-                                owner.push(
-                                    ownership
-                                        .get(&OwnershipObject::Table {
-                                            catalog_name: ctl_name.to_string(),
-                                            db_id,
-                                            table_id,
-                                        })
-                                        .map(|role| role.to_string()),
-                                );
-                            }
+                                ownership
+                                    .get(&OwnershipObject::Table {
+                                        catalog_name: ctl_name.to_string(),
+                                        db_id,
+                                        table_id,
+                                    })
+                                    .map(|role| role.to_string())
+                            };
+                            push_table_info(
+                                &mut catalogs,
+                                &mut databases,
+                                &mut databases_ids,
+                                &mut database_tables,
+                                &mut owner,
+                                ctl_name,
+                                db.name(),
+                                db.get_db_info().database_id.db_id,
+                                table,
+                                role,
+                            );
                         }
                     }
                 }
@@ -1091,3 +1130,22 @@ where TablesTable: HistoryAware
         AsyncOneBlockSystemTable::create(TablesTable:: { table_info })
     }
 }
+
+fn push_table_info(
+    catalogs: &mut Vec<String>,
+    databases: &mut Vec<String>,
+    databases_ids: &mut Vec<u64>,
+    database_tables: &mut Vec<Arc<dyn Table>>,
+    owner: &mut Vec<Option<String>>,
+    ctl_name: &str,
+    db_name: &str,
+    db_id: u64,
+    table: Arc<dyn Table>,
+    role: Option<String>,
+) {
+    catalogs.push(ctl_name.to_string());
+    databases.push(db_name.to_string());
+    databases_ids.push(db_id);
+    // Moves the Arc handle into the result set; callers clone beforehand if they need to reuse it.
+    database_tables.push(table);
+    owner.push(role);
+}