Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions hikyuu_cpp/hikyuu/GlobalInitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,16 @@ void GlobalInitializer::init() {
TA_Initialize();
#endif

size_t cpu_num = std::thread::hardware_concurrency() * 3 / 2;
if (cpu_num > 128) {
cpu_num = 128;
} else if (cpu_num > 64) {
cpu_num = cpu_num * 10 / 8;
}
init_global_task_group(cpu_num);

DataDriverFactory::init();
StockManager::instance();
IndicatorImp::initDynEngine();
getGlobalSpotAgent();
}

Expand Down Expand Up @@ -117,7 +124,6 @@ void GlobalInitializer::clean() {
sysinfo_clean();
releaseScheduler();
releaseGlobalSpotAgent();
IndicatorImp::releaseDynEngine();

#if !HKU_OS_OSX
// 主动停止异步数据加载任务组,否则 hdf5 在 linux 下会报关闭异常
Expand Down
73 changes: 13 additions & 60 deletions hikyuu_cpp/hikyuu/indicator/IndicatorImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "Indicator.h"
#include "IndParam.h"
#include "../Stock.h"
#include "../GlobalInitializer.h"
#include "imp/ICval.h"
#include "imp/IContext.h"

Expand All @@ -24,7 +23,6 @@ BOOST_CLASS_EXPORT(hku::IndicatorImp)
namespace hku {

bool IndicatorImp::ms_enable_increment_calculate{true};
GlobalStealThreadPool *IndicatorImp::ms_tg = nullptr;

string HKU_API getOPTypeName(IndicatorImp::OPType op) {
string name;
Expand Down Expand Up @@ -104,27 +102,6 @@ string HKU_API getOPTypeName(IndicatorImp::OPType op) {
return name;
}

void IndicatorImp::initDynEngine() {
size_t cpu_num = std::thread::hardware_concurrency() * 3 / 2;
if (cpu_num > 128) {
cpu_num = 128;
} else if (cpu_num > 64) {
cpu_num = cpu_num * 10 / 8;
}

// 由于 GlobalInitializer 机制,目前借用在此处初始化全局任务组
init_global_task_group(cpu_num);
ms_tg = get_global_task_group();
}

void IndicatorImp::releaseDynEngine() {
HKU_TRACE("releaseDynEngine");
// 目前的 GlobalInitializer 机制,global_task_group 实际可能已经释放
// 可能导致 double free, 这里只停止,不负责释放
// release_global_task_group();
// ms_tg = nullptr;
}

HKU_API std::ostream &operator<<(std::ostream &os, const IndicatorImp &imp) {
os << imp.str();
return os;
Expand Down Expand Up @@ -1908,10 +1885,8 @@ void IndicatorImp::_dyn_calculate(const Indicator &ind) {

const value_t *param_data = ind_param->data();

static const size_t minCircleLength = 400;
size_t workerNum = ms_tg->worker_num();
if (total < minCircleLength || isSerial() || workerNum == 1) {
// HKU_INFO("single_thread");
static constexpr size_t minCircleLength = 400;
if (total < minCircleLength || isSerial()) {
for (size_t i = ind.discard(); i < total; i++) {
if (std::isnan(param_data[i])) {
_set(Null<value_t>(), i);
Expand All @@ -1924,39 +1899,17 @@ void IndicatorImp::_dyn_calculate(const Indicator &ind) {
return;
}

// HKU_INFO("multi_thread");
size_t circleLength = minCircleLength;
if (minCircleLength * workerNum < total) {
size_t tailCount = total % workerNum;
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
}

std::vector<std::future<void>> tasks;
for (size_t group = 0; group < workerNum; group++) {
size_t first = circleLength * group;
if (first >= total) {
break;
}
tasks.push_back(
ms_tg->submit([this, &ind, first, circleLength, total, group, param_data]() {
size_t endPos = first + circleLength;
if (endPos > total) {
endPos = total;
}
for (size_t i = circleLength * group; i < endPos; i++) {
if (std::isnan(param_data[i])) {
_set(Null<value_t>(), i);
} else {
size_t step = size_t(param_data[i]);
_dyn_run_one_step(ind, i, step);
}
}
}));
}

for (auto &task : tasks) {
task.get();
}
global_parallel_for_index_void(
ind.discard(), total,
[&ind, param_data, this](size_t i) {
if (std::isnan(param_data[i])) {
_set(Null<value_t>(), i);
} else {
size_t step = size_t(param_data[i]);
_dyn_run_one_step(ind, i, step);
}
},
minCircleLength);

_update_discard();
}
Expand Down
1 change: 0 additions & 1 deletion hikyuu_cpp/hikyuu/indicator/IndicatorImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ class HKU_API IndicatorImp : public enable_shared_from_this<IndicatorImp> {

protected:
static bool ms_enable_increment_calculate;
static GlobalStealThreadPool* ms_tg;

#if HKU_SUPPORT_SERIALIZATION
private:
Expand Down
39 changes: 3 additions & 36 deletions hikyuu_cpp/hikyuu/indicator/imp/IAma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,43 +182,10 @@ void IAma::_dyn_calculate(const Indicator& ind) {
size_t total = ind.size();
HKU_IF_RETURN(0 == total || m_discard >= total, void());

static const size_t minCircleLength = 400;
size_t workerNum = ms_tg->worker_num();
if (total < minCircleLength || workerNum == 1) {
for (size_t i = ind.discard(); i < total; i++) {
_dyn_one_circle(ind, i, n[i], fast_n[i], slow_n[i]);
}
_update_discard();
return;
}

size_t circleLength = minCircleLength;
if (minCircleLength * workerNum < total) {
size_t tailCount = total % workerNum;
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
}
global_parallel_for_index_void(
ind.discard(), total, [&](size_t i) { _dyn_one_circle(ind, i, n[i], fast_n[i], slow_n[i]); },
400);

std::vector<std::future<void>> tasks;
for (size_t group = 0; group < workerNum; group++) {
size_t first = circleLength * group;
if (first >= total) {
break;
}
tasks.push_back(
ms_tg->submit([this, &ind, &n, &fast_n, &slow_n, first, circleLength, total, group]() {
size_t endPos = first + circleLength;
if (endPos > total) {
endPos = total;
}
for (size_t i = circleLength * group; i < endPos; i++) {
_dyn_one_circle(ind, i, n[i], fast_n[i], slow_n[i]);
}
}));
}

for (auto& task : tasks) {
task.get();
}
_update_discard();
}

Expand Down
39 changes: 2 additions & 37 deletions hikyuu_cpp/hikyuu/indicator/imp/IMacd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,43 +116,8 @@ void IMacd::_dyn_calculate(const Indicator& ind) {
size_t total = ind.size();
HKU_IF_RETURN(0 == total || m_discard >= total, void());

static const size_t minCircleLength = 400;
size_t workerNum = ms_tg->worker_num();
if (total < minCircleLength || workerNum == 1) {
for (size_t i = ind.discard(); i < total; i++) {
_dyn_one_circle(ind, i, n1[i], n2[i], n3[i]);
}
_update_discard();
return;
}

size_t circleLength = minCircleLength;
if (minCircleLength * workerNum < total) {
size_t tailCount = total % workerNum;
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
}

std::vector<std::future<void>> tasks;
for (size_t group = 0; group < workerNum; group++) {
size_t first = circleLength * group;
if (first >= total) {
break;
}
tasks.push_back(
ms_tg->submit([this, &ind, &n1, &n2, &n3, first, circleLength, total, group]() {
size_t endPos = first + circleLength;
if (endPos > total) {
endPos = total;
}
for (size_t i = circleLength * group; i < endPos; i++) {
_dyn_one_circle(ind, i, n1[i], n2[i], n3[i]);
}
}));
}

for (auto& task : tasks) {
task.get();
}
global_parallel_for_index_void(ind.discard(), total,
[&](size_t i) { _dyn_one_circle(ind, i, n1[i], n2[i], n3[i]); });

_update_discard();
}
Expand Down
38 changes: 2 additions & 36 deletions hikyuu_cpp/hikyuu/indicator/imp/ISaftyLoss.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,43 +112,9 @@ void ISaftyLoss::_dyn_calculate(const Indicator& ind) {
size_t total = ind.size();
HKU_IF_RETURN(0 == total || m_discard >= total, void());

static const size_t minCircleLength = 400;
size_t workerNum = ms_tg->worker_num();
if (total < minCircleLength || workerNum == 1) {
for (size_t i = ind.discard(); i < total; i++) {
_dyn_one_circle(ind, i, n1[i], n2[i], p[i]);
}
_update_discard();
return;
}

size_t circleLength = minCircleLength;
if (minCircleLength * workerNum < total) {
size_t tailCount = total % workerNum;
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
}
global_parallel_for_index_void(
ind.discard(), total, [&](size_t i) { _dyn_one_circle(ind, i, n1[i], n2[i], p[i]); }, 400);

std::vector<std::future<void>> tasks;
for (size_t group = 0; group < workerNum; group++) {
size_t first = circleLength * group;
if (first >= total) {
break;
}
tasks.push_back(
ms_tg->submit([this, &ind, &n1, &n2, &p, first, circleLength, total, group]() {
size_t endPos = first + circleLength;
if (endPos > total) {
endPos = total;
}
for (size_t i = circleLength * group; i < endPos; i++) {
_dyn_one_circle(ind, i, n1[i], n2[i], p[i]);
}
}));
}

for (auto& task : tasks) {
task.get();
}
_update_discard();
}

Expand Down
37 changes: 2 additions & 35 deletions hikyuu_cpp/hikyuu/indicator/imp/ISma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,42 +79,9 @@ void ISma::_dyn_calculate(const Indicator& ind) {
size_t total = ind.size();
HKU_IF_RETURN(0 == total || m_discard >= total, void());

static const size_t minCircleLength = 400;
size_t workerNum = ms_tg->worker_num();
if (total < minCircleLength || workerNum == 1) {
for (size_t i = ind.discard(); i < total; i++) {
_dyn_one_circle(ind, i, n[i], m[i]);
}
_update_discard();
return;
}

size_t circleLength = minCircleLength;
if (minCircleLength * workerNum < total) {
size_t tailCount = total % workerNum;
circleLength = tailCount == 0 ? total / workerNum : total / workerNum + 1;
}
global_parallel_for_index_void(
ind.discard(), total, [&](size_t i) { _dyn_one_circle(ind, i, n[i], m[i]); }, 400);

std::vector<std::future<void>> tasks;
for (size_t group = 0; group < workerNum; group++) {
size_t first = circleLength * group;
if (first >= total) {
break;
}
tasks.push_back(ms_tg->submit([this, &ind, &n, &m, first, circleLength, group, total]() {
size_t endPos = first + circleLength;
if (endPos > total) {
endPos = total;
}
for (size_t i = circleLength * group; i < endPos; i++) {
_dyn_one_circle(ind, i, n[i], m[i]);
}
}));
}

for (auto& task : tasks) {
task.get();
}
_update_discard();
}

Expand Down
Loading