From 414d2b04b7704ee98b836bf4b6238be4f8a67f08 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Mon, 17 Feb 2025 20:30:48 +0800 Subject: [PATCH 1/3] feat: reduce partition ingest time (#2194) The main reason is that during the ingest phase, the time between meta's RPC_BULK_LOAD transmissions is too long (once every 10 seconds). Shortening this time interval can cut down the partition write-blocking time. --- src/common/bulk_load_common.cpp | 3 ++- src/common/bulk_load_common.h | 1 + src/meta/meta_bulk_load_service.cpp | 9 ++++++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/common/bulk_load_common.cpp b/src/common/bulk_load_common.cpp index fca0dbf2de..367494c107 100644 --- a/src/common/bulk_load_common.cpp +++ b/src/common/bulk_load_common.cpp @@ -20,7 +20,8 @@ namespace dsn { namespace replication { const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); -const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10; +const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10000; +const int32_t bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL = 150; const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata"); const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR("bulk_load"); const int32_t bulk_load_constant::PROGRESS_FINISHED = 100; diff --git a/src/common/bulk_load_common.h b/src/common/bulk_load_common.h index 5e5ed9bf50..06da731efa 100644 --- a/src/common/bulk_load_common.h +++ b/src/common/bulk_load_common.h @@ -47,6 +47,7 @@ class bulk_load_constant public: static const std::string BULK_LOAD_INFO; static const int32_t BULK_LOAD_REQUEST_INTERVAL; + static const int32_t BULK_LOAD_INGEST_REQUEST_INTERVAL; static const std::string BULK_LOAD_METADATA; static const std::string BULK_LOAD_LOCAL_ROOT_DIR; static const int32_t PROGRESS_FINISHED; diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 33570381bf..84e6bab2a0 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -587,11 +587,14 @@ void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](std::string_view) {}); zauto_read_lock l(_lock); if (is_app_bulk_loading_unlocked(pid.get_app_id())) { + int32_t interval = _partition_bulk_load_info[pid].status == bulk_load_status::BLS_INGESTING + ? bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL + : bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL; tasking::enqueue(LPC_META_STATE_NORMAL, _meta_svc->tracker(), std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), 0, - std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); + std::chrono::milliseconds(interval)); } } @@ -1267,7 +1270,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g _meta_svc->tracker(), std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), pid.thread_hash(), - std::chrono::seconds(5)); + std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL)); return; } @@ -1279,7 +1282,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g std::bind( &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot), 0, - std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); + std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); } // ThreadPool: THREAD_POOL_DEFAULT From d9ab3653806d2b8a417d60d8847a49e0a12ad1a5 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 18 Feb 2025 11:05:35 +0800 Subject: [PATCH 2/3] follow clang-format --- src/meta/meta_bulk_load_service.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 84e6bab2a0..e88f696f98 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -1266,11 +1266,12 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g if (!try_partition_ingestion(pc, app->helpers->contexts[pid.get_partition_index()])) { LOG_WARNING( "app({}) partition({}) couldn't execute ingestion, wait and try later", app_name, pid); - tasking::enqueue(LPC_META_STATE_NORMAL, - _meta_svc->tracker(), - std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), - pid.thread_hash(), - std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL)); + tasking::enqueue( + LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), + pid.thread_hash(), + std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL)); return; } From 3e5391cbc730527810c61fc93366d40cc51ef467 Mon Sep 17 00:00:00 2001 From: lupengfan1 Date: Tue, 18 Feb 2025 11:34:44 +0800 Subject: [PATCH 3/3] follow clang-tidy --- src/common/bulk_load_common.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/common/bulk_load_common.cpp b/src/common/bulk_load_common.cpp index 367494c107..e4a0f04b09 100644 --- a/src/common/bulk_load_common.cpp +++ b/src/common/bulk_load_common.cpp @@ -17,13 +17,11 @@ #include "bulk_load_common.h" -namespace dsn { -namespace replication { +namespace dsn::replication { const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10000; const int32_t bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL = 150; const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata"); const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR("bulk_load"); const int32_t bulk_load_constant::PROGRESS_FINISHED = 100; -} // namespace replication -} // namespace dsn +} // namespace dsn::replication