Threading and Concurrency
- Introduction
- Thread Pool Implementations
- Synchronization Primitives
- Task Encapsulation
- Parallel Processing Algorithms
- Practical Examples
- Thread Safety and Performance
- Conclusion
The Hikyuu quantitative trading framework provides a comprehensive threading and concurrency infrastructure designed to handle CPU-intensive quantitative analysis tasks efficiently. This documentation covers the thread pool implementations, synchronization primitives, and parallel processing utilities that enable high-performance concurrent execution for data loading, backtesting, and strategy execution. The system is optimized for quantitative finance workloads that require parallel processing of large datasets and complex calculations.
Section sources
- GlobalThreadPool.h
- GlobalMQThreadPool.h
The GlobalThreadPool is a centralized thread pool implementation with a single shared task queue. It is designed for scenarios where tasks are independent of one another. The pool creates a fixed number of worker threads that consume tasks from the shared queue, making it suitable for general-purpose parallel processing where load balancing is handled by the central queue.
classDiagram
class GlobalThreadPool {
+worker_num() size_t
+remain_task_count() size_t
+submit(FunctionType f) auto
+done() bool
+stop() void
+join() void
}
class ThreadSafeQueue {
+push(T&& item) void
+wait_and_pop(T& value) void
+try_pop(T& value) bool
+empty() bool
+size() size_t
}
GlobalThreadPool --> ThreadSafeQueue : "has"
Diagram sources
- GlobalThreadPool.h
- ThreadSafeQueue.h
Section sources
- GlobalThreadPool.h
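A minimal usage sketch based on the interface shown in the diagram above. The include path, the hku namespace, the default constructor, and the std::future returned by submit() are assumptions made for illustration; check GlobalThreadPool.h for the actual declarations.
```cpp
// Sketch only: include path, namespace, constructor, and the std::future
// return type of submit() are assumptions, not confirmed by this page.
#include <future>
#include <iostream>
#include <vector>
#include "GlobalThreadPool.h"  // adjust the include path to your build

int main() {
    hku::GlobalThreadPool pool;  // assumed: namespace hku, default worker count

    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i) {
        // Independent tasks go into the single shared queue.
        results.push_back(pool.submit([i] { return i * i; }));
    }

    for (auto& f : results) {
        std::cout << f.get() << " ";  // block until each task finishes
    }
    std::cout << "\nremaining tasks: " << pool.remain_task_count() << std::endl;

    pool.join();  // wait for the workers to drain the queue and exit
    return 0;
}
```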
The GlobalMQThreadPool is a multi-queue thread pool implementation with one task queue per worker thread. When a task is submitted, it is placed in the queue with the fewest pending tasks, balancing load across workers. This approach reduces contention compared to a single shared queue and provides better cache locality. The implementation is particularly effective for workloads with varying task execution times.
classDiagram
class GlobalMQThreadPool {
+worker_num() size_t
+remain_task_count() size_t
+submit(FunctionType f) auto
+done() bool
+stop() void
+join() void
}
class ThreadSafeQueue {
+push(T&& item) void
+wait_and_pop(T& value) void
+try_pop(T& value) bool
+empty() bool
+size() size_t
}
GlobalMQThreadPool --> ThreadSafeQueue : "has"
ThreadSafeQueue "1..*" -- "1" GlobalMQThreadPool : "worker queues"
Diagram sources
- GlobalMQThreadPool.h
- ThreadSafeQueue.h
Section sources
- GlobalMQThreadPool.h
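The "submit to the queue with the fewest pending tasks" rule can be shown with a small self-contained sketch. This is a generic illustration of the selection logic only, not the Hikyuu implementation.
```cpp
// Generic illustration of least-loaded queue selection; not the Hikyuu code.
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <vector>

using Task = std::function<void()>;

// Pick the index of the worker queue with the fewest pending tasks.
std::size_t least_loaded(const std::vector<std::deque<Task>>& queues) {
    std::size_t best = 0;
    for (std::size_t i = 1; i < queues.size(); ++i) {
        if (queues[i].size() < queues[best].size()) {
            best = i;
        }
    }
    return best;
}

int main() {
    std::vector<std::deque<Task>> queues(4);  // one queue per worker thread
    for (int i = 0; i < 10; ++i) {
        std::size_t target = least_loaded(queues);
        queues[target].push_back([] { /* placeholder work */ });
    }
    for (std::size_t i = 0; i < queues.size(); ++i) {
        std::cout << "queue " << i << ": " << queues[i].size() << " tasks\n";
    }
    return 0;
}
```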
The GlobalMQStealThreadPool implements a work-stealing algorithm with multiple queues. Each worker thread has its own queue and can "steal" tasks from other threads' queues when its own queue is empty. This approach provides excellent load balancing for irregular workloads and recursive algorithms. The work-stealing mechanism ensures that all threads remain busy, maximizing CPU utilization.
sequenceDiagram
participant Client
participant ThreadPool
participant Worker1
participant Worker2
participant Worker3
Client->>ThreadPool : submit(task)
ThreadPool->>Worker1 : Add to least loaded queue
loop Task Execution
Worker1->>Worker1 : Execute local tasks
alt Queue empty
Worker1->>Worker2 : Steal task from tail
Worker1->>Worker3 : Steal task from tail
end
end
ThreadPool->>Client : All tasks completed
Diagram sources
- GlobalMQStealThreadPool.h
- MQStealQueue.h
Section sources
- GlobalMQStealThreadPool.h
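A compact, self-contained sketch of the stealing loop in the diagram above: a worker prefers the front of its own queue and, when that is empty, steals from the tail of another worker's queue. This is a simplified illustration, not the GlobalMQStealThreadPool code, which adds worker threads, condition variables, and shutdown handling.
```cpp
// Simplified illustration of one "run local, else steal" scheduling step.
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <utility>
#include <vector>

using Task = std::function<void()>;

struct WorkerQueue {
    std::mutex mutex;
    std::deque<Task> tasks;
};

// Worker `self` prefers its own queue (front); if that is empty it scans the
// other workers and steals one task from the tail of a victim's queue.
bool run_one(std::vector<WorkerQueue>& queues, std::size_t self) {
    Task task;
    bool found = false;
    {   // 1. try the local queue (pop from the front)
        std::lock_guard<std::mutex> lock(queues[self].mutex);
        if (!queues[self].tasks.empty()) {
            task = std::move(queues[self].tasks.front());
            queues[self].tasks.pop_front();
            found = true;
        }
    }
    // 2. otherwise, try to steal from another worker's tail
    for (std::size_t i = 0; !found && i < queues.size(); ++i) {
        if (i == self) continue;
        std::lock_guard<std::mutex> lock(queues[i].mutex);
        if (!queues[i].tasks.empty()) {
            task = std::move(queues[i].tasks.back());
            queues[i].tasks.pop_back();
            found = true;
        }
    }
    if (found) task();  // run the task outside any queue lock
    return found;
}

int main() {
    std::vector<WorkerQueue> queues(2);
    queues[1].tasks.push_back([] { std::cout << "stolen task executed\n"; });
    run_one(queues, 0);  // worker 0 is idle, so it steals from worker 1
    return 0;
}
```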
The ThreadSafeQueue is a thread-safe FIFO queue that uses a mutex and a condition variable for synchronization. It provides both blocking and non-blocking operations for task submission and retrieval, and serves as the underlying data structure for task distribution in the thread pool implementations.
classDiagram
class ThreadSafeQueue {
-m_mutex std::mutex
-m_queue std::queue~T~
-m_cond std::condition_variable
+push(T&& item) void
+wait_and_pop(T& value) void
+try_pop(T& value) bool
+empty() bool
+size() const size_t
+clear() void
}
Diagram sources
- ThreadSafeQueue.h
Section sources
- ThreadSafeQueue.h
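The mutex-plus-condition-variable pattern described above can be sketched in a few dozen lines. The method names follow the diagram, but this is a simplified stand-in, not the Hikyuu ThreadSafeQueue.
```cpp
// Minimal sketch of a mutex + condition_variable queue; not the Hikyuu class.
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

template <typename T>
class SimpleThreadSafeQueue {
public:
    void push(T&& item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(item));
        }
        m_cond.notify_one();  // wake one waiting consumer
    }

    void wait_and_pop(T& value) {  // blocks until an item is available
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cond.wait(lock, [this] { return !m_queue.empty(); });
        value = std::move(m_queue.front());
        m_queue.pop();
    }

    bool try_pop(T& value) {  // non-blocking variant
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty()) return false;
        value = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

    bool empty() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.size();
    }

private:
    mutable std::mutex m_mutex;
    std::queue<T> m_queue;
    std::condition_variable m_cond;
};

int main() {
    SimpleThreadSafeQueue<int> queue;
    std::thread producer([&] { for (int i = 0; i < 3; ++i) queue.push(int(i)); });
    for (int n = 0; n < 3; ++n) {
        int value = 0;
        queue.wait_and_pop(value);  // blocks until the producer delivers
        std::cout << value << std::endl;
    }
    producer.join();
    return 0;
}
```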
The MQStealQueue extends the thread-safe queue with work-stealing capabilities. It uses a deque (double-ended queue) to allow tasks to be added to the front (push_front) and stolen from the back (try_steal). This design enables the work-stealing algorithm where idle threads can steal tasks from the tail of busy threads' queues.
classDiagram
class MQStealQueue {
-m_mutex std::mutex
-m_queue std::deque~T~
-m_cond std::condition_variable
+push(T&& item) void
+push_front(T&& data) void
+wait_and_pop(T& value) void
+try_pop(T& value) bool
+try_steal(T& res) bool
+empty() bool
+size() const size_t
}
Diagram sources
- MQStealQueue.h
Section sources
- MQStealQueue.h
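A sketch of the deque-backed queue described above, with the owner working from the front and thieves taking from the back. The method names follow the diagram; the class itself is a simplified stand-in for MQStealQueue.
```cpp
// Simplified stealable queue sketch; not the Hikyuu MQStealQueue.
#include <deque>
#include <iostream>
#include <mutex>
#include <utility>

template <typename T>
class StealQueueSketch {
public:
    void push_front(T&& data) {  // owner adds new work at the front
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push_front(std::move(data));
    }
    bool try_pop(T& value) {     // owner takes from the front
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty()) return false;
        value = std::move(m_queue.front());
        m_queue.pop_front();
        return true;
    }
    bool try_steal(T& res) {     // other workers take from the back
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty()) return false;
        res = std::move(m_queue.back());
        m_queue.pop_back();
        return true;
    }

private:
    std::mutex m_mutex;
    std::deque<T> m_queue;
};

int main() {
    StealQueueSketch<int> q;
    q.push_front(1);
    q.push_front(2);     // queue front-to-back: 2, 1
    int owner = 0, thief = 0;
    q.try_pop(owner);    // owner gets 2 (most recently pushed)
    q.try_steal(thief);  // a stealing worker gets 1 (oldest item)
    std::cout << "owner=" << owner << " thief=" << thief << std::endl;
    return 0;
}
```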
The InterruptFlag is a thread-safe boolean flag used for signaling thread interruption. It wraps an atomic boolean and provides thread-safe operations for setting and checking the flag state. This primitive is used by thread pools to signal worker threads to stop execution gracefully.
classDiagram
class InterruptFlag {
-m_flag std::atomic_bool
+InterruptFlag()
+set() void
+isSet() const bool
+operator bool() const
}
Diagram sources
- InterruptFlag.h
Section sources
- InterruptFlag.h
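A minimal sketch of the atomic-flag interruption pattern. The class mirrors the methods in the diagram, but the worker loop and timings are illustrative only; this is not the Hikyuu InterruptFlag.
```cpp
// Atomic-flag interruption sketch; mirrors the diagrammed interface only.
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

class InterruptFlagSketch {
public:
    InterruptFlagSketch() : m_flag(false) {}
    void set() { m_flag = true; }                 // request a graceful stop
    bool isSet() const { return m_flag.load(); }
    explicit operator bool() const { return isSet(); }

private:
    std::atomic_bool m_flag;
};

int main() {
    InterruptFlagSketch stop;
    std::thread worker([&stop] {
        while (!stop.isSet()) {                   // poll the flag between work items
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "worker observed the interrupt flag and exited\n";
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    stop.set();                                   // signal the worker to stop
    worker.join();
    return 0;
}
```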
The FuncWrapper class provides a type-erased wrapper for callable objects (functions, lambdas, functors) that supports move semantics. This allows tasks of different types to be stored uniformly in the thread pool's task queues. The wrapper uses the type-erasure pattern: a polymorphic base class (impl_base) and a templated derived class (impl_type<F>) that holds the concrete callable.
classDiagram
class FuncWrapper {
-impl std::unique_ptr~impl_base~
+FuncWrapper()
+FuncWrapper(F&& f)
+operator()() void
+FuncWrapper(FuncWrapper&&)
+operator=(FuncWrapper&&) FuncWrapper&
+isNullTask() const bool
}
class impl_base {
<<abstract>>
+call() void
}
class impl_type~F~ {
-f F
+call() void
}
FuncWrapper --> impl_base : "has"
impl_base <|-- impl_type~F~ : "inherits"
Diagram sources
- FuncWrapper.h
Section sources
- FuncWrapper.h
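A sketch of the type-erasure pattern described above, mirroring the impl_base / impl_type<F> structure from the diagram. It is a simplified stand-in, not the Hikyuu FuncWrapper.
```cpp
// Move-only type-erasure sketch following the diagrammed structure.
#include <iostream>
#include <memory>
#include <type_traits>
#include <utility>

class TaskWrapper {
    struct impl_base {                  // polymorphic interface, erases the callable's type
        virtual void call() = 0;
        virtual ~impl_base() = default;
    };

    template <typename F>
    struct impl_type : impl_base {      // stores the concrete callable
        F f;
        explicit impl_type(F f_) : f(std::move(f_)) {}
        void call() override { f(); }
    };

    std::unique_ptr<impl_base> impl;

public:
    TaskWrapper() = default;

    template <typename F>
    TaskWrapper(F&& f)                  // accepts any callable, erases its type
        : impl(new impl_type<std::decay_t<F>>(std::forward<F>(f))) {}

    TaskWrapper(TaskWrapper&&) = default;               // movable ...
    TaskWrapper& operator=(TaskWrapper&&) = default;
    TaskWrapper(const TaskWrapper&) = delete;           // ... but not copyable
    TaskWrapper& operator=(const TaskWrapper&) = delete;

    void operator()() { impl->call(); }
    bool isNullTask() const { return !impl; }
};

int main() {
    TaskWrapper task([] { std::cout << "type-erased lambda executed\n"; });
    TaskWrapper moved = std::move(task);  // tasks can be moved into a queue
    if (!moved.isNullTask()) moved();
    return 0;
}
```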
The threading utilities include algorithm extensions for parallel processing, such as parallel_for_index and parallel_for_range. These functions provide high-level interfaces for parallelizing loops and range-based operations. They automatically partition the work and distribute it across the thread pool.
flowchart TD
Start([parallel_for_index]) --> Partition["Partition range into sub-ranges"]
Partition --> Submit["Submit tasks to thread pool"]
Submit --> Execute["Execute function on each sub-range"]
Execute --> Collect["Collect results from all tasks"]
Collect --> Combine["Combine results into final output"]
Combine --> End([Return result])
Diagram sources
- algorithm.h
Section sources
- algorithm.h
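The flowchart maps onto a partition/submit/combine pattern. The sketch below reproduces that pattern with std::async because the exact signatures of parallel_for_index and parallel_for_range are not shown on this page; the helper name parallel_for_index_sketch is hypothetical.
```cpp
// Generic partition-and-combine sketch; uses std::async, not Hikyuu's API.
#include <algorithm>
#include <cstddef>
#include <future>
#include <iostream>
#include <numeric>
#include <vector>

// Apply func(i) for every index in [start, end), split across n_tasks chunks,
// and return the results in index order.
template <typename Func>
std::vector<double> parallel_for_index_sketch(std::size_t start, std::size_t end,
                                              Func func, std::size_t n_tasks = 4) {
    std::size_t total = end - start;
    std::size_t chunk = (total + n_tasks - 1) / n_tasks;  // ceiling division

    std::vector<std::future<std::vector<double>>> futures;
    for (std::size_t first = start; first < end; first += chunk) {
        std::size_t last = std::min(first + chunk, end);
        futures.push_back(std::async(std::launch::async, [=] {
            std::vector<double> part;                      // results for one sub-range
            for (std::size_t i = first; i < last; ++i) part.push_back(func(i));
            return part;
        }));
    }

    std::vector<double> result;                            // combine in submission order
    for (auto& f : futures) {
        std::vector<double> part = f.get();
        result.insert(result.end(), part.begin(), part.end());
    }
    return result;
}

int main() {
    auto squares = parallel_for_index_sketch(0, 10, [](std::size_t i) { return double(i * i); });
    std::cout << "sum of squares 0..9 = "
              << std::accumulate(squares.begin(), squares.end(), 0.0) << std::endl;
    return 0;
}
```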
The threading utilities enable efficient parallel loading of market data. By distributing data loading tasks across multiple threads, the system can significantly reduce the time required to load large datasets from various sources.
sequenceDiagram
participant MainThread
participant ThreadPool
participant DataLoader1
participant DataLoader2
participant DataLoader3
MainThread->>ThreadPool : Create GlobalMQThreadPool
MainThread->>ThreadPool : submit(load_data_task1)
MainThread->>ThreadPool : submit(load_data_task2)
MainThread->>ThreadPool : submit(load_data_task3)
ThreadPool->>DataLoader1 : Execute task1
ThreadPool->>DataLoader2 : Execute task2
ThreadPool->>DataLoader3 : Execute task3
DataLoader1->>MainThread : Return data1
DataLoader2->>MainThread : Return data2
DataLoader3->>MainThread : Return data3
MainThread->>ThreadPool : join()
Section sources
- test_ThreadPool.cpp
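A sketch of the loading pattern in the sequence diagram above. The include path, namespace, constructor, the std::future returned by submit(), and the load_kdata_for() helper are all assumptions made for illustration; the real loading code lives in Hikyuu's data-driver layer.
```cpp
// Illustration of the sequence diagram above. The include path, namespace,
// constructor, the std::future returned by submit(), and load_kdata_for()
// are assumptions for this sketch, not APIs confirmed by this page.
#include <future>
#include <string>
#include <vector>
#include "GlobalMQThreadPool.h"  // adjust the include path to your build

struct MarketData { std::string code; /* bars, quotes, ... */ };

// Hypothetical loader for one instrument; stands in for the real data drivers.
MarketData load_kdata_for(const std::string& code) {
    return MarketData{code};
}

int main() {
    hku::GlobalMQThreadPool pool;   // assumed namespace and default constructor

    std::vector<std::string> codes = {"sh600000", "sz000001", "sh600036"};
    std::vector<std::future<MarketData>> futures;
    for (const auto& code : codes) {
        // Each loader runs on a worker; submit() targets the least loaded queue.
        futures.push_back(pool.submit([code] { return load_kdata_for(code); }));
    }

    std::vector<MarketData> loaded;
    for (auto& f : futures) {
        loaded.push_back(f.get());  // main thread collects the results
    }

    pool.join();                    // wait for the pool before shutting down
    return 0;
}
```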
The framework supports concurrent backtesting of multiple trading strategies. Each strategy can be executed in parallel, allowing for rapid evaluation of different parameter sets or strategy variations.
sequenceDiagram
participant StrategyManager
participant ThreadPool
participant Backtest1
participant Backtest2
participant Backtest3
StrategyManager->>ThreadPool : Create GlobalMQStealThreadPool
StrategyManager->>ThreadPool : submit(run_backtest1)
StrategyManager->>ThreadPool : submit(run_backtest2)
StrategyManager->>ThreadPool : submit(run_backtest3)
ThreadPool->>Backtest1 : Execute backtest1
ThreadPool->>Backtest2 : Execute backtest2
ThreadPool->>Backtest3 : Execute backtest3
Backtest1->>StrategyManager : Return results1
Backtest2->>StrategyManager : Return results2
Backtest3->>StrategyManager : Return results3
StrategyManager->>ThreadPool : join()
Section sources
- misc.cpp
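A sketch of the same pattern applied to a parameter sweep, following the sequence diagram above. The include path, namespace, constructor, the std::future returned by submit(), and the run_backtest() stand-in are assumptions for illustration only.
```cpp
// Parameter-sweep sketch; names and pool API details are assumptions.
#include <future>
#include <iostream>
#include <vector>
#include "GlobalMQStealThreadPool.h"  // adjust the include path to your build

// Hypothetical stand-in for a full backtest run returning an annual return.
double run_backtest(int fast_ma, int slow_ma) {
    return 0.01 * fast_ma + 0.001 * slow_ma;  // placeholder computation
}

int main() {
    hku::GlobalMQStealThreadPool pool;  // assumed namespace and default constructor

    std::vector<std::future<double>> results;
    for (int fast = 5; fast <= 20; fast += 5) {
        for (int slow = 30; slow <= 60; slow += 10) {
            // One backtest per parameter pair; uneven run times are balanced by stealing.
            results.push_back(pool.submit([=] { return run_backtest(fast, slow); }));
        }
    }

    for (auto& f : results) {
        std::cout << "annual return: " << f.get() << std::endl;
    }
    pool.join();
    return 0;
}
```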
Complex trading strategies can leverage multi-threading for real-time execution. The work-stealing thread pool ensures efficient load balancing when processing multiple securities or timeframes simultaneously.
flowchart TD
Start([Strategy Execution]) --> Distribute["Distribute securities to threads"]
Distribute --> Process["Process each security in parallel"]
Process --> Analyze["Analyze market data"]
Analyze --> Generate["Generate trading signals"]
Generate --> Execute["Execute trades"]
Execute --> Monitor["Monitor positions"]
Monitor --> Adjust["Adjust strategy parameters"]
Adjust --> End([Strategy Complete])
Section sources
- misc.cpp
The threading utilities are designed with thread safety as a primary concern. All shared data structures use appropriate synchronization mechanisms to prevent race conditions. The FuncWrapper ensures that tasks can be safely moved between threads, while the ThreadSafeQueue provides atomic operations for task submission and retrieval.
The framework employs several strategies to prevent deadlocks:
- Using non-blocking operations when possible
- Consistent lock ordering
- Timeout mechanisms for blocking operations
- Work-stealing algorithms that avoid circular dependencies
The threading infrastructure is optimized for CPU-intensive quantitative analysis tasks:
- Work-stealing algorithms maximize CPU utilization
- Multiple queue implementations reduce contention
- Type-erased task wrappers minimize overhead
- Efficient memory management with smart pointers
Section sources
- GlobalMQStealThreadPool.h
- GlobalMQThreadPool.h
The Hikyuu threading and concurrency utilities provide a robust foundation for high-performance quantitative analysis. The various thread pool implementations offer flexibility for different workload patterns, from simple parallel processing to complex recursive algorithms. The synchronization primitives ensure thread safety while minimizing overhead, and the task encapsulation mechanisms enable efficient execution of diverse workloads. These utilities are essential for handling the computational demands of modern quantitative trading strategies, enabling faster backtesting, real-time strategy execution, and efficient data processing.