-
Notifications
You must be signed in to change notification settings - Fork 724
线程池配置
hikyuu框架提供了多种线程池实现,以满足不同的并发需求。这些线程池主要位于hikyuu_cpp/hikyuu/utilities/thread/目录下,包括:
- GlobalThreadPool:全局集中式任务队列线程池,所有任务放入一个共享队列中,工作线程从该队列中获取任务执行。
- GlobalMQThreadPool:全局分布式线程池,每个工作线程拥有独立的任务队列,任务提交时选择任务最少的队列。
- GlobalStealThreadPool:分布偷取式线程池,支持任务窃取机制,适用于递归任务场景。
- GlobalMQStealThreadPool:无集中队列多队列偷取任务池,结合了分布式队列和任务窃取的优点。
这些线程池的设计旨在为hikyuu框架提供高效的并行处理能力,特别是在大规模回测等计算密集型任务中。
Section sources
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
- GlobalMQStealThreadPool.h
核心线程数是线程池创建时初始化的工作线程数量。在hikyuu的线程池实现中,可以通过构造函数的n参数指定核心线程数。如果未指定,线程池会默认使用std::thread::hardware_concurrency()获取的系统CPU核心数作为线程数。
explicit GlobalThreadPool(size_t n, bool until_empty = true)在hikyuu的线程池设计中,核心线程数即为最大线程数。线程池采用固定大小的设计,不会动态创建或销毁线程,这有助于减少线程创建和销毁的开销,并避免资源过度消耗。
hikyuu的线程池设计为长期运行的全局线程池,不涉及空闲线程存活时间的概念。线程池中的线程会一直运行,直到程序结束或显式调用stop()或join()方法。until_empty参数控制线程池在任务队列为空时的行为:
- 当
until_empty为true时,线程池会在任务队列为空后自动停止运行。 - 当
until_empty为false时,需要显式调用stop()方法来停止线程池。
Section sources
- GlobalThreadPool.h
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
- GlobalMQStealThreadPool.h
线程池的初始化主要通过构造函数完成。hikyuu提供了两种初始化方式:
- 默认初始化:使用系统CPU核心数作为线程数
GlobalThreadPool() : GlobalThreadPool(std::thread::hardware_concurrency()) {}- 指定线程数初始化:通过构造函数参数指定线程数
explicit GlobalThreadPool(size_t n, bool until_empty = true)为了最大化利用硬件资源,建议将线程池大小设置为系统CPU核心数。hikyuu框架通过std::thread::hardware_concurrency()函数自动获取系统核心数,确保线程池规模与硬件资源相匹配。
在实际应用中,可以根据任务类型进行微调:
- 对于CPU密集型任务,线程数应等于或略小于CPU核心数,以避免过多的上下文切换开销。
- 对于I/O密集型任务,可以适当增加线程数,以充分利用I/O等待时间。
Section sources
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
- GlobalMQStealThreadPool.h
在hikyuu框架中,线程池被广泛应用于大规模回测任务的并行处理。通过parallel_run_sys和parallel_run_pf等函数,可以将多个系统或投资组合的回测任务并行执行,显著提高回测效率。
vector<FundsList> HKU_API parallel_run_sys(const SystemList& system_list, const KQuery& query,
bool reset, bool resetAll) {
return parallel_for_index(0, system_list.size(), [&](size_t i) {
// 回测逻辑
});
}hikyuu提供了parallel_for_index等并行算法,这些算法底层依赖于线程池实现并行计算。通过将大任务分解为多个小任务并分配给不同的工作线程,可以充分利用多核处理器的计算能力。
inline std::vector<range_t> parallelIndexRange(size_t start, size_t end) {
// 计算并行索引范围
}在测试代码中,可以看到线程池的实际使用方式:
TEST_CASE("test_ThreadPool") {
ThreadPool tg(8);
for (int i = 0; i < 10; i++) {
tg.submit([=]() {
HKU_INFO("{}: ------------------- [{}]", i, std::this_thread::get_id());
});
}
tg.join();
}Section sources
- misc.cpp
- algorithm.h
- test_ThreadPool.cpp
对于CPU密集型任务,如复杂的数学计算、技术指标计算等,建议采用以下配置:
- 线程数:设置为CPU核心数或略少(如CPU核心数-1),以避免过多的上下文切换。
-
线程池类型:推荐使用
GlobalThreadPool或GlobalStealThreadPool,前者适用于任务独立的场景,后者适用于可能存在递归任务的场景。 - 配置示例:
// 使用系统默认核心数
GlobalThreadPool cpu_pool;
// 或指定核心数
GlobalThreadPool cpu_pool(std::thread::hardware_concurrency());对于I/O密集型任务,如数据下载、文件读写等,建议采用以下配置:
- 线程数:可以设置为CPU核心数的2-4倍,以充分利用I/O等待时间。
-
线程池类型:推荐使用
GlobalMQThreadPool或GlobalMQStealThreadPool,这些线程池的分布式队列设计更适合处理I/O密集型任务。 - 配置示例:
// I/O密集型任务,线程数可适当增加
size_t io_threads = std::thread::hardware_concurrency() * 2;
GlobalMQThreadPool io_pool(io_threads);对于同时包含CPU和I/O操作的混合型任务,建议根据任务特征进行权衡:
- 如果CPU操作占主导,按CPU密集型任务配置。
- 如果I/O操作占主导,按I/O密集型任务配置。
- 可以考虑使用多个专用线程池,分别处理CPU密集型和I/O密集型任务。
Section sources
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
- GlobalMQStealThreadPool.h
- 合理选择线程池类型:根据任务特征选择合适的线程池类型,避免使用不匹配的线程池导致性能下降。
- 匹配硬件资源:将线程池大小与系统CPU核心数相匹配,充分利用硬件资源。
- 避免过度并行:过多的线程可能导致上下文切换开销增加,反而降低性能。
- 正确管理线程生命周期:对于长期运行的应用,使用全局线程池;对于短期任务,注意及时释放资源。
-
监控线程池状态:利用
remain_task_count()等方法监控线程池状态,及时发现性能瓶颈。 - 异常处理:在任务提交和执行过程中做好异常处理,避免因单个任务失败导致整个线程池崩溃。
通过遵循这些最佳实践,可以充分发挥hikyuu框架线程池的优势,提高大规模回测和其他计算任务的执行效率。
Section sources
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
- GlobalMQStealThreadPool.h