Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reduce partition ingest time (#2194) #2195

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/common/bulk_load_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

#include "bulk_load_common.h"

namespace dsn {
namespace replication {
namespace dsn::replication {
const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10000;
const int32_t bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL = 150;
const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR("bulk_load");
const int32_t bulk_load_constant::PROGRESS_FINISHED = 100;
} // namespace replication
} // namespace dsn
} // namespace dsn::replication
1 change: 1 addition & 0 deletions src/common/bulk_load_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class bulk_load_constant
public:
static const std::string BULK_LOAD_INFO;
static const int32_t BULK_LOAD_REQUEST_INTERVAL;
static const int32_t BULK_LOAD_INGEST_REQUEST_INTERVAL;
static const std::string BULK_LOAD_METADATA;
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
static const int32_t PROGRESS_FINISHED;
Expand Down
18 changes: 11 additions & 7 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,11 +587,14 @@ void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name
FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](std::string_view) {});
zauto_read_lock l(_lock);
if (is_app_bulk_loading_unlocked(pid.get_app_id())) {
int32_t interval = _partition_bulk_load_info[pid].status == bulk_load_status::BLS_INGESTING
? bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL
: bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL;
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
std::chrono::milliseconds(interval));
}
}

Expand Down Expand Up @@ -1263,11 +1266,12 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
if (!try_partition_ingestion(pc, app->helpers->contexts[pid.get_partition_index()])) {
LOG_WARNING(
"app({}) partition({}) couldn't execute ingestion, wait and try later", app_name, pid);
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
std::chrono::seconds(5));
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_INGEST_REQUEST_INTERVAL));
return;
}

Expand All @@ -1279,7 +1283,7 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
std::bind(
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
std::chrono::milliseconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}

// ThreadPool: THREAD_POOL_DEFAULT
Expand Down
Loading