diff --git a/Cargo.lock b/Cargo.lock index eb70aa6199bee..dc4e9f98ed26c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3431,7 +3431,6 @@ dependencies = [ "databend-common-functions", "databend-common-meta-api", "databend-common-meta-app", - "databend-common-meta-embedded", "databend-common-meta-kvapi", "databend-common-meta-store", "databend-common-meta-types", @@ -3596,21 +3595,6 @@ dependencies = [ "url", ] -[[package]] -name = "databend-common-meta-embedded" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "databend-common-base", - "databend-common-meta-api", - "databend-common-meta-kvapi", - "databend-common-meta-raft-store", - "databend-common-tracing", - "fastrace", - "test-harness", -] - [[package]] name = "databend-common-meta-kvapi" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4c845fe161ee0..d2529a69cc4be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,6 @@ members = [ "src/meta/client", "src/meta/control", "src/meta/ee", - "src/meta/embedded", "src/meta/kvapi", "src/meta/process", "src/meta/proto-conv", @@ -146,7 +145,6 @@ databend-common-meta-app-types = { path = "src/meta/app-types" } databend-common-meta-cache = { path = "src/meta/cache" } databend-common-meta-client = { path = "src/meta/client" } databend-common-meta-control = { path = "src/meta/control" } -databend-common-meta-embedded = { path = "src/meta/embedded" } databend-common-meta-kvapi = { path = "src/meta/kvapi" } databend-common-meta-process = { path = "src/meta/process" } databend-common-meta-raft-store = { path = "src/meta/raft-store" } diff --git a/src/meta/embedded/Cargo.toml b/src/meta/embedded/Cargo.toml deleted file mode 100644 index 789a76cdb66cf..0000000000000 --- a/src/meta/embedded/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "databend-common-meta-embedded" -description = "distributed meta data service" -version = { workspace = true } -authors = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -edition = { workspace = true } - -[features] -io-uring = ["databend-common-meta-raft-store/io-uring"] - -[dependencies] -async-trait = { workspace = true } -databend-common-base = { workspace = true } -databend-common-meta-api = { workspace = true } -databend-common-meta-kvapi = { workspace = true } -databend-common-meta-raft-store = { workspace = true } -databend-common-tracing = { workspace = true } -fastrace = { workspace = true } - -[dev-dependencies] -anyhow = { workspace = true } -test-harness = { workspace = true } - -[lints] -workspace = true diff --git a/src/meta/embedded/src/lib.rs b/src/meta/embedded/src/lib.rs deleted file mode 100644 index e396d77855d1b..0000000000000 --- a/src/meta/embedded/src/lib.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub use databend_common_meta_raft_store::mem_meta::MemMeta; -pub use databend_common_meta_raft_store::mem_state_machine::MemStateMachine; diff --git a/src/meta/embedded/tests/it/kv_api_impl.rs b/src/meta/embedded/tests/it/kv_api_impl.rs deleted file mode 100644 index 8457342b9fcbe..0000000000000 --- a/src/meta/embedded/tests/it/kv_api_impl.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::sync::Once; - -use databend_common_base::base::tokio; -use databend_common_meta_kvapi::kvapi; -use databend_common_meta_raft_store::mem_meta::MemMeta; -use databend_common_tracing::init_logging; -use databend_common_tracing::Config; - -#[derive(Clone)] -struct Builder; - -#[async_trait::async_trait] -impl kvapi::ApiBuilder for Builder { - async fn build(&self) -> MemMeta { - MemMeta::default() - } - - async fn build_cluster(&self) -> Vec { - unreachable!("InMemoryMeta does not support cluster") - } -} - -#[tokio::test(flavor = "multi_thread")] -async fn test_mem_meta_kv_api() -> anyhow::Result<()> { - setup_test(); - kvapi::TestSuite {}.test_single_node(&Builder).await -} - -fn setup_test() { - static INIT: Once = Once::new(); - INIT.call_once(|| { - let mut config = Config::new_testing(); - config.file.level = "DEBUG".to_string(); - - let guards = init_logging("meta_unittests", &config, BTreeMap::new()); - Box::leak(Box::new(guards)); - }); -} diff --git a/src/meta/embedded/tests/it/main.rs b/src/meta/embedded/tests/it/main.rs deleted file mode 100644 index 7b742d84e5808..0000000000000 --- a/src/meta/embedded/tests/it/main.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![allow(clippy::diverging_sub_expression)] - -mod kv_api_impl; -mod schema_api_impl; -mod testing; diff --git a/src/meta/embedded/tests/it/schema_api_impl.rs b/src/meta/embedded/tests/it/schema_api_impl.rs deleted file mode 100644 index b8ed34a55d4a5..0000000000000 --- a/src/meta/embedded/tests/it/schema_api_impl.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use async_trait::async_trait; -use databend_common_meta_api::SchemaApiTestSuite; -use databend_common_meta_kvapi::kvapi; -use databend_common_meta_raft_store::mem_meta::MemMeta; -use test_harness::test; - -use crate::testing::mem_meta_test_harness; - -#[derive(Clone)] -pub struct MemMetaBuilder {} - -#[async_trait] -impl kvapi::ApiBuilder for MemMetaBuilder { - async fn build(&self) -> MemMeta { - MemMeta::default() - } - - async fn build_cluster(&self) -> Vec { - unimplemented!("embedded meta does not support cluster mode") - } -} - -#[test(harness = mem_meta_test_harness)] -#[fastrace::trace] -async fn test_mem_meta_schema_api() -> anyhow::Result<()> { - SchemaApiTestSuite::test_single_node(MemMetaBuilder {}).await?; - Ok(()) -} diff --git a/src/meta/embedded/tests/it/testing.rs b/src/meta/embedded/tests/it/testing.rs deleted file mode 100644 index b811f2ede1cbe..0000000000000 --- a/src/meta/embedded/tests/it/testing.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::sync::Once; - -use databend_common_base::base::tokio; -use databend_common_tracing::closure_name; -use databend_common_tracing::init_logging; -use databend_common_tracing::Config; -use fastrace::prelude::*; - -pub fn mem_meta_test_harness(test: F) -where - F: FnOnce() -> Fut + 'static, - Fut: std::future::Future> + Send + 'static, -{ - setup_test(); - - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(3) - .enable_all() - .build() - .unwrap(); - let root = Span::root(closure_name::(), SpanContext::random()); - let test = test().in_span(root); - rt.block_on(test).unwrap(); - - shutdown_test(); -} - -fn setup_test() { - static INIT: Once = Once::new(); - INIT.call_once(|| { - let mut config = Config::new_testing(); - config.file.level = "DEBUG".to_string(); - let guards = init_logging("embedded_meta_unittests", &config, BTreeMap::new()); - Box::leak(Box::new(guards)); - }); -} - -fn shutdown_test() { - fastrace::flush(); -} diff --git a/src/meta/raft-store/src/lib.rs b/src/meta/raft-store/src/lib.rs index f58c703fea71a..0abc58d3e5d6c 100644 --- a/src/meta/raft-store/src/lib.rs +++ b/src/meta/raft-store/src/lib.rs @@ -23,8 +23,6 @@ pub mod config; pub mod key_spaces; pub mod leveled_store; pub(crate) mod marked; -pub mod mem_meta; -pub mod mem_state_machine; pub mod ondisk; pub mod raft_log_v004; pub mod sm_v003; diff --git a/src/meta/raft-store/src/mem_meta.rs b/src/meta/raft-store/src/mem_meta.rs deleted file mode 100644 index 4611b12bb5410..0000000000000 --- a/src/meta/raft-store/src/mem_meta.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::future; -use std::io; -use std::sync::Arc; - -use databend_common_meta_kvapi::kvapi::KVApi; -use databend_common_meta_kvapi::kvapi::KVStream; -use databend_common_meta_types::protobuf::StreamItem; -use databend_common_meta_types::AppliedState; -use databend_common_meta_types::Change; -use databend_common_meta_types::CmdContext; -use databend_common_meta_types::MetaError; -use databend_common_meta_types::SeqV; -use databend_common_meta_types::SeqValue; -use databend_common_meta_types::TxnReply; -use databend_common_meta_types::TxnRequest; -use databend_common_meta_types::UpsertKV; -use futures_util::StreamExt; -use futures_util::TryStreamExt; -use log::debug; -use tokio::sync::Mutex; -use tokio::sync::Semaphore; - -use crate::applier::Applier; -use crate::mem_state_machine::MemStateMachine; -use crate::state_machine_api_ext::StateMachineApiExt; - -#[derive(Clone, Default)] -pub struct MemMeta { - sm: Arc>, - pub locks: Arc>>>, -} - -impl MemMeta { - async fn init_applier(&self, a: &mut Applier<'_, MemStateMachine>) -> Result<(), io::Error> { - let now = SeqV::<()>::now_ms(); - a.cmd_ctx = CmdContext::from_millis(now); - a.clean_expired_kvs(now).await?; - Ok(()) - } - - fn non_expired(seq_value: Option>, now_ms: u64) -> Option> { - if seq_value.is_expired(now_ms) { - None - } else { - seq_value - } - } -} - -#[async_trait::async_trait] -impl KVApi for MemMeta { - type Error = MetaError; - - async fn upsert_kv(&self, upsert_kv: UpsertKV) -> Result>, Self::Error> { - debug!("InMemoryStateMachine::upsert_kv({})", upsert_kv); - - let mut sm = self.sm.lock().await; - let mut applier = Applier::new(&mut *sm); - self.init_applier(&mut applier).await?; - - let (prev, result) = applier.upsert_kv(&upsert_kv).await?; - - let st = Change::new(prev, result); - Ok(st) - } - - async fn get_kv_stream(&self, keys: &[String]) -> Result, Self::Error> { - debug!("InMemoryStateMachine::get_kv_stream({:?})", keys); - - let local_now_ms = SeqV::<()>::now_ms(); - - let mut items = Vec::with_capacity(keys.len()); - - let sm = self.sm.lock().await; - - for k in keys { - let got = sm.get_maybe_expired_kv(k).await?; - let v = Self::non_expired(got, local_now_ms); - items.push(Ok(StreamItem::from((k.clone(), v)))); - } - - Ok(futures::stream::iter(items).boxed()) - } - - async fn list_kv(&self, prefix: &str) -> Result, Self::Error> { - debug!("InMemoryStateMachine::list_kv({})", prefix); - - let local_now_ms = SeqV::<()>::now_ms(); - - let sm = self.sm.lock().await; - - let strm = sm - .list_kv(prefix) - .await? - .try_filter(move |(_k, v)| future::ready(!v.is_expired(local_now_ms))) - .map_ok(StreamItem::from) - .map_err(|e| e.into()); - - Ok(strm.boxed()) - } - - async fn transaction(&self, txn: TxnRequest) -> Result { - debug!("InMemoryStateMachine::transaction({})", txn); - - let mut sm = self.sm.lock().await; - let mut applier = Applier::new(&mut *sm); - self.init_applier(&mut applier).await?; - - let applied_state = applier.apply_txn(&txn).await?; - match applied_state { - AppliedState::TxnReply(txn_reply) => Ok(txn_reply), - _ => unreachable!("expect TxnReply, got {:?}", applied_state), - } - } -} diff --git a/src/meta/raft-store/src/mem_state_machine.rs b/src/meta/raft-store/src/mem_state_machine.rs deleted file mode 100644 index 5c99e43a22dd2..0000000000000 --- a/src/meta/raft-store/src/mem_state_machine.rs +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use databend_common_meta_types::sys_data::SysData; - -use crate::leveled_store::level::Level; -use crate::state_machine::ExpireKey; -use crate::state_machine_api::SMEventSender; -use crate::state_machine_api::StateMachineApi; -/// A pure in-memory state machine as mock for testing. -#[derive(Debug, Default)] -pub struct MemStateMachine { - level: Level, - expire_cursor: ExpireKey, -} - -impl StateMachineApi for MemStateMachine { - type Map = Level; - - fn get_expire_cursor(&self) -> ExpireKey { - self.expire_cursor - } - - fn set_expire_cursor(&mut self, cursor: ExpireKey) { - self.expire_cursor = cursor; - } - - fn map_ref(&self) -> &Self::Map { - &self.level - } - - fn map_mut(&mut self) -> &mut Self::Map { - &mut self.level - } - - fn sys_data_mut(&mut self) -> &mut SysData { - &mut self.level.sys_data - } - - fn event_sender(&self) -> Option<&dyn SMEventSender> { - None - } -} diff --git a/src/meta/store/src/lib.rs b/src/meta/store/src/lib.rs index 814d01cf17deb..e823b979c7941 100644 --- a/src/meta/store/src/lib.rs +++ b/src/meta/store/src/lib.rs @@ -49,7 +49,7 @@ pub struct MetaStoreProvider { rpc_conf: RpcClientConf, } -/// MetaStore is impl with either a local embedded meta store, or a grpc-client of metasrv +/// MetaStore is impl with either a local meta-service, or a grpc-client of metasrv #[derive(Clone)] pub enum MetaStore { L(Arc), diff --git a/src/query/management/Cargo.toml b/src/query/management/Cargo.toml index 3aad0338a3488..5b7958b8164ff 100644 --- a/src/query/management/Cargo.toml +++ b/src/query/management/Cargo.toml @@ -30,7 +30,6 @@ thiserror = { workspace = true } [dev-dependencies] databend-common-expression = { workspace = true } -databend-common-meta-embedded = { workspace = true } mockall = { workspace = true } [lints] diff --git a/src/query/management/tests/it/quota.rs b/src/query/management/tests/it/quota.rs index c61b1bf6fd32c..238d068650ba3 100644 --- a/src/query/management/tests/it/quota.rs +++ b/src/query/management/tests/it/quota.rs @@ -20,8 +20,8 @@ use databend_common_management::*; use databend_common_meta_api::deserialize_struct; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_app::tenant::TenantQuota; -use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_store::MetaStore; use databend_common_meta_types::MatchSeq; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -84,8 +84,9 @@ async fn test_update_quota_from_json_to_pb() -> Result<()> { Ok(()) } -async fn new_quota_api() -> Result<(Arc, QuotaMgr, QuotaMgr)> { - let test_api = Arc::new(MemMeta::default()); +async fn new_quota_api() -> Result<(Arc, QuotaMgr, QuotaMgr)> { + let test_api = MetaStore::new_local_testing().await; + let test_api = Arc::new(test_api); let mgr_json = QuotaMgr::::create(test_api.clone(), &Tenant::new_literal("admin")); let mgr_pb = QuotaMgr::::create(test_api.clone(), &Tenant::new_literal("admin")); Ok((test_api, mgr_json, mgr_pb)) diff --git a/src/query/management/tests/it/role.rs b/src/query/management/tests/it/role.rs index 9b751dd165f3a..e866852e4778c 100644 --- a/src/query/management/tests/it/role.rs +++ b/src/query/management/tests/it/role.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use databend_common_base::base::tokio; use databend_common_management::*; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_embedded::MemMeta; +use databend_common_meta_store::MetaStore; use databend_common_meta_types::UpsertKV; use mockall::predicate::*; @@ -89,8 +89,10 @@ mod add { async fn new_role_api( enable_meta_data_upgrade_json_to_pb_from_v307: bool, -) -> databend_common_exception::Result<(Arc, RoleMgr)> { - let test_api = Arc::new(MemMeta::default()); +) -> databend_common_exception::Result<(Arc, RoleMgr)> { + let test_api = MetaStore::new_local_testing().await; + let test_api = Arc::new(test_api); + let tenant = Tenant::new_literal("admin"); let mgr = RoleMgr::create( test_api.clone(), diff --git a/src/query/management/tests/it/setting.rs b/src/query/management/tests/it/setting.rs index 3612a1273a27f..c77692db87404 100644 --- a/src/query/management/tests/it/setting.rs +++ b/src/query/management/tests/it/setting.rs @@ -20,8 +20,8 @@ use databend_common_management::*; use databend_common_meta_app::principal::UserSetting; use databend_common_meta_app::principal::UserSettingValue; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_store::MetaStore; use databend_common_meta_types::seq_value::SeqV; use fastrace::func_name; @@ -111,8 +111,10 @@ async fn test_set_setting() -> Result<()> { Ok(()) } -async fn new_setting_api() -> Result<(Arc, SettingMgr)> { - let test_api = Arc::new(MemMeta::default()); +async fn new_setting_api() -> Result<(Arc, SettingMgr)> { + let test_api = MetaStore::new_local_testing().await; + let test_api = Arc::new(test_api); + let mgr = SettingMgr::create( test_api.clone(), &Tenant::new_or_err("databend_query", func_name!()).unwrap(), diff --git a/src/query/management/tests/it/stage.rs b/src/query/management/tests/it/stage.rs index e104b96e7b288..5b1ee9a1c2f3c 100644 --- a/src/query/management/tests/it/stage.rs +++ b/src/query/management/tests/it/stage.rs @@ -25,8 +25,8 @@ use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::storage::StorageParams; use databend_common_meta_app::storage::StorageS3Config; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_store::MetaStore; use databend_common_meta_types::seq_value::SeqV; use fastrace::func_name; @@ -138,8 +138,10 @@ fn create_test_stage_info() -> StageInfo { } } -async fn new_stage_api() -> Result<(Arc, StageMgr)> { - let test_api = Arc::new(MemMeta::default()); +async fn new_stage_api() -> Result<(Arc, StageMgr)> { + let test_api = MetaStore::new_local_testing().await; + let test_api = Arc::new(test_api); + let mgr = StageMgr::create( test_api.clone(), &Tenant::new_or_err("admin", func_name!()).unwrap(), diff --git a/src/query/management/tests/it/udf.rs b/src/query/management/tests/it/udf.rs index ee4673067c708..92ff54ef145dc 100644 --- a/src/query/management/tests/it/udf.rs +++ b/src/query/management/tests/it/udf.rs @@ -25,8 +25,8 @@ use databend_common_management::*; use databend_common_meta_app::principal::UserDefinedFunction; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_embedded::MemMeta; use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_store::MetaStore; use databend_common_meta_types::seq_value::SeqV; use databend_common_meta_types::MatchSeq; @@ -234,8 +234,10 @@ fn create_test_udf_script() -> UserDefinedFunction { ) } -async fn new_udf_api() -> Result<(Arc, UdfMgr)> { - let test_api = Arc::new(MemMeta::default()); +async fn new_udf_api() -> Result<(Arc, UdfMgr)> { + let test_api = MetaStore::new_local_testing().await; + let test_api = Arc::new(test_api); + let mgr = UdfMgr::create(test_api.clone(), &Tenant::new_literal("admin")); Ok((test_api, mgr)) }