Skip to content

Commit ac299a9

Browse files
authored
Merge pull request #445 from fasiondog/feature/dev
feat(indicator): 重构指标动态周期计算的并行执行逻辑
2 parents dc846ef + 9b43467 commit ac299a9

File tree

8 files changed

+55
-229
lines changed

8 files changed

+55
-229
lines changed

hikyuu_cpp/hikyuu/GlobalInitializer.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,16 @@ void GlobalInitializer::init() {
7777
TA_Initialize();
7878
#endif
7979

80+
size_t cpu_num = std::thread::hardware_concurrency() * 3 / 2;
81+
if (cpu_num > 128) {
82+
cpu_num = 128;
83+
} else if (cpu_num > 64) {
84+
cpu_num = cpu_num * 10 / 8;
85+
}
86+
init_global_task_group(cpu_num);
87+
8088
DataDriverFactory::init();
8189
StockManager::instance();
82-
IndicatorImp::initDynEngine();
8390
getGlobalSpotAgent();
8491
}
8592

@@ -117,7 +124,6 @@ void GlobalInitializer::clean() {
117124
sysinfo_clean();
118125
releaseScheduler();
119126
releaseGlobalSpotAgent();
120-
IndicatorImp::releaseDynEngine();
121127

122128
#if !HKU_OS_OSX
123129
// 主动停止异步数据加载任务组,否则 hdf5 在 linux 下会报关闭异常

hikyuu_cpp/hikyuu/indicator/IndicatorImp.cpp

Lines changed: 13 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include "Indicator.h"
1414
#include "IndParam.h"
1515
#include "../Stock.h"
16-
#include "../GlobalInitializer.h"
1716
#include "imp/ICval.h"
1817
#include "imp/IContext.h"
1918

@@ -24,7 +23,6 @@ BOOST_CLASS_EXPORT(hku::IndicatorImp)
2423
namespace hku {
2524

2625
bool IndicatorImp::ms_enable_increment_calculate{true};
27-
GlobalStealThreadPool *IndicatorImp::ms_tg = nullptr;
2826

2927
string HKU_API getOPTypeName(IndicatorImp::OPType op) {
3028
string name;
@@ -104,27 +102,6 @@ string HKU_API getOPTypeName(IndicatorImp::OPType op) {
104102
return name;
105103
}
106104

107-
void IndicatorImp::initDynEngine() {
108-
size_t cpu_num = std::thread::hardware_concurrency() * 3 / 2;
109-
if (cpu_num > 128) {
110-
cpu_num = 128;
111-
} else if (cpu_num > 64) {
112-
cpu_num = cpu_num * 10 / 8;
113-
}
114-
115-
// 由于 GlobalInitializer 机制,目前借用在此处初始化全局任务组
116-
init_global_task_group(cpu_num);
117-
ms_tg = get_global_task_group();
118-
}
119-
120-
void IndicatorImp::releaseDynEngine() {
121-
HKU_TRACE("releaseDynEngine");
122-
// 目前的 GlobalInitializer 机制,global_task_group 实际可能已经释放
123-
// 可能导致 double free, 这里只停止,不负责释放
124-
// release_global_task_group();
125-
// ms_tg = nullptr;
126-
}
127-
128105
HKU_API std::ostream &operator<<(std::ostream &os, const IndicatorImp &imp) {
129106
os << imp.str();
130107
return os;
@@ -1908,10 +1885,8 @@ void IndicatorImp::_dyn_calculate(const Indicator &ind) {
19081885

19091886
const value_t *param_data = ind_param->data();
19101887

1911-
static const size_t minCircleLength = 400;
1912-
size_t workerNum = ms_tg->worker_num();
1913-
if (total < minCircleLength || isSerial() || workerNum == 1) {
1914-
// HKU_INFO("single_thread");
1888+
static constexpr size_t minCircleLength = 400;
1889+
if (total < minCircleLength || isSerial()) {
19151890
for (size_t i = ind.discard(); i < total; i++) {
19161891
if (std::isnan(param_data[i])) {
19171892
_set(Null<value_t>(), i);
@@ -1924,39 +1899,17 @@ void IndicatorImp::_dyn_calculate(const Indicator &ind) {
19241899
return;
19251900
}
19261901

1927-
// HKU_INFO("multi_thread");
1928-
size_t circleLength = minCircleLength;
1929-
if (minCircleLength * workerNum < total) {
1930-
size_t tailCount = total % workerNum;
1931-
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
1932-
}
1933-
1934-
std::vector<std::future<void>> tasks;
1935-
for (size_t group = 0; group < workerNum; group++) {
1936-
size_t first = circleLength * group;
1937-
if (first >= total) {
1938-
break;
1939-
}
1940-
tasks.push_back(
1941-
ms_tg->submit([this, &ind, first, circleLength, total, group, param_data]() {
1942-
size_t endPos = first + circleLength;
1943-
if (endPos > total) {
1944-
endPos = total;
1945-
}
1946-
for (size_t i = circleLength * group; i < endPos; i++) {
1947-
if (std::isnan(param_data[i])) {
1948-
_set(Null<value_t>(), i);
1949-
} else {
1950-
size_t step = size_t(param_data[i]);
1951-
_dyn_run_one_step(ind, i, step);
1952-
}
1953-
}
1954-
}));
1955-
}
1956-
1957-
for (auto &task : tasks) {
1958-
task.get();
1959-
}
1902+
global_parallel_for_index_void(
1903+
ind.discard(), total,
1904+
[&ind, param_data, this](size_t i) {
1905+
if (std::isnan(param_data[i])) {
1906+
_set(Null<value_t>(), i);
1907+
} else {
1908+
size_t step = size_t(param_data[i]);
1909+
_dyn_run_one_step(ind, i, step);
1910+
}
1911+
},
1912+
minCircleLength);
19601913

19611914
_update_discard();
19621915
}

hikyuu_cpp/hikyuu/indicator/IndicatorImp.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,6 @@ class HKU_API IndicatorImp : public enable_shared_from_this<IndicatorImp> {
309309

310310
protected:
311311
static bool ms_enable_increment_calculate;
312-
static GlobalStealThreadPool* ms_tg;
313312

314313
#if HKU_SUPPORT_SERIALIZATION
315314
private:

hikyuu_cpp/hikyuu/indicator/imp/IAma.cpp

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -182,43 +182,10 @@ void IAma::_dyn_calculate(const Indicator& ind) {
182182
size_t total = ind.size();
183183
HKU_IF_RETURN(0 == total || m_discard >= total, void());
184184

185-
static const size_t minCircleLength = 400;
186-
size_t workerNum = ms_tg->worker_num();
187-
if (total < minCircleLength || workerNum == 1) {
188-
for (size_t i = ind.discard(); i < total; i++) {
189-
_dyn_one_circle(ind, i, n[i], fast_n[i], slow_n[i]);
190-
}
191-
_update_discard();
192-
return;
193-
}
194-
195-
size_t circleLength = minCircleLength;
196-
if (minCircleLength * workerNum < total) {
197-
size_t tailCount = total % workerNum;
198-
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
199-
}
185+
global_parallel_for_index_void(
186+
ind.discard(), total, [&](size_t i) { _dyn_one_circle(ind, i, n[i], fast_n[i], slow_n[i]); },
187+
400);
200188

201-
std::vector<std::future<void>> tasks;
202-
for (size_t group = 0; group < workerNum; group++) {
203-
size_t first = circleLength * group;
204-
if (first >= total) {
205-
break;
206-
}
207-
tasks.push_back(
208-
ms_tg->submit([this, &ind, &n, &fast_n, &slow_n, first, circleLength, total, group]() {
209-
size_t endPos = first + circleLength;
210-
if (endPos > total) {
211-
endPos = total;
212-
}
213-
for (size_t i = circleLength * group; i < endPos; i++) {
214-
_dyn_one_circle(ind, i, n[i], fast_n[i], slow_n[i]);
215-
}
216-
}));
217-
}
218-
219-
for (auto& task : tasks) {
220-
task.get();
221-
}
222189
_update_discard();
223190
}
224191

hikyuu_cpp/hikyuu/indicator/imp/IMacd.cpp

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -116,43 +116,8 @@ void IMacd::_dyn_calculate(const Indicator& ind) {
116116
size_t total = ind.size();
117117
HKU_IF_RETURN(0 == total || m_discard >= total, void());
118118

119-
static const size_t minCircleLength = 400;
120-
size_t workerNum = ms_tg->worker_num();
121-
if (total < minCircleLength || workerNum == 1) {
122-
for (size_t i = ind.discard(); i < total; i++) {
123-
_dyn_one_circle(ind, i, n1[i], n2[i], n3[i]);
124-
}
125-
_update_discard();
126-
return;
127-
}
128-
129-
size_t circleLength = minCircleLength;
130-
if (minCircleLength * workerNum < total) {
131-
size_t tailCount = total % workerNum;
132-
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
133-
}
134-
135-
std::vector<std::future<void>> tasks;
136-
for (size_t group = 0; group < workerNum; group++) {
137-
size_t first = circleLength * group;
138-
if (first >= total) {
139-
break;
140-
}
141-
tasks.push_back(
142-
ms_tg->submit([this, &ind, &n1, &n2, &n3, first, circleLength, total, group]() {
143-
size_t endPos = first + circleLength;
144-
if (endPos > total) {
145-
endPos = total;
146-
}
147-
for (size_t i = circleLength * group; i < endPos; i++) {
148-
_dyn_one_circle(ind, i, n1[i], n2[i], n3[i]);
149-
}
150-
}));
151-
}
152-
153-
for (auto& task : tasks) {
154-
task.get();
155-
}
119+
global_parallel_for_index_void(ind.discard(), total,
120+
[&](size_t i) { _dyn_one_circle(ind, i, n1[i], n2[i], n3[i]); });
156121

157122
_update_discard();
158123
}

hikyuu_cpp/hikyuu/indicator/imp/ISaftyLoss.cpp

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -112,43 +112,9 @@ void ISaftyLoss::_dyn_calculate(const Indicator& ind) {
112112
size_t total = ind.size();
113113
HKU_IF_RETURN(0 == total || m_discard >= total, void());
114114

115-
static const size_t minCircleLength = 400;
116-
size_t workerNum = ms_tg->worker_num();
117-
if (total < minCircleLength || workerNum == 1) {
118-
for (size_t i = ind.discard(); i < total; i++) {
119-
_dyn_one_circle(ind, i, n1[i], n2[i], p[i]);
120-
}
121-
_update_discard();
122-
return;
123-
}
124-
125-
size_t circleLength = minCircleLength;
126-
if (minCircleLength * workerNum < total) {
127-
size_t tailCount = total % workerNum;
128-
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
129-
}
115+
global_parallel_for_index_void(
116+
ind.discard(), total, [&](size_t i) { _dyn_one_circle(ind, i, n1[i], n2[i], p[i]); }, 400);
130117

131-
std::vector<std::future<void>> tasks;
132-
for (size_t group = 0; group < workerNum; group++) {
133-
size_t first = circleLength * group;
134-
if (first >= total) {
135-
break;
136-
}
137-
tasks.push_back(
138-
ms_tg->submit([this, &ind, &n1, &n2, &p, first, circleLength, total, group]() {
139-
size_t endPos = first + circleLength;
140-
if (endPos > total) {
141-
endPos = total;
142-
}
143-
for (size_t i = circleLength * group; i < endPos; i++) {
144-
_dyn_one_circle(ind, i, n1[i], n2[i], p[i]);
145-
}
146-
}));
147-
}
148-
149-
for (auto& task : tasks) {
150-
task.get();
151-
}
152118
_update_discard();
153119
}
154120

hikyuu_cpp/hikyuu/indicator/imp/ISma.cpp

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -79,42 +79,9 @@ void ISma::_dyn_calculate(const Indicator& ind) {
7979
size_t total = ind.size();
8080
HKU_IF_RETURN(0 == total || m_discard >= total, void());
8181

82-
static const size_t minCircleLength = 400;
83-
size_t workerNum = ms_tg->worker_num();
84-
if (total < minCircleLength || workerNum == 1) {
85-
for (size_t i = ind.discard(); i < total; i++) {
86-
_dyn_one_circle(ind, i, n[i], m[i]);
87-
}
88-
_update_discard();
89-
return;
90-
}
91-
92-
size_t circleLength = minCircleLength;
93-
if (minCircleLength * workerNum < total) {
94-
size_t tailCount = total % workerNum;
95-
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
96-
}
82+
global_parallel_for_index_void(
83+
ind.discard(), total, [&](size_t i) { _dyn_one_circle(ind, i, n[i], m[i]); }, 400);
9784

98-
std::vector<std::future<void>> tasks;
99-
for (size_t group = 0; group < workerNum; group++) {
100-
size_t first = circleLength * group;
101-
if (first >= total) {
102-
break;
103-
}
104-
tasks.push_back(ms_tg->submit([this, &ind, &n, &m, first, circleLength, group, total]() {
105-
size_t endPos = first + circleLength;
106-
if (endPos > total) {
107-
endPos = total;
108-
}
109-
for (size_t i = circleLength * group; i < endPos; i++) {
110-
_dyn_one_circle(ind, i, n[i], m[i]);
111-
}
112-
}));
113-
}
114-
115-
for (auto& task : tasks) {
116-
task.get();
117-
}
11885
_update_discard();
11986
}
12087

0 commit comments

Comments
 (0)