-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathschedule_plan_executor.h
More file actions
116 lines (95 loc) · 4.29 KB
/
schedule_plan_executor.h
File metadata and controls
116 lines (95 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#pragma once
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <set>
#include <thread>
#include <vector>
#include "kv_cache_manager/common/error_code.h"
#include "kv_cache_manager/manager/meta_searcher.h"
#include "kv_cache_manager/metrics/metrics_registry.h"
namespace kv_cache_manager {
#ifndef KVCM_METRICS_FOR_SCHEDULE_PLAN_EXECUTOR
#define KVCM_METRICS_FOR_SCHEDULE_PLAN_EXECUTOR(name) \
public: \
DECLARE_METRICS_NAME_(schedule_plan_executor, name); \
DEFINE_GET_METRICS_COUNTER_(schedule_plan_executor, name) \
\
private: \
DECLARE_METRICS_COUNTER_(schedule_plan_executor, name);
#endif
class MetaIndexerManager;
class DataStorageManager;
struct CacheMetaDelRequest {
std::string instance_id;
std::vector<int64_t> block_keys;
std::chrono::microseconds delay{std::chrono::seconds(0)};
};
struct PlanExecuteResultFailMeta {
int64_t block_key;
std::vector<ErrorCode> status_vec;
std::vector<std::string> location_ids;
};
struct PlanExecuteResult {
ErrorCode status;
std::string error_message;
std::vector<PlanExecuteResultFailMeta> fail_metas;
};
struct CacheLocationDelRequest {
std::string instance_id;
std::vector<int64_t> block_keys;
std::vector<std::vector<std::string>> location_ids;
std::chrono::microseconds delay{std::chrono::seconds(0)};
};
struct ScheduledTask {
std::function<void()> task;
std::chrono::steady_clock::time_point execute_time;
uint64_t sequence_id;
bool operator<(const ScheduledTask &other) const {
if (execute_time != other.execute_time) {
return execute_time < other.execute_time;
}
// ensure strict weak ordering when execute_time is same
return sequence_id < other.sequence_id;
}
};
class SchedulePlanExecutor {
public:
explicit SchedulePlanExecutor(unsigned int thread_count,
const std::shared_ptr<MetaIndexerManager> &meta_manager,
const std::shared_ptr<DataStorageManager> &storage_manager,
const std::shared_ptr<MetricsRegistry> &metrics_registry);
~SchedulePlanExecutor();
std::future<PlanExecuteResult> Submit(const CacheMetaDelRequest &task);
std::future<PlanExecuteResult> Submit(const CacheLocationDelRequest &task);
bool SubmitNonBlocking(const CacheMetaDelRequest &req);
bool SubmitNonBlocking(const CacheLocationDelRequest &req);
private:
std::shared_ptr<MetaIndexerManager> meta_manager_;
std::shared_ptr<DataStorageManager> data_storage_manager_;
std::shared_ptr<MetricsRegistry> metrics_registry_;
std::vector<std::thread> workers_;
std::atomic<bool> stop_;
std::multiset<ScheduledTask> tasks_;
std::mutex queue_mutex_;
std::condition_variable condition_;
std::atomic<uint64_t> sequence_counter_{0};
void WorkerRoutine();
void Stop();
bool SubmitRaw(const std::function<void()> &task, std::chrono::microseconds delay);
static bool FillActualTask(const std::vector<int64_t> &batch_cas_block_keys,
const std::vector<std::vector<MetaSearcher::LocationCASTask>> &batch_cas_tasks,
const std::vector<std::vector<ErrorCode>> &batch_results,
CacheLocationDelRequest &actual_task,
std::string &error_message);
void DoLocationDelTask(const std::shared_ptr<std::promise<PlanExecuteResult>> &promise,
const CacheLocationDelRequest &task);
KVCM_METRICS_FOR_SCHEDULE_PLAN_EXECUTOR(waiting_task_count)
KVCM_METRICS_FOR_SCHEDULE_PLAN_EXECUTOR(executing_task_count)
};
#undef KVCM_METRICS_FOR_SCHEDULE_PLAN_EXECUTOR
} // namespace kv_cache_manager