虽然一般来说,线程或多或少独立于其他线程来处理任务,但在许多情况下,人们会希望在线程之间传递数据,甚至控制其他线程,例如从中央任务调度器线程。本章介绍如何使用 C++ 11 线程应用编程接口完成这些任务。
本章涵盖的主题包括:
- 使用互斥、锁和类似的同步结构
- 使用条件变量和信号来控制线程
- 在线程之间安全地传递和共享数据
并发的核心问题是确保对共享资源的安全访问,即使在线程间通信时也是如此。还有线程能够相互通信和同步的问题。
多线程编程之所以面临如此大的挑战,是因为能够跟踪线程之间的每一次交互,并确保每一种形式的访问都是安全的,同时不会陷入死锁和数据竞争的陷阱。
在本章中,我们将看一个相当复杂的涉及任务调度器的例子。这是一种高并发、高吞吐量的情况,其中许多不同的需求与许多潜在的陷阱结合在一起,我们稍后会看到。
线程间大量同步和通信的多线程的一个很好的例子是任务调度。这里,目标是接受传入的任务,并尽快将它们分配给工作线程。
在这种情况下,许多不同的方法都是可能的。通常有工作线程在活动循环中运行,不断轮询中央队列以寻找新任务。这种方法的缺点包括在所述轮询上浪费处理器周期,以及在所使用的同步机制(通常是互斥体)上形成拥塞。此外,当工作线程数量增加时,这种主动轮询方法的扩展性非常差。
理想情况下,每个工作线程都会无所事事地等待,直到再次需要它。为了实现这一点,我们必须从另一个角度来处理这个问题:不是从工作线程的角度,而是从队列的角度。很像操作系统的调度程序,它知道需要处理的任务以及可用的工作线程。
在这种方法中,中央调度器实例将接受新任务,并主动将它们分配给工作线程。所述调度器实例还可以管理这些工作线程,例如它们的数量和优先级,这取决于传入任务的数量和任务的类型或其他属性。
就其核心而言,我们的调度程序或调度程序非常简单,就像一个队列,所有的调度逻辑都内置在其中,如下图所示:
从前面的高级视图中可以看出,这并没有什么大不了的。然而,正如我们稍后将看到的,实际实现确实有许多复杂之处。
像往常一样,我们从包含在main.cpp
中的main
功能开始:
#include "dispatcher.h"
#include "request.h"
#include <iostream>
#include <string>
#include <csignal>
#include <thread>
#include <chrono>
using namespace std;
sig_atomic_t signal_caught = 0;
mutex logMutex;
我们包含的自定义头是我们的调度器实现的头,以及我们将使用的request
类。
在全局范围内,我们定义了一个与信号处理程序一起使用的原子变量,以及一个将同步我们的日志记录方法的输出(在标准输出上)的互斥体:
void sigint_handler(int sig) {
signal_caught = 1;
}
我们的信号处理函数(针对SIGINT
信号)只是设置了我们之前定义的全局原子变量:
void logFnc(string text) {
logMutex.lock();
cout << text << "\n";
logMutex.unlock();
}
在我们的日志记录函数中,我们使用全局互斥来确保对标准输出的写入是同步的:
int main() {
signal(SIGINT, &sigint_handler);
Dispatcher::init(10);
在main
功能中,我们为SIGINT
安装了信号处理程序,允许我们中断应用的执行。我们还调用Dispatcher
类上的静态init()
函数来初始化它:
cout << "Initialised.\n";
int cycles = 0;
Request* rq = 0;
while (!signal_caught && cycles < 50) {
rq = new Request();
rq->setValue(cycles);
rq->setOutput(&logFnc);
Dispatcher::addRequest(rq);
cycles++ ;
}
接下来,我们设置循环,在其中我们将创建新的请求。在每个周期中,我们创建一个新的Request
实例,并使用其setValue()
函数设置一个整数值(当前周期数)。在使用静态addRequest()
功能向Dispatcher
添加这个新请求之前,我们还在请求实例上设置了日志记录功能。
该循环将继续,直到达到最大循环数,或使用 Ctrl + C 或类似方法发出SIGINT
信号:
this_thread::sleep_for(chrono::seconds(5));
Dispatcher::stop();
cout << "Clean-up done.\n";
return 0;
}
最后,我们使用线程的sleep_for()
功能和来自chrono
STL 头的chrono::seconds()
功能等待 5 秒钟。
我们在返回之前也调用Dispatcher
上的stop()
函数。
对Dispatcher
的请求总是来自纯虚拟的AbstractRequest
类:
#pragma once
#ifndef ABSTRACT_REQUEST_H
#define ABSTRACT_REQUEST_H
class AbstractRequest {
//
public:
virtual void setValue(int value) = 0;
virtual void process() = 0;
virtual void finish() = 0;
};
#endif
这个AbstractRequest
类定义了一个具有三个函数的 API,一个派生类总是要实现这三个函数。在这些函数中,process()
和finish()
函数是最通用的,并且可能在任何实际实现中使用。setValue()
功能专用于本演示实施,可能会进行调整或扩展以适应现实生活场景。
使用抽象类作为请求基础的优势在于,它允许Dispatcher
类处理许多不同类型的请求,只要它们都遵守这个相同的基本 API。
使用这个抽象接口,我们实现了一个基本的Request
类,如下所示:
#pragma once
#ifndef REQUEST_H
#define REQUEST_H
#include "abstract_request.h"
#include <string>
using namespace std;
typedef void (*logFunction)(string text);
class Request : public AbstractRequest {
int value;
logFunction outFnc;
public: void setValue(int value) { this->value = value; }
void setOutput(logFunction fnc) { outFnc = fnc; }
void process();
void finish();
};
#endif
在其头文件中,我们首先定义函数指针的格式。之后,我们实现请求 API,并在基础 API 中添加setOutput()
函数,接受一个函数指针进行日志记录。这两个 setter 函数只是将提供的参数分配给它们各自的私有类成员。
接下来,类函数实现如下:
#include "request.h"
void Request::process() {
outFnc("Starting processing request " + std::to_string(value) + "...");
//
}
void Request::finish() {
outFnc("Finished request " + std::to_string(value));
}
这两种实现都非常基础;他们只是使用函数指针来输出一个指示工作线程状态的字符串。
在实际实现中,可以将业务逻辑添加到process()
函数中,其中finish()
函数包含完成请求的任何功能,例如将地图写入字符串。
接下来是Worker
班。这包含将由Dispatcher
调用以处理请求的逻辑。
#pragma once
#ifndef WORKER_H
#define WORKER_H
#include "abstract_request.h"
#include <condition_variable>
#include <mutex>
using namespace std;
class Worker {
condition_variable cv;
mutex mtx;
unique_lock<mutex> ulock;
AbstractRequest* request;
bool running;
bool ready;
public:
Worker() { running = true; ready = false; ulock = unique_lock<mutex>(mtx); }
void run();
void stop() { running = false; }
void setRequest(AbstractRequest* request) { this->request = request; ready = true; }
void getCondition(condition_variable* &cv);
};
#endif
虽然向Dispatcher
添加请求不需要任何特殊的逻辑,但是Worker
类确实需要使用条件变量来与调度程序同步。对于 C++ 11 线程 API,这需要一个条件变量、一个互斥体和一个唯一锁。
唯一锁封装了互斥锁,最终将与条件变量一起使用,我们稍后会看到。
除此之外,我们定义方法来启动和停止工作进程,设置新的处理请求,并获取对其内部条件变量的访问。
接下来,实现的其余部分编写如下:
#include "worker.h"
#include "dispatcher.h"
#include <chrono>
using namespace std;
void Worker::getCondition(condition_variable* &cv) {
cv = &(this)->cv;
}
void Worker::run() {
while (running) {
if (ready) {
ready = false;
request->process();
request->finish();
}
if (Dispatcher::addWorker(this)) {
// Use the ready loop to deal with spurious wake-ups.
while (!ready && running) {
if (cv.wait_for(ulock, chrono::seconds(1)) == cv_status::timeout) {
// We timed out, but we keep waiting unless
// the worker is
// stopped by the dispatcher.
}
}
}
}
}
除了条件变量的getter
函数外,我们定义了run()
函数,dispatcher
将在启动时为每个工作线程运行该函数。
它的主循环只是检查stop()
函数还没有被调用,这会将运行的布尔值设置为false
,并结束工作线程。这由Dispatcher
在关闭时使用,允许它终止工作线程。因为布尔值通常是原子的,所以设置和检查可以同时进行,而没有风险或不需要互斥。
继续,对ready
变量的检查是为了确保在线程第一次运行时请求实际上正在等待。在工作线程的第一次运行中,不会有任何请求等待,因此,试图处理一个请求会导致崩溃。在Dispatcher
设置新请求时,该布尔变量将被设置为true
。
如果请求正在等待,则ready
变量将再次设置为false
,之后请求实例将调用其process()
和finish()
函数。这将在工作线程的线程上运行请求的业务逻辑,并最终完成它。
最后,工作线程使用其静态addWorker()
函数将自己添加到调度程序中。如果没有新的请求可用,该函数将返回false
,并使工作线程等待直到有新的请求可用。否则,工作线程将继续处理Dispatcher
在其上设置的新请求。
如果被要求等待,我们进入一个新的循环。这个循环将确保当条件变量被唤醒时,这是因为我们得到了Dispatcher
( ready
变量设置为true
)的信号,而不是因为虚假的唤醒。
最后,我们使用之前创建的唯一锁实例以及超时时间来输入条件变量的实际wait()
函数。如果发生超时,我们可以终止线程,或者继续等待。在这里,我们选择什么都不做,只是重新进入等待循环。
作为最后一项,我们有Dispatcher
类本身:
#pragma once
#ifndef DISPATCHER_H
#define DISPATCHER_H
#include "abstract_request.h"
#include "worker.h"
#include <queue>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;
class Dispatcher {
static queue<AbstractRequest*> requests;
static queue<Worker*> workers;
static mutex requestsMutex;
static mutex workersMutex;
static vector<Worker*> allWorkers;
static vector<thread*> threads;
public:
static bool init(int workers);
static bool stop();
static void addRequest(AbstractRequest* request);
static bool addWorker(Worker* worker);
};
#endif
大部分看起来都很熟悉。正如你现在已经猜到的,这是一个完全静态的类。
接下来,它的实现如下:
#include "dispatcher.h"
#include <iostream>
using namespace std;
queue<AbstractRequest*> Dispatcher::requests;
queue<Worker*> Dispatcher::workers;
mutex Dispatcher::requestsMutex;
mutex Dispatcher::workersMutex;
vector<Worker*> Dispatcher::allWorkers;
vector<thread*> Dispatcher::threads;
bool Dispatcher::init(int workers) {
thread* t = 0;
Worker* w = 0;
for (int i = 0; i < workers; ++ i) {
w = new Worker;
allWorkers.push_back(w);
t = new thread(&Worker::run, w);
threads.push_back(t);
}
return true;
}
设置静态类成员后,定义init()
函数。它启动指定数量的工作线程,在各自的向量数据结构中保持对每个工作线程和线程实例的引用:
bool Dispatcher::stop() {
for (int i = 0; i < allWorkers.size(); ++ i) {
allWorkers[i]->stop();
}
cout << "Stopped workers.\n";
for (int j = 0; j < threads.size(); ++ j) {
threads[j]->join();
cout << "Joined threads.\n";
}
}
在stop()
函数中,每个工作实例都有其被调用的stop()
函数。这将导致每个工作线程终止,正如我们之前在Worker
类描述中看到的。
最后,在返回之前,我们等待每个线程加入(即完成):
void Dispatcher::addRequest(AbstractRequest* request) {
workersMutex.lock();
if (!workers.empty()) {
Worker* worker = workers.front();
worker->setRequest(request);
condition_variable* cv;
worker->getCondition(cv);
cv->notify_one();
workers.pop();
workersMutex.unlock();
}
else {
workersMutex.unlock();
requestsMutex.lock();
requests.push(request);
requestsMutex.unlock();
}
}
addRequest()
功能是事情变得有趣的地方。在这个函数中,添加了一个新的请求。接下来会发生什么取决于工作线程是否在等待新的请求。如果没有工作线程在等待(工作队列为空),请求将被添加到请求队列中。
互斥锁的使用确保了对这些队列的访问安全进行,因为工作线程将同时尝试访问这两个队列。
这里需要注意的一个重要问题是僵局的可能性。也就是说,两个线程将持有一个资源的锁,第二个线程等待第一个线程释放它的锁,然后再释放它自己的锁。在单个作用域中使用多个互斥体的每种情况都有这种可能性。
在这个函数中,死锁的可能性在于释放工作互斥体上的锁,并且当获得请求互斥体上的锁时。在这个函数持有 workers 互斥体并试图获得 requests 锁的情况下(当没有 worker 线程可用时),有可能另一个线程持有 requests 互斥体(寻找新的请求来处理),同时试图获得 workers 互斥体(没有找到请求并将其自身添加到 workers 队列中)。
这里的解决方案很简单:在获取下一个互斥体之前释放一个互斥体。在感觉必须持有多个互斥锁的情况下,检查和测试代码中潜在的死锁是最重要的。在这种特殊情况下,当不再需要工作互斥锁时,或者在获得请求互斥锁之前,会显式释放工作互斥锁,从而防止死锁。
这段代码的另一个重要方面是它向工作线程发出信号的方式。正如在 if/else 块的第一部分中可以看到的那样,当 workers 队列不为空时,从队列中取出一个 worker,对其设置请求,然后引用其条件变量并发出信号或通知。
在内部,条件变量使用我们之前在Worker
类定义中交给它的互斥体来保证对它的原子访问。当对条件变量调用notify_one()
函数(在其他 API 中一般称为signal()
)时,它会通知等待条件变量返回并继续的线程队列中的第一个线程。
在Worker
类run()
函数中,我们将等待这个通知事件。收到请求后,工作线程将继续处理新的请求。线程引用将从队列中移除,直到它处理完请求后再次添加自己:
bool Dispatcher::addWorker(Worker* worker) {
bool wait = true;
requestsMutex.lock();
if (!requests.empty()) {
AbstractRequest* request = requests.front();
worker->setRequest(request);
requests.pop();
wait = false;
requestsMutex.unlock();
}
else {
requestsMutex.unlock();
workersMutex.lock();
workers.push(worker);
workersMutex.unlock();
}
return wait;
}
使用最后一个函数,工作线程将在处理完请求后将其自身添加到队列中。它类似于前面的函数,因为传入的工作进程首先与可能在请求队列中等待的任何请求进行主动匹配。如果没有可用的工作队列,则该工作队列将被添加到工作队列中。
这里需要注意的是,我们返回一个布尔值,该值指示调用线程是否应该等待一个新的请求,或者它是否已经在尝试将自己添加到队列时收到了一个新的请求。
虽然这段代码没有前一个函数复杂,但由于在同一范围内处理两个互斥体,它仍然存在同样的潜在死锁问题。在这里,我们也首先释放我们持有的互斥体,然后再获取下一个互斥体。
这个Dispatcher
例子的 makefile 还是很基础的——它收集了当前文件夹中的所有 C++ 源文件,并使用g++
将其编译成二进制文件:
GCC := g++
OUTPUT := dispatcher_demo
SOURCES := $(wildcard *.cpp)
CCFLAGS := -std=c++ 11 -g3
all: $(OUTPUT)
$(OUTPUT):
$(GCC) -o $(OUTPUT) $(CCFLAGS) $(SOURCES)
clean:
rm $(OUTPUT)
.PHONY: all
编译应用后,运行它会为总共 50 个请求生成以下输出:
$ ./dispatcher_demo.exe
Initialised.
Starting processing request 1...
Starting processing request 2...
Finished request 1
Starting processing request 3...
Finished request 3
Starting processing request 6...
Finished request 6
Starting processing request 8...
Finished request 8
Starting processing request 9...
Finished request 9
Finished request 2
Starting processing request 11...
Finished request 11
Starting processing request 12...
Finished request 12
Starting processing request 13...
Finished request 13
Starting processing request 14...
Finished request 14
Starting processing request 7...
Starting processing request 10...
Starting processing request 15...
Finished request 7
Finished request 15
Finished request 10
Starting processing request 16...
Finished request 16
Starting processing request 17...
Starting processing request 18...
Starting processing request 0...
在这一点上,我们已经可以清楚地看到,即使每个请求几乎不需要花费时间来处理,这些请求显然是并行执行的。第一个请求(请求 0)仅在第十六个请求之后才开始处理,而第二个请求早在第九个请求之后就已经完成了。
决定首先处理哪个线程以及哪个请求的因素取决于操作系统调度程序和基于硬件的调度,如第 2 章、处理器和操作系统上的多线程实现所述。这清楚地表明,即使在单个平台上,对多线程应用的执行方式也几乎没有什么假设。
Starting processing request 5...
Finished request 5
Starting processing request 20...
Finished request 18
Finished request 20
Starting processing request 21...
Starting processing request 4...
Finished request 21
Finished request 4
在前面的代码中,第四个和第五个请求也以延迟的方式结束。
Starting processing request 23...
Starting processing request 24...
Starting processing request 22...
Finished request 24
Finished request 23
Finished request 22
Starting processing request 26...
Starting processing request 25...
Starting processing request 28...
Finished request 26
Starting processing request 27...
Finished request 28
Finished request 27
Starting processing request 29...
Starting processing request 30...
Finished request 30
Finished request 29
Finished request 17
Finished request 25
Starting processing request 19...
Finished request 0
至此,第一个请求终于完成。这可能表明与连续请求相比,第一个请求的初始化时间将总是被延迟。多次运行应用可以证实这一点。重要的是,如果处理的顺序是相关的,这种随机性不会对一个人的应用产生负面影响。
Starting processing request 33...
Starting processing request 35...
Finished request 33
Finished request 35
Starting processing request 37...
Starting processing request 38...
Finished request 37
Finished request 38
Starting processing request 39...
Starting processing request 40...
Starting processing request 36...
Starting processing request 31...
Finished request 40
Finished request 39
Starting processing request 32...
Starting processing request 41...
Finished request 32
Finished request 41
Starting processing request 42...
Finished request 31
Starting processing request 44...
Finished request 36
Finished request 42
Starting processing request 45...
Finished request 44
Starting processing request 47...
Starting processing request 48...
Finished request 48
Starting processing request 43...
Finished request 47
Finished request 43
Finished request 19
Starting processing request 34...
Finished request 34
Starting processing request 46...
Starting processing request 49...
Finished request 46
Finished request 49
Finished request 45
请求 19 也变得相当延迟,再次表明多线程应用是多么不可预测。如果我们在这里并行处理一个大数据集,每个请求中有大量数据,我们可能必须在某些时候暂停,以解决这些延迟,否则,我们的输出缓存可能会变得太大。
因为这样做会对应用的性能产生负面影响,所以人们可能不得不关注低级优化,以及特定处理器内核上的线程调度,以防止这种情况发生。
Stopped workers.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Joined threads.
Clean-up done.
开始时启动的所有 10 个工作线程在这里终止,我们称之为Dispatcher
的stop()
函数。
在本章给出的示例中,我们看到了除了同步线程之外,如何在线程之间共享信息——这表现为我们从主线程传递到调度器的请求的形式,每个请求从调度器传递到不同的线程。
线程之间共享数据背后的基本思想是,要共享的数据以两个或更多线程都可以访问的方式存在于某个地方。在此之后,我们必须确保只有一个线程可以修改数据,并且数据在被读取时不会被修改。通常,我们会使用互斥体或类似的东西来确保这一点。
读写锁在这里是一种可能的优化,因为它们允许多个线程同时从单个数据源读取。如果一个应用中有多个工作线程重复读取相同的信息,那么使用读写锁会比使用基本互斥锁更有效,因为读取数据的尝试不会阻塞其他线程。
因此,读写锁可以用作互斥锁的更高级版本,也就是说,使其行为适应访问类型。在内部,它建立在互斥体(或信号量)和条件变量上。
共享指针首先通过 Boost 库提供,并在 C++ 11 中引入,它是内存管理的抽象,对堆分配的实例使用引用计数。它们是部分线程安全的,因为可以创建多个共享指针实例,但是被引用的对象本身不是线程安全的。
然而,根据应用,这可能就足够了。为了使它们适当地线程安全,可以使用原子。我们将在第 8 章、原子操作-使用硬件中更详细地了解这一点。
在本章中,我们研究了如何以安全的方式在线程之间传递数据,作为相当复杂的调度器实现的一部分。我们还查看了所述调度程序的异步处理结果,并考虑了一些在线程间传递数据的潜在替代方案和优化。
此时,您应该能够在线程之间安全地传递数据,并同步对其他共享资源的访问。
在下一章中,我们将研究本机 C++ 线程和原语 API。