From b3c7f8846e7338f0969f82a9ab3a23a76a86d2f8 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Sun, 29 Dec 2024 22:04:41 +0800 Subject: [PATCH] [refactor](execenv) remove shared ptr from exec env (#46034) ExecEnv should be the last object to deconstructed, so that it should not own any shared ptr. If it own any shared ptr, then we could not make sure the deconstruct sequence. --- be/src/cloud/cloud_stream_load_executor.h | 2 + be/src/runtime/exec_env.cpp | 6 --- be/src/runtime/exec_env.h | 30 ++++++-------- be/src/runtime/exec_env_init.cpp | 40 ++++++++++++++++--- be/src/runtime/memory/lru_cache_policy.h | 12 +++--- be/test/http/stream_load_test.cpp | 4 +- be/test/olap/wal/wal_manager_test.cpp | 13 +++--- .../routine_load_task_executor_test.cpp | 19 +++++---- be/test/testutil/run_all_tests.cpp | 1 - be/test/vec/exec/vwal_scanner_test.cpp | 3 +- 10 files changed, 77 insertions(+), 53 deletions(-) diff --git a/be/src/cloud/cloud_stream_load_executor.h b/be/src/cloud/cloud_stream_load_executor.h index b0cb91d06ac42a..d04e55feba552e 100644 --- a/be/src/cloud/cloud_stream_load_executor.h +++ b/be/src/cloud/cloud_stream_load_executor.h @@ -21,6 +21,8 @@ namespace doris { class CloudStreamLoadExecutor final : public StreamLoadExecutor { + ENABLE_FACTORY_CREATOR(CloudStreamLoadExecutor); + public: CloudStreamLoadExecutor(ExecEnv* exec_env); diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index ab24d7ca192689..e3a71261b677eb 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -38,12 +38,6 @@ namespace doris { -ExecEnv::ExecEnv() = default; - -ExecEnv::~ExecEnv() { - destroy(); -} - #ifdef BE_TEST void ExecEnv::set_inverted_index_searcher_cache( segment_v2::InvertedIndexSearcherCache* inverted_index_searcher_cache) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 886927fa68bb82..abf301b782d7b8 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -244,14 +244,14 @@ class ExecEnv { } LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); } - std::shared_ptr new_load_stream_mgr() { return _new_load_stream_mgr; } + NewLoadStreamMgr* new_load_stream_mgr() { return _new_load_stream_mgr.get(); } SmallFileMgr* small_file_mgr() { return _small_file_mgr; } doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; } GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; } const std::vector& store_paths() const { return _store_paths; } - std::shared_ptr stream_load_executor() { return _stream_load_executor; } + StreamLoadExecutor* stream_load_executor() { return _stream_load_executor.get(); } RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } @@ -273,12 +273,10 @@ class ExecEnv { _memtable_memory_limiter.reset(limiter); } void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info = cluster_info; } - void set_new_load_stream_mgr(std::shared_ptr new_load_stream_mgr) { - this->_new_load_stream_mgr = new_load_stream_mgr; - } - void set_stream_load_executor(std::shared_ptr stream_load_executor) { - this->_stream_load_executor = stream_load_executor; - } + void set_new_load_stream_mgr(std::unique_ptr&& new_load_stream_mgr); + void clear_new_load_stream_mgr(); + void set_stream_load_executor(std::unique_ptr&& stream_load_executor); + void clear_stream_load_executor(); void set_storage_engine(std::unique_ptr&& engine); void set_inverted_index_searcher_cache( @@ -294,10 +292,9 @@ class ExecEnv { void set_routine_load_task_executor(RoutineLoadTaskExecutor* r) { this->_routine_load_task_executor = r; } - void set_wal_mgr(std::shared_ptr wm) { this->_wal_manager = wm; } - void set_dummy_lru_cache(std::shared_ptr dummy_lru_cache) { - this->_dummy_lru_cache = dummy_lru_cache; - } + void set_wal_mgr(std::unique_ptr&& wm); + void clear_wal_mgr(); + void set_write_cooldown_meta_executors(); static void set_tracking_memory(bool tracking_memory) { _s_tracking_memory.store(tracking_memory, std::memory_order_release); @@ -333,7 +330,6 @@ class ExecEnv { return _inverted_index_query_cache; } QueryCache* get_query_cache() { return _query_cache; } - std::shared_ptr get_dummy_lru_cache() { return _dummy_lru_cache; } pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { return _runtime_filter_timer_queue; @@ -431,13 +427,12 @@ class ExecEnv { BrokerMgr* _broker_mgr = nullptr; LoadChannelMgr* _load_channel_mgr = nullptr; std::unique_ptr _load_stream_mgr; - // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle. - std::shared_ptr _new_load_stream_mgr; + std::unique_ptr _new_load_stream_mgr; BrpcClientCache* _internal_client_cache = nullptr; BrpcClientCache* _streaming_client_cache = nullptr; BrpcClientCache* _function_client_cache = nullptr; - std::shared_ptr _stream_load_executor; + std::unique_ptr _stream_load_executor; RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; SmallFileMgr* _small_file_mgr = nullptr; HeartbeatFlags* _heartbeat_flags = nullptr; @@ -448,7 +443,7 @@ class ExecEnv { std::unique_ptr _memtable_memory_limiter; std::unique_ptr _load_stream_map_pool; std::unique_ptr _delta_writer_v2_pool; - std::shared_ptr _wal_manager; + std::unique_ptr _wal_manager; DNSCache* _dns_cache = nullptr; std::unique_ptr _write_cooldown_meta_executors; @@ -475,7 +470,6 @@ class ExecEnv { segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; QueryCache* _query_cache = nullptr; - std::shared_ptr _dummy_lru_cache = nullptr; std::unique_ptr _file_cache_open_fd_cache; pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index d912807b2e1c94..5efb108c067d97 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -160,6 +160,12 @@ ThreadPool* ExecEnv::non_block_close_thread_pool() { return _non_block_close_thread_pool.get(); } +ExecEnv::ExecEnv() = default; + +ExecEnv::~ExecEnv() { + destroy(); +} + Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths, const std::vector& spill_store_paths, const std::set& broken_paths) { @@ -265,16 +271,16 @@ Status ExecEnv::_init(const std::vector& store_paths, _store_paths.size() * config::flush_thread_num_per_store, static_cast(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu); _load_stream_mgr = std::make_unique(num_flush_threads); - _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); + _new_load_stream_mgr = NewLoadStreamMgr::create_unique(); _internal_client_cache = new BrpcClientCache(); _streaming_client_cache = new BrpcClientCache("baidu_std", "single", "streaming"); _function_client_cache = new BrpcClientCache(config::function_service_protocol); if (config::is_cloud_mode()) { - _stream_load_executor = std::make_shared(this); + _stream_load_executor = CloudStreamLoadExecutor::create_unique(this); } else { - _stream_load_executor = StreamLoadExecutor::create_shared(this); + _stream_load_executor = StreamLoadExecutor::create_unique(this); } _routine_load_task_executor = new RoutineLoadTaskExecutor(this); RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); @@ -283,7 +289,7 @@ Status ExecEnv::_init(const std::vector& store_paths, _memtable_memory_limiter = std::make_unique(); _load_stream_map_pool = std::make_unique(); _delta_writer_v2_pool = std::make_unique(); - _wal_manager = WalManager::create_shared(this, config::group_commit_wal_path); + _wal_manager = WalManager::create_unique(this, config::group_commit_wal_path); _dns_cache = new DNSCache(); _write_cooldown_meta_executors = std::make_unique(); _spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map)); @@ -439,8 +445,6 @@ Status ExecEnv::_init_mem_env() { return Status::InternalError(ss.str()); } - _dummy_lru_cache = std::make_shared(); - _cache_manager = CacheManager::create_global_instance(); int64_t storage_cache_limit = @@ -643,6 +647,30 @@ Status ExecEnv::_check_deploy_mode() { return Status::OK(); } +#ifdef BE_TEST +void ExecEnv::set_new_load_stream_mgr(std::unique_ptr&& new_load_stream_mgr) { + this->_new_load_stream_mgr = std::move(new_load_stream_mgr); +} + +void ExecEnv::clear_new_load_stream_mgr() { + this->_new_load_stream_mgr.reset(); +} + +void ExecEnv::set_stream_load_executor(std::unique_ptr&& stream_load_executor) { + this->_stream_load_executor = std::move(stream_load_executor); +} + +void ExecEnv::clear_stream_load_executor() { + this->_stream_load_executor.reset(); +} + +void ExecEnv::set_wal_mgr(std::unique_ptr&& wm) { + this->_wal_manager = std::move(wm); +} +void ExecEnv::clear_wal_mgr() { + this->_wal_manager.reset(); +} +#endif // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. // We need to stop all threads before releasing resource. void ExecEnv::destroy() { diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index b2fb33b1ac5b52..89e8ab8cc8a033 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -44,8 +44,7 @@ class LRUCachePolicy : public CachePolicy { new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, element_count_capacity)); } else { - CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); - _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + _cache = std::make_shared(); } _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } @@ -63,8 +62,7 @@ class LRUCachePolicy : public CachePolicy { cache_value_time_extractor, cache_value_check_timestamp, element_count_capacity)); } else { - CHECK(ExecEnv::GetInstance()->get_dummy_lru_cache()); - _cache = ExecEnv::GetInstance()->get_dummy_lru_cache(); + _cache = std::make_shared(); } _init_mem_tracker(lru_cache_type_string(lru_cache_type)); } @@ -155,7 +153,7 @@ class LRUCachePolicy : public CachePolicy { std::lock_guard l(_lock); COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); - if (_stale_sweep_time_s <= 0 && _cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (_stale_sweep_time_s <= 0 || std::dynamic_pointer_cast(_cache)) { return; } if (exceed_prune_limit()) { @@ -202,7 +200,7 @@ class LRUCachePolicy : public CachePolicy { std::lock_guard l(_lock); COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); - if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (std::dynamic_pointer_cast(_cache)) { return; } if ((force && mem_consumption() != 0) || exceed_prune_limit()) { @@ -244,7 +242,7 @@ class LRUCachePolicy : public CachePolicy { COUNTER_SET(_freed_entrys_counter, (int64_t)0); COUNTER_SET(_freed_memory_counter, (int64_t)0); COUNTER_SET(_cost_timer, (int64_t)0); - if (_cache == ExecEnv::GetInstance()->get_dummy_lru_cache()) { + if (std::dynamic_pointer_cast(_cache)) { return 0; } diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index d797c081f41995..faa582704d11cc 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -54,11 +54,11 @@ void http_request_done_cb(struct evhttp_request* req, void* arg) { TEST_F(StreamLoadTest, TestHeader) { // 1G - auto wal_mgr = WalManager::create_shared(ExecEnv::GetInstance(), config::group_commit_wal_path); + auto wal_mgr = WalManager::create_unique(ExecEnv::GetInstance(), config::group_commit_wal_path); static_cast(wal_mgr->_wal_dirs_info->add("test_path_1", 1000, 0, 0)); static_cast(wal_mgr->_wal_dirs_info->add("test_path_2", 10000, 0, 0)); static_cast(wal_mgr->_wal_dirs_info->add("test_path_3", 100000, 0, 0)); - ExecEnv::GetInstance()->set_wal_mgr(wal_mgr); + ExecEnv::GetInstance()->set_wal_mgr(std::move(wal_mgr)); // 1. empty info { auto* evhttp_req = evhttp_request_new(nullptr, nullptr); diff --git a/be/test/olap/wal/wal_manager_test.cpp b/be/test/olap/wal/wal_manager_test.cpp index 32162593fc05c4..5a6ce49067bf46 100644 --- a/be/test/olap/wal/wal_manager_test.cpp +++ b/be/test/olap/wal/wal_manager_test.cpp @@ -59,12 +59,12 @@ class WalManagerTest : public testing::Test { _env->_cluster_info->master_fe_addr.hostname = "host name"; _env->_cluster_info->master_fe_addr.port = 1234; _env->_cluster_info->backend_id = 1001; - _env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared(); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); _env->_internal_client_cache = new BrpcClientCache(); _env->_function_client_cache = new BrpcClientCache(); - _env->_stream_load_executor = StreamLoadExecutor::create_shared(_env); + _env->_stream_load_executor = StreamLoadExecutor::create_unique(_env); _env->_store_paths = {StorePath(std::filesystem::current_path(), 0)}; - _env->_wal_manager = WalManager::create_shared(_env, wal_dir.string()); + _env->set_wal_mgr(WalManager::create_unique(_env, wal_dir.string())); k_stream_load_begin_result = TLoadTxnBeginResult(); } void TearDown() override { @@ -78,6 +78,9 @@ class WalManagerTest : public testing::Test { SAFE_DELETE(_env->_function_client_cache); SAFE_DELETE(_env->_internal_client_cache); SAFE_DELETE(_env->_cluster_info); + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + //_env->clear_wal_mgr(); } void prepare() { @@ -155,9 +158,9 @@ TEST_F(WalManagerTest, recovery_normal) { } TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) { - auto wal_mgr = WalManager::create_shared(_env, config::group_commit_wal_path); + auto wal_mgr = WalManager::create_unique(_env, config::group_commit_wal_path); static_cast(wal_mgr->init()); - _env->set_wal_mgr(wal_mgr); + _env->set_wal_mgr(std::move(wal_mgr)); // 1T size_t available_bytes = 1099511627776; diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp index 5c2b39bce1f1bd..080d6ff4bc5987 100644 --- a/be/test/runtime/routine_load_task_executor_test.cpp +++ b/be/test/runtime/routine_load_task_executor_test.cpp @@ -49,23 +49,28 @@ class RoutineLoadTaskExecutorTest : public testing::Test { RoutineLoadTaskExecutorTest() = default; ~RoutineLoadTaskExecutorTest() override = default; + ExecEnv* _env = nullptr; + void SetUp() override { + _env = ExecEnv::GetInstance(); k_stream_load_begin_result = TLoadTxnBeginResult(); k_stream_load_commit_result = TLoadTxnCommitResult(); k_stream_load_rollback_result = TLoadTxnRollbackResult(); k_stream_load_put_result = TStreamLoadPutResult(); - _env.set_cluster_info(new ClusterInfo()); - _env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); - _env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env)); + _env->set_cluster_info(new ClusterInfo()); + _env->set_new_load_stream_mgr(NewLoadStreamMgr::create_unique()); + _env->set_stream_load_executor(StreamLoadExecutor::create_unique(_env)); config::max_routine_load_thread_pool_size = 1024; config::max_consumer_num_per_group = 3; } - void TearDown() override { delete _env.cluster_info(); } - - ExecEnv _env; + void TearDown() override { + delete _env->cluster_info(); + _env->clear_new_load_stream_mgr(); + _env->clear_stream_load_executor(); + } }; TEST_F(RoutineLoadTaskExecutorTest, exec_task) { @@ -92,7 +97,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) { task.__set_kafka_load_info(k_info); - RoutineLoadTaskExecutor executor(&_env); + RoutineLoadTaskExecutor executor(_env); Status st; st = executor.init(1024 * 1024); EXPECT_TRUE(st.ok()); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 810f3e0c28f548..44cccb7fec16c4 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -68,7 +68,6 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance()); doris::ExecEnv::GetInstance()->set_process_profile( doris::ProcessProfile::create_global_instance()); - doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared()); doris::ExecEnv::GetInstance()->set_storage_page_cache( doris::StoragePageCache::create_global_cache(1 << 30, 10, 0)); doris::ExecEnv::GetInstance()->set_segment_loader(new doris::SegmentLoader(1000, 1000)); diff --git a/be/test/vec/exec/vwal_scanner_test.cpp b/be/test/vec/exec/vwal_scanner_test.cpp index 5c4056a8c24104..2e6d4bf5cdea76 100644 --- a/be/test/vec/exec/vwal_scanner_test.cpp +++ b/be/test/vec/exec/vwal_scanner_test.cpp @@ -79,6 +79,7 @@ class VWalScannerTest : public testing::Test { WARN_IF_ERROR(io::global_local_filesystem()->delete_directory(_wal_dir), fmt::format("fail to delete dir={}", _wal_dir)); SAFE_STOP(_env->_wal_manager); + _env->clear_wal_mgr(); } protected: @@ -286,7 +287,7 @@ void VWalScannerTest::init() { _env->_cluster_info->master_fe_addr.hostname = "host name"; _env->_cluster_info->master_fe_addr.port = _backend_id; _env->_cluster_info->backend_id = 1001; - _env->_wal_manager = WalManager::create_shared(_env, _wal_dir); + _env->set_wal_mgr(WalManager::create_unique(_env, _wal_dir)); std::string base_path; auto st = _env->_wal_manager->_init_wal_dirs_info(); st = _env->_wal_manager->create_wal_path(_db_id, _tb_id, _txn_id_1, _label_1, base_path,