Skip to content

Commit 69a80b9

Browse files
MrPresent-HanMrPresent-Han
andauthored
enhance: resize high priority wqthreadpool dynamically(#40838) (#41549)
related: #40838 Signed-off-by: MrPresent-Han <chun.han@gmail.com> Co-authored-by: MrPresent-Han <chun.han@gmail.com>
1 parent 8af350d commit 69a80b9

File tree

15 files changed

+140
-75
lines changed

15 files changed

+140
-75
lines changed

internal/core/src/common/Common.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
namespace milvus {
2121

2222
int64_t FILE_SLICE_SIZE = DEFAULT_INDEX_FILE_SLICE_SIZE;
23-
int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
23+
float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT =
2424
DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
25-
int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
25+
float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT =
2626
DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
27-
int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
27+
float LOW_PRIORITY_THREAD_CORE_COEFFICIENT =
2828
DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
2929
int CPU_NUM = DEFAULT_CPU_NUM;
3030
int64_t EXEC_EVAL_EXPR_BATCH_SIZE = DEFAULT_EXEC_EVAL_EXPR_BATCH_SIZE;
@@ -40,21 +40,21 @@ SetIndexSliceSize(const int64_t size) {
4040
}
4141

4242
void
43-
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient) {
43+
SetHighPriorityThreadCoreCoefficient(const float coefficient) {
4444
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
4545
LOG_INFO("set high priority thread pool core coefficient: {}",
4646
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT);
4747
}
4848

4949
void
50-
SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient) {
50+
SetMiddlePriorityThreadCoreCoefficient(const float coefficient) {
5151
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
5252
LOG_INFO("set middle priority thread pool core coefficient: {}",
5353
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT);
5454
}
5555

5656
void
57-
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient) {
57+
SetLowPriorityThreadCoreCoefficient(const float coefficient) {
5858
LOW_PRIORITY_THREAD_CORE_COEFFICIENT = coefficient;
5959
LOG_INFO("set low priority thread pool core coefficient: {}",
6060
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);

internal/core/src/common/Common.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424
namespace milvus {
2525

2626
extern int64_t FILE_SLICE_SIZE;
27-
extern int64_t HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
28-
extern int64_t MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
29-
extern int64_t LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
27+
extern float HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
28+
extern float MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
29+
extern float LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
3030
extern int CPU_NUM;
3131
extern int64_t EXEC_EVAL_EXPR_BATCH_SIZE;
3232
extern int64_t JSON_KEY_STATS_COMMIT_INTERVAL;
@@ -36,13 +36,13 @@ void
3636
SetIndexSliceSize(const int64_t size);
3737

3838
void
39-
SetHighPriorityThreadCoreCoefficient(const int64_t coefficient);
39+
SetHighPriorityThreadCoreCoefficient(const float coefficient);
4040

4141
void
42-
SetMiddlePriorityThreadCoreCoefficient(const int64_t coefficient);
42+
SetMiddlePriorityThreadCoreCoefficient(const float coefficient);
4343

4444
void
45-
SetLowPriorityThreadCoreCoefficient(const int64_t coefficient);
45+
SetLowPriorityThreadCoreCoefficient(const float coefficient);
4646

4747
void
4848
SetCpuNum(const int core);

internal/core/src/common/Consts.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,9 @@ const char DEAFULT_QUERY_ID[] = "0";
5656
const char DEFAULT_TASK_ID[] = "0";
5757

5858
const int64_t DEFAULT_FIELD_MAX_MEMORY_LIMIT = 128 << 20; // bytes
59-
const int64_t DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10;
60-
const int64_t DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5;
61-
const int64_t DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1;
59+
const float DEFAULT_HIGH_PRIORITY_THREAD_CORE_COEFFICIENT = 10.0;
60+
const float DEFAULT_MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT = 5.0;
61+
const float DEFAULT_LOW_PRIORITY_THREAD_CORE_COEFFICIENT = 1.0;
6262

6363
const int64_t DEFAULT_INDEX_FILE_SLICE_SIZE = 16 << 20; // bytes
6464

internal/core/src/common/init_c.cpp

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,10 @@
1414
// See the License for the specific language governing permissions and
1515
// limitations under the License.
1616

17-
#include <memory>
1817
#include <mutex>
19-
2018
#include "common/init_c.h"
21-
22-
#include <string>
23-
#include "common/Slice.h"
2419
#include "common/Common.h"
2520
#include "common/Tracer.h"
26-
#include "log/Log.h"
2721

2822
std::once_flag flag1, flag2, flag3, flag4, flag5, flag6, flag7, flag8, flag9,
2923
flag10;
@@ -36,32 +30,30 @@ InitIndexSliceSize(const int64_t size) {
3630
}
3731

3832
void
39-
InitHighPriorityThreadCoreCoefficient(const int64_t value) {
33+
InitHighPriorityThreadCoreCoefficient(const float value) {
4034
std::call_once(
4135
flag2,
42-
[](int64_t value) {
36+
[](float value) {
4337
milvus::SetHighPriorityThreadCoreCoefficient(value);
4438
},
4539
value);
4640
}
4741

4842
void
49-
InitMiddlePriorityThreadCoreCoefficient(const int64_t value) {
43+
InitMiddlePriorityThreadCoreCoefficient(const float value) {
5044
std::call_once(
5145
flag4,
52-
[](int64_t value) {
46+
[](float value) {
5347
milvus::SetMiddlePriorityThreadCoreCoefficient(value);
5448
},
5549
value);
5650
}
5751

5852
void
59-
InitLowPriorityThreadCoreCoefficient(const int64_t value) {
53+
InitLowPriorityThreadCoreCoefficient(const float value) {
6054
std::call_once(
6155
flag5,
62-
[](int64_t value) {
63-
milvus::SetLowPriorityThreadCoreCoefficient(value);
64-
},
56+
[](float value) { milvus::SetLowPriorityThreadCoreCoefficient(value); },
6557
value);
6658
}
6759

internal/core/src/common/init_c.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ void
2828
InitIndexSliceSize(const int64_t);
2929

3030
void
31-
InitHighPriorityThreadCoreCoefficient(const int64_t);
31+
InitHighPriorityThreadCoreCoefficient(const float);
3232

3333
void
34-
InitMiddlePriorityThreadCoreCoefficient(const int64_t);
34+
InitMiddlePriorityThreadCoreCoefficient(const float);
3535

3636
void
37-
InitLowPriorityThreadCoreCoefficient(const int64_t);
37+
InitLowPriorityThreadCoreCoefficient(const float);
3838

3939
void
4040
InitDefaultExprEvalBatchSize(int64_t val);

internal/core/src/storage/ThreadPool.h

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,24 @@ namespace milvus {
3333

3434
class ThreadPool {
3535
public:
36-
explicit ThreadPool(const int thread_core_coefficient, std::string name)
36+
explicit ThreadPool(const float thread_core_coefficient, std::string name)
3737
: shutdown_(false), name_(std::move(name)) {
3838
idle_threads_size_ = 0;
3939
current_threads_size_ = 0;
40-
min_threads_size_ = CPU_NUM;
41-
max_threads_size_ = CPU_NUM * thread_core_coefficient;
40+
min_threads_size_ = 1;
41+
max_threads_size_.store(std::max(
42+
1,
43+
static_cast<int>(std::round(CPU_NUM * thread_core_coefficient))));
4244

4345
// only IO pool will set large limit, but the CPU helps nothing to IO operations,
4446
// we need to limit the max thread num, each thread will download 16~64 MiB data,
4547
// according to our benchmark, 16 threads is enough to saturate the network bandwidth.
46-
if (min_threads_size_ > 16) {
47-
min_threads_size_ = 16;
48-
}
49-
if (max_threads_size_ > 16) {
50-
max_threads_size_ = 16;
48+
if (max_threads_size_.load() > 16) {
49+
max_threads_size_.store(16);
5150
}
5251
LOG_INFO("Init thread pool:{}", name_)
5352
<< " with min worker num:" << min_threads_size_
54-
<< " and max worker num:" << max_threads_size_;
53+
<< " and max worker num:" << max_threads_size_.load();
5554
Init();
5655
}
5756

@@ -80,7 +79,7 @@ class ThreadPool {
8079

8180
size_t
8281
GetMaxThreadNum() {
83-
return max_threads_size_;
82+
return max_threads_size_.load();
8483
}
8584

8685
template <typename F, typename... Args>
@@ -100,7 +99,7 @@ class ThreadPool {
10099

101100
if (idle_threads_size_ > 0) {
102101
condition_lock_.notify_one();
103-
} else if (current_threads_size_ < max_threads_size_) {
102+
} else if (current_threads_size_ < max_threads_size_.load()) {
104103
// Dynamic increase thread number
105104
std::thread t(&ThreadPool::Worker, this);
106105
assert(threads_.find(t.get_id()) == threads_.end());
@@ -117,11 +116,18 @@ class ThreadPool {
117116
void
118117
FinishThreads();
119118

119+
void
120+
Resize(int new_size) {
121+
//no need to hold mutex here as we don't require
122+
//max_threads_size to take effect instantly, just guaranteed atomic
123+
max_threads_size_.store(new_size);
124+
}
125+
120126
public:
121127
int min_threads_size_;
122128
int idle_threads_size_;
123129
int current_threads_size_;
124-
int max_threads_size_;
130+
std::atomic<int> max_threads_size_;
125131
bool shutdown_;
126132
static constexpr size_t WAIT_SECONDS = 2;
127133
SafeQueue<std::function<void()>> work_queue_;

internal/core/src/storage/ThreadPools.cpp

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@ namespace milvus {
1919

2020
std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>>
2121
ThreadPools::thread_pool_map;
22-
std::map<ThreadPoolPriority, int64_t> ThreadPools::coefficient_map;
2322
std::map<ThreadPoolPriority, std::string> ThreadPools::name_map;
2423
std::shared_mutex ThreadPools::mutex_;
25-
ThreadPools ThreadPools::threadPools;
26-
bool ThreadPools::has_setup_coefficients = false;
2724

2825
void
2926
ThreadPools::ShutDown() {
@@ -38,19 +35,44 @@ ThreadPool&
3835
ThreadPools::GetThreadPool(milvus::ThreadPoolPriority priority) {
3936
std::unique_lock<std::shared_mutex> lock(mutex_);
4037
auto iter = thread_pool_map.find(priority);
41-
if (!ThreadPools::has_setup_coefficients) {
42-
ThreadPools::SetUpCoefficients();
43-
ThreadPools::has_setup_coefficients = true;
44-
}
4538
if (iter != thread_pool_map.end()) {
4639
return *(iter->second);
4740
} else {
48-
int64_t coefficient = coefficient_map[priority];
41+
float coefficient = 1.0;
42+
switch (priority) {
43+
case milvus::ThreadPoolPriority::HIGH:
44+
coefficient = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
45+
break;
46+
case milvus::ThreadPoolPriority::MIDDLE:
47+
coefficient = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
48+
break;
49+
default:
50+
coefficient = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
51+
break;
52+
}
4953
std::string name = name_map[priority];
5054
auto result = thread_pool_map.emplace(
5155
priority, std::make_unique<ThreadPool>(coefficient, name));
5256
return *(result.first->second);
5357
}
5458
}
5559

60+
void
61+
ThreadPools::ResizeThreadPool(milvus::ThreadPoolPriority priority,
62+
float ratio) {
63+
int size = static_cast<int>(std::round(milvus::CPU_NUM * ratio));
64+
if (size < 1) {
65+
LOG_ERROR("Failed to resize threadPool, size:{}", size);
66+
return;
67+
}
68+
std::unique_lock<std::shared_mutex> lock(mutex_);
69+
auto iter = thread_pool_map.find(priority);
70+
if (iter == thread_pool_map.end()) {
71+
LOG_ERROR("Failed to find threadPool, priority:{}", priority);
72+
return;
73+
}
74+
iter->second->Resize(size);
75+
LOG_INFO("Resized threadPool priority:{}, size:{}", priority, size);
76+
}
77+
5678
} // namespace milvus

internal/core/src/storage/ThreadPools.h

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ class ThreadPools {
3333
static ThreadPool&
3434
GetThreadPool(ThreadPoolPriority priority);
3535

36+
static void
37+
ResizeThreadPool(ThreadPoolPriority priority, float ratio);
38+
3639
~ThreadPools() {
3740
ShutDown();
3841
}
@@ -43,25 +46,12 @@ class ThreadPools {
4346
name_map[MIDDLE] = "middle_priority_thread_pool";
4447
name_map[LOW] = "low_priority_thread_pool";
4548
}
46-
static void
47-
SetUpCoefficients() {
48-
coefficient_map[HIGH] = HIGH_PRIORITY_THREAD_CORE_COEFFICIENT;
49-
coefficient_map[MIDDLE] = MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT;
50-
coefficient_map[LOW] = LOW_PRIORITY_THREAD_CORE_COEFFICIENT;
51-
LOG_INFO("Init ThreadPools, high_priority_co={}, middle={}, low={}",
52-
HIGH_PRIORITY_THREAD_CORE_COEFFICIENT,
53-
MIDDLE_PRIORITY_THREAD_CORE_COEFFICIENT,
54-
LOW_PRIORITY_THREAD_CORE_COEFFICIENT);
55-
}
5649
void
5750
ShutDown();
5851
static std::map<ThreadPoolPriority, std::unique_ptr<ThreadPool>>
5952
thread_pool_map;
60-
static std::map<ThreadPoolPriority, int64_t> coefficient_map;
6153
static std::map<ThreadPoolPriority, std::string> name_map;
6254
static std::shared_mutex mutex_;
63-
static ThreadPools threadPools;
64-
static bool has_setup_coefficients;
6555
};
6656

6757
} // namespace milvus

internal/core/src/storage/storage_c.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "storage/RemoteChunkManagerSingleton.h"
2020
#include "storage/LocalChunkManagerSingleton.h"
2121
#include "storage/MmapManager.h"
22+
#include "storage/ThreadPools.h"
2223

2324
CStatus
2425
GetLocalUsedSize(const char* c_dir, int64_t* size) {
@@ -116,3 +117,9 @@ void
116117
CleanRemoteChunkManagerSingleton() {
117118
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Release();
118119
}
120+
121+
void
122+
ResizeTheadPool(int64_t priority, float ratio) {
123+
milvus::ThreadPools::ResizeThreadPool(
124+
static_cast<milvus::ThreadPoolPriority>(priority), ratio);
125+
}

internal/core/src/storage/storage_c.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ InitMmapManager(CMmapConfig c_mmap_config);
3636
void
3737
CleanRemoteChunkManagerSingleton();
3838

39+
void
40+
ResizeTheadPool(int64_t priority, float ratio);
41+
3942
#ifdef __cplusplus
4043
};
4144
#endif

0 commit comments

Comments
 (0)