-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathreclaimer_task_supervisor.cc
More file actions
91 lines (79 loc) · 3.29 KB
/
reclaimer_task_supervisor.cc
File metadata and controls
91 lines (79 loc) · 3.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include "kv_cache_manager/manager/reclaimer_task_supervisor.h"
#include <assert.h>
#include <chrono>
#include <utility>
#include "kv_cache_manager/common/logger.h"
namespace kv_cache_manager {
namespace {
// How long the supervisor thread sleeps when its task queue is empty.
constexpr std::chrono::milliseconds kDefaultSupervisorSleepTime{100};
// Upper bound on how long one poll of a task's future may block.
constexpr std::chrono::microseconds kDefaultFutureWaitTime{200};
} // namespace
/// Creates a supervisor that submits delete requests through `schedule_plan_executor`.
///
/// @param schedule_plan_executor Executor used by Submit() and by the retry path of
///        WorkLoop(). Must not be null; this is checked with assert(), which is
///        compiled out when NDEBUG is defined.
ReclaimerTaskSupervisor::ReclaimerTaskSupervisor(std::shared_ptr<SchedulePlanExecutor> schedule_plan_executor)
    // The parameter is a by-value sink, so move it into the member rather than
    // copying (avoids a redundant atomic refcount increment/decrement).
    : schedule_plan_executor_(std::move(schedule_plan_executor)) {
    // Check the member: the parameter is in a moved-from state at this point.
    assert(schedule_plan_executor_);
}
/// Requests the supervisor thread to stop and joins it (via Stop()) before the
/// object's members are destroyed.
ReclaimerTaskSupervisor::~ReclaimerTaskSupervisor() {
    this->Stop();
}
/// Launches the supervisor thread, which runs WorkLoop().
///
/// Calling Start() while the supervisor thread is still joinable is a no-op:
/// assigning a new std::thread over a joinable one would call std::terminate.
///
/// NOTE(review): nothing visible in this file clears the stop flag set by Stop(),
/// so a thread started after Stop() is expected to leave WorkLoop() immediately —
/// confirm whether restart is meant to be supported.
void ReclaimerTaskSupervisor::Start() {
    if (supervisor_.joinable()) {
        // Already started and not yet joined; keep the existing thread.
        return;
    }
    supervisor_ = std::thread([this]() { this->WorkLoop(); });
}
/// Sets the stop flag polled by WorkLoop() and, if the supervisor thread is
/// joinable, blocks until it has exited. Safe to call when the thread was never
/// started or has already been joined.
void ReclaimerTaskSupervisor::Stop() {
    stop_.store(true, std::memory_order_relaxed);
    if (!supervisor_.joinable()) {
        return;
    }
    supervisor_.join();
}
/// Submits a cache-meta delete request to the executor and tracks its outcome.
///
/// A tracking cell records the trace id, the request's instance id and the future
/// returned by the executor. The cell is queued for the supervisor thread only
/// when that future is valid; otherwise it is discarded and nothing is queued.
///
/// @param trace_id Identifier copied into the cell and used when logging completion.
/// @param request  Delete request; forwarded to the executor as an lvalue.
void ReclaimerTaskSupervisor::Submit(const std::string &trace_id, CacheMetaDelRequest &&request) {
    auto task = std::make_shared<ReclaimerTaskSupervisorCell>();
    task->trace_id = trace_id;
    task->instance_id = request.instance_id;
    task->result = schedule_plan_executor_->Submit(request);

    if (!task->result.valid()) {
        return;
    }
    cell_queue_.Push(task);
}
/// Submits a cache-location delete request to the executor and tracks its outcome.
///
/// A tracking cell records the trace id, the request's instance id and the future
/// returned by the executor. The cell is queued for the supervisor thread only
/// when that future is valid; otherwise it is discarded and nothing is queued.
///
/// @param trace_id Identifier copied into the cell and used when logging completion.
/// @param request  Delete request; forwarded to the executor as an lvalue.
void ReclaimerTaskSupervisor::Submit(const std::string &trace_id, CacheLocationDelRequest &&request) {
    auto task = std::make_shared<ReclaimerTaskSupervisorCell>();
    task->trace_id = trace_id;
    task->instance_id = request.instance_id;
    task->result = schedule_plan_executor_->Submit(request);

    if (!task->result.valid()) {
        return;
    }
    cell_queue_.Push(task);
}
/// Body of the supervisor thread: polls queued delete tasks until the stop flag is set.
///
/// Each iteration pops one cell and waits up to kDefaultFutureWaitTime on its future:
///   - not ready: the cell is pushed back onto the queue to be polled again later;
///   - ready with EC_OK: completion is logged and the cell is dropped;
///   - ready with any other status: a CacheLocationDelRequest with zero delay is built
///     from the result's fail_metas and resubmitted, reusing the same cell.
///
/// A failed result that carries no fail_metas is logged and dropped rather than
/// resubmitted, because the retry request would be empty.
///
/// NOTE(review): the number of retries is not bounded here; a task that keeps
/// failing is resubmitted indefinitely. Cells still queued when the stop flag is
/// observed are left in the queue.
void ReclaimerTaskSupervisor::WorkLoop() {
    while (!stop_.load(std::memory_order_relaxed)) {
        if (cell_queue_.Empty()) {
            // Nothing to supervise; back off instead of spinning.
            std::this_thread::sleep_for(kDefaultSupervisorSleepTime);
            continue;
        }

        std::shared_ptr<ReclaimerTaskSupervisorCell> cell;
        if (!cell_queue_.Pop(&cell)) {
            continue;
        }

        // Bounded wait so one slow task cannot stall the others for long.
        if (cell->result.wait_for(kDefaultFutureWaitTime) != std::future_status::ready) {
            cell_queue_.Push(cell);
            continue;
        }

        auto del_result = cell->result.get();
        // Explicit conversion: the status is printed with %d, and an enum passed
        // through a variadic call is not guaranteed to be promoted to int.
        const int status_code = static_cast<int>(del_result.status);

        if (del_result.status == ErrorCode::EC_OK) {
            KVCM_LOG_INFO("delete task finish : instance_id[%s] trace_id [%s] ec[%d] message[%s]",
                          cell->instance_id.c_str(),
                          cell->trace_id.c_str(),
                          status_code,
                          del_result.error_message.c_str());
            continue;
        }

        // retry: resubmit only the entries reported as failed.
        CacheLocationDelRequest request;
        request.instance_id = cell->instance_id;
        request.delay = std::chrono::seconds(0);
        bool has_failed_meta = false;
        for (const auto &meta : del_result.fail_metas) {
            request.block_keys.push_back(meta.block_key);
            request.location_ids.push_back(meta.location_ids);
            has_failed_meta = true;
        }

        if (!has_failed_meta) {
            // An empty retry request has nothing to delete and could fail forever.
            KVCM_LOG_INFO("delete task failed without fail metas, give up : instance_id[%s] trace_id [%s] ec[%d] "
                          "message[%s]",
                          cell->instance_id.c_str(),
                          cell->trace_id.c_str(),
                          status_code,
                          del_result.error_message.c_str());
            continue;
        }

        KVCM_LOG_INFO("delete task failed, retry : instance_id[%s] trace_id [%s] ec[%d] message[%s]",
                      cell->instance_id.c_str(),
                      cell->trace_id.c_str(),
                      status_code,
                      del_result.error_message.c_str());

        cell->result = schedule_plan_executor_->Submit(request);
        if (cell->result.valid()) {
            cell_queue_.Push(cell);
        } else {
            // The executor returned no usable future; the task cannot be tracked further.
            KVCM_LOG_INFO("delete task retry not accepted, give up : instance_id[%s] trace_id [%s]",
                          cell->instance_id.c_str(),
                          cell->trace_id.c_str());
        }
    }
}
} // namespace kv_cache_manager