diff --git a/src/common/bulk_load_common.cpp b/src/common/bulk_load_common.cpp index fca0dbf2de..e4a0f04b09 100644 --- a/src/common/bulk_load_common.cpp +++ b/src/common/bulk_load_common.cpp @@ -17,12 +17,11 @@ #include "bulk_load_common.h" -namespace dsn { -namespace replication { +namespace dsn::replication { const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); -const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10; +const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10000; +const int32_t bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL = 150; const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata"); const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR("bulk_load"); const int32_t bulk_load_constant::PROGRESS_FINISHED = 100; -} // namespace replication -} // namespace dsn +} // namespace dsn::replication diff --git a/src/common/bulk_load_common.h b/src/common/bulk_load_common.h index 5e5ed9bf50..06da731efa 100644 --- a/src/common/bulk_load_common.h +++ b/src/common/bulk_load_common.h @@ -47,6 +47,7 @@ class bulk_load_constant public: static const std::string BULK_LOAD_INFO; static const int32_t BULK_LOAD_REQUEST_INTERVAL; + static const int32_t BULK_LOAD_INGEST_REQUEST_INTERVAL; static const std::string BULK_LOAD_METADATA; static const std::string BULK_LOAD_LOCAL_ROOT_DIR; static const int32_t PROGRESS_FINISHED; diff --git a/src/meta/meta_bulk_load_service.cpp b/src/meta/meta_bulk_load_service.cpp index 33570381bf..e88f696f98 100644 --- a/src/meta/meta_bulk_load_service.cpp +++ b/src/meta/meta_bulk_load_service.cpp @@ -587,11 +587,14 @@ void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](std::string_view) {}); zauto_read_lock l(_lock); if (is_app_bulk_loading_unlocked(pid.get_app_id())) { + int32_t interval = _partition_bulk_load_info[pid].status == bulk_load_status::BLS_INGESTING + ? bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL + : bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL; tasking::enqueue(LPC_META_STATE_NORMAL, _meta_svc->tracker(), std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), 0, - std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); + std::chrono::milliseconds(interval)); } } @@ -1263,11 +1266,12 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g if (!try_partition_ingestion(pc, app->helpers->contexts[pid.get_partition_index()])) { LOG_WARNING( "app({}) partition({}) couldn't execute ingestion, wait and try later", app_name, pid); - tasking::enqueue(LPC_META_STATE_NORMAL, - _meta_svc->tracker(), - std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), - pid.thread_hash(), - std::chrono::seconds(5)); + tasking::enqueue( + LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), + pid.thread_hash(), + std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL)); return; } @@ -1279,7 +1283,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g std::bind( &bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot), 0, - std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); + std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); } // ThreadPool: THREAD_POOL_DEFAULT