-
Notifications
You must be signed in to change notification settings - Fork 724
线程池使用
Hikyuu量化框架提供了多种线程池实现,用于支持高性能的并行计算。本文档深入讲解ThreadPool和GlobalThreadPool的任务提交机制,包括如何封装可调用对象、任务队列的管理以及工作线程的调度策略。通过实际代码示例展示如何向线程池提交异步任务,并获取执行结果。重点描述在量化回测中如何利用线程池并行处理多个股票的历史数据计算,提升整体性能。
本节内容未直接分析具体源文件,因此不提供来源信息。
Hikyuu框架提供了多种线程池实现,包括集中式任务队列和分布式任务队列两种架构。这些线程池都继承自统一的接口设计,但针对不同的使用场景进行了优化。
classDiagram
class ThreadPool {
+worker_num() size_t
+submit(FunctionType f) auto
+done() bool
+remain_task_count() size_t
+stop() void
+join() void
}
class GlobalThreadPool {
+worker_num() size_t
+submit(FunctionType f) auto
+done() bool
+remain_task_count() size_t
+stop() void
+join() void
}
class GlobalMQThreadPool {
+worker_num() size_t
+submit(FunctionType f) auto
+done() bool
+remain_task_count() size_t
+stop() void
+join() void
}
class GlobalStealThreadPool {
+worker_num() size_t
+submit(FunctionType f) auto
+done() bool
+remain_task_count() size_t
+stop() void
+join() void
}
ThreadPool <|-- GlobalThreadPool
ThreadPool <|-- GlobalMQThreadPool
ThreadPool <|-- GlobalStealThreadPool
图示来源
- ThreadPool.h
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
本节来源
- ThreadPool.h
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
线程池的核心功能是接收并执行异步任务。Hikyuu框架中的线程池通过submit方法接收可调用对象,并返回一个future对象用于获取执行结果。
当调用submit方法时,线程池会执行以下步骤:
- 检查线程池是否处于运行状态
- 使用std::packaged_task包装可调用对象
- 获取任务的返回类型并创建对应的future
- 将包装后的任务放入任务队列
sequenceDiagram
participant Client as "客户端"
participant ThreadPool as "线程池"
participant TaskQueue as "任务队列"
participant Worker as "工作线程"
Client->>ThreadPool : submit(可调用对象)
ThreadPool->>ThreadPool : 检查运行状态
ThreadPool->>ThreadPool : 创建packaged_task
ThreadPool->>ThreadPool : 获取future对象
ThreadPool->>TaskQueue : 将任务推入队列
TaskQueue-->>ThreadPool : 确认入队
ThreadPool-->>Client : 返回future对象
loop 工作线程循环
Worker->>TaskQueue : 尝试获取任务
TaskQueue-->>Worker : 返回任务
Worker->>Worker : 执行任务
end
图示来源
- ThreadPool.h
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
本节来源
- ThreadPool.h
- GlobalThreadPool.h
- GlobalMQThreadPool.h
- GlobalStealThreadPool.h
Hikyuu使用FuncWrapper类来统一包装各种可调用对象,包括函数指针、函数对象、lambda表达式等。这种设计实现了类型擦除,使得线程池可以接受任何可调用的对象。
classDiagram
class FuncWrapper {
+operator()() void
+isNullTask() bool
}
class impl_base {
+call() void
}
class impl_type~F~ {
-f F
+call() void
}
FuncWrapper --> impl_base : "包含"
impl_type~F~ --> impl_base : "继承"
图示来源
- FuncWrapper.h
本节来源
- FuncWrapper.h
线程池使用线程安全队列来管理待执行的任务。根据不同的线程池类型,队列管理策略有所不同:
- 集中式队列:所有工作线程共享一个全局任务队列
- 分布式队列:每个工作线程拥有自己的任务队列
- 偷取式队列:工作线程优先执行本地队列任务,空闲时从其他线程队列偷取任务
classDiagram
class ThreadSafeQueue~T~ {
+push(T&& item) void
+wait_and_pop(T& value) void
+try_pop(T& value) bool
+empty() bool
+size() size_t
+clear() void
}
class WorkStealQueue {
+push_front(data_type&& data) void
+push_back(data_type&& data) void
+try_pop(data_type& res) bool
+try_steal(data_type& res) bool
+empty() bool
+size() size_t
+clear() void
}
ThreadSafeQueue~T~ <|-- WorkStealQueue
图示来源
- ThreadSafeQueue.h
- WorkStealQueue.h
本节来源
- ThreadSafeQueue.h
- WorkStealQueue.h
集中式线程池(如ThreadPool和GlobalThreadPool)使用单一任务队列,所有工作线程从同一个队列中获取任务。这种策略简单高效,但在高并发场景下可能存在队列竞争。
flowchart TD
Start([线程启动]) --> CheckDone["检查终止标志"]
CheckDone --> |未终止| GetTask["从主队列获取任务"]
GetTask --> |获取成功| Execute["执行任务"]
Execute --> CheckDone
GetTask --> |获取失败| Break["退出循环"]
Break --> End([线程结束])
图示来源
- ThreadPool.h
- GlobalThreadPool.h
本节来源
- ThreadPool.h
- GlobalThreadPool.h
分布式线程池(如GlobalMQThreadPool)为每个工作线程分配独立的任务队列。任务提交时会选择任务最少的队列,从而实现负载均衡。
flowchart TD
Submit([提交任务]) --> FindQueue["查找任务最少的队列"]
FindQueue --> |找到空队列| SelectIndex["选择该队列"]
FindQueue --> |无空队列| FindMin["选择任务数最少的队列"]
SelectIndex --> Enqueue["将任务加入队列"]
FindMin --> Enqueue
Enqueue --> Return["返回future对象"]
图示来源
- GlobalMQThreadPool.h
本节来源
- GlobalMQThreadPool.h
偷取式线程池(如GlobalStealThreadPool)结合了本地队列和全局队列的优势。工作线程优先执行本地队列的任务,当本地队列为空时,会尝试从其他线程的队列尾部"偷取"任务。
flowchart TD
Start([执行待处理任务]) --> LocalQueue["尝试从本地队列获取任务"]
LocalQueue --> |成功| Execute["执行任务"]
LocalQueue --> |失败| MasterQueue["尝试从主队列获取任务"]
MasterQueue --> |成功| Execute
MasterQueue --> |失败| StealTask["尝试从其他线程队列偷取任务"]
StealTask --> |成功| Execute
StealTask --> |失败| Wait["等待主队列有任务"]
Wait --> LocalQueue
Execute --> Start
图示来源
- GlobalStealThreadPool.h
本节来源
- GlobalStealThreadPool.h
在量化回测中,线程池被广泛用于并行处理多个股票的历史数据计算。以Indicator计算为例,系统会将不同股票的计算任务分配给线程池并行执行。
sequenceDiagram
participant Main as "主线程"
participant ThreadPool as "线程池"
participant StockA as "股票A计算"
participant StockB as "股票B计算"
participant StockC as "股票C计算"
Main->>ThreadPool : 提交股票A计算任务
Main->>ThreadPool : 提交股票B计算任务
Main->>ThreadPool : 提交股票C计算任务
ThreadPool->>StockA : 分配工作线程执行
ThreadPool->>StockB : 分配工作线程执行
ThreadPool->>StockC : 分配工作线程执行
StockA-->>ThreadPool : 计算完成
StockB-->>ThreadPool : 计算完成
StockC-->>ThreadPool : 计算完成
ThreadPool-->>Main : 所有任务完成
图示来源
- IndicatorImp.cpp
- combinate.cpp
本节来源
- IndicatorImp.cpp
- combinate.cpp
以下是线程池在Hikyuu框架中的实际使用示例:
// 创建线程池,使用系统CPU核心数
ThreadPool tg(std::thread::hardware_concurrency());
// 提交多个异步任务
std::vector<std::future<void>> tasks;
for (const auto& stock : stock_list) {
auto future = tg.submit([stock, &indicator] {
// 计算单个股票的技术指标
indicator.calculate(stock);
});
tasks.push_back(std::move(future));
}
// 等待所有任务完成
for (auto& task : tasks) {
task.wait();
}本节来源
- StockManager.cpp
- IndicatorImp.cpp
根据应用场景选择合适的线程池类型:
- 普通计算任务:使用ThreadPool或GlobalThreadPool
- 负载均衡要求高:使用GlobalMQThreadPool
- 递归任务或任务创建任务:使用GlobalStealThreadPool
线程数量的设置应考虑以下因素:
- CPU核心数:通常设置为CPU核心数或稍多
- I/O等待:如果任务包含大量I/O操作,可以设置更多线程
- 内存消耗:每个线程都有栈空间开销,需考虑内存限制
任务粒度过小会导致线程调度开销过大,建议:
- 确保单个任务的执行时间远大于线程调度时间
- 对小任务进行批量处理
- 使用合适的任务分解策略
本节内容未直接分析具体源文件,因此不提供来源信息。
Hikyuu框架的线程池系统提供了灵活高效的并行计算能力。通过深入理解ThreadPool和GlobalThreadPool的任务提交机制、任务封装方式、队列管理和调度策略,开发者可以充分利用多核处理器的计算能力,在量化回测等计算密集型场景中显著提升性能。在实际应用中,应根据具体需求选择合适的线程池类型和配置参数,以达到最佳的性能表现。
本节内容未直接分析具体源文件,因此不提供来源信息。