Skip to content

Latest commit

 

History

History
1019 lines (683 loc) · 36.2 KB

File metadata and controls

1019 lines (683 loc) · 36.2 KB

五、本机 C++ 线程和原语

从 2011 年 C++ 标准修订版开始,多线程 API 正式成为 C++ 标准模板库 ( STL )的一部分。这意味着任何新的 C++ 应用都可以使用线程、线程原语和同步机制,而不需要安装第三方库,也不需要依赖操作系统的 API。

本章将介绍该本机 API 中提供的多线程功能,以及 2014 标准增加的功能。将显示一些详细使用这些功能的示例。

本章的主题包括以下内容:

  • C++ 的 STL 中多线程应用编程接口所涵盖的特性
  • 每个功能使用的详细示例

STL 线程应用编程接口

第 3 章C++ 多线程 API中,我们查看了开发多线程 C++ 应用时可用的各种 API。在第 4 章线程同步和通信中,我们使用本机 C++ 线程应用编程接口实现了一个多线程调度器应用。

助推。线程应用编程接口

通过包含来自 STL 的<thread>头,我们获得了对std::thread类的访问,该类具有由其他头提供的互斥(互斥等)工具。这个应用编程接口本质上与Boost.Thread的多线程应用编程接口相同,主要区别在于对线程的更多控制(通过超时、线程组和线程中断进行连接),以及在诸如互斥体和条件变量等原语之上实现的一些额外的锁类型。

**一般来说,当 C++ 11 不支持时,或者当这些额外的Boost.Thread特性是一个人的应用的要求,并且不容易添加时,Boost.Thread应该作为一个后备。由于Boost.Thread建立在可用(本机)线程支持的基础上,与 C++ 11 STL 实现相比,它也可能增加开销。

2011 年标准

C++ 标准(通常称为 C++ 11)的 2011 年修订版增加了广泛的新功能,其中最关键的是增加了本机多线程支持,它增加了在 C++ 中创建、管理和使用线程的能力,而无需使用第三方库。

该标准将核心语言的内存模型标准化,以允许多线程共存,并支持线程本地存储等功能。最初的支持是在 C++ 03 标准中添加的,但 C++ 11 标准是第一个充分利用这一点的标准。

如前所述,实际的线程应用编程接口本身是在 STL 中实现的。C++ 11 (C++ 0x)标准的目标之一是在 STL 中拥有尽可能多的新特性,而不是作为核心语言的一部分。因此,为了使用线程、互斥体和同族,必须首先包含相关的 STL 头。

致力于新多线程应用编程接口的标准委员会每个人都有自己的目标集,因此,一些人想要的一些特性没有进入最终标准。这包括终止另一个线程或线程取消等功能,POSIX 代表强烈反对这种做法,因为取消线程可能会导致正在销毁的线程中的资源清理问题。

以下是此应用编程接口实现提供的功能:

  • std::thread
  • std::mutex
  • std::recursive_mutex
  • std::condition_variable
  • std::condition_variable_any
  • std::lock_guard
  • std::unique_lock
  • std::packaged_task
  • std::async
  • std::future

稍后,我们将查看这些特性的详细示例。首先,我们将看到 C++ 标准的下一个修订版对这个初始集增加了什么。

C++ 14

2014 标准向标准库中添加了以下功能:

  • std::shared_lock
  • std::shared_timed_mutex

这两者都在<shared_mutex> STL 标题中定义。因为锁是基于互斥的,所以共享锁依赖于共享互斥。

C++ 17

2017 标准为标准库增加了另一组功能,即:

  • std::shared_mutex
  • std::scoped_lock

这里,作用域锁是一个互斥包装器,它提供了一个 RAII 风格的机制,在作用域块期间拥有一个互斥体。

STL 组织

在 STL 中,我们可以找到以下标题组织及其提供的功能:

| 页眉 | 提供 | | <thread> | std::thread班。std::this_thread命名空间下的方法:

  • yield
  • get_id
  • sleep_for
  • sleep_until

| | <mutex> | 类:

  • mutex
  • timed_mutex
  • recursive_mutex
  • recursive_timed_mutex
  • lock_guard
  • scoped_lock (c++ 17)
  • unique_lock

函数:

  • try_lock lock
  • call_once
  • std::swap ( std::unique_lock)

| | <shared_mutex> | 类:

  • shared_mutex (c++ 17)
  • shared_timed_mutex (c++ 14)
  • shared_lock (c++ 14)

函数:

  • std::swap ( std::shared_lock)

| | <future> | 类:

  • promise
  • packaged_task
  • future
  • shared_future

功能:

  • async
  • future_category
  • std::swap ( std::promise)
  • std::swap ( std::packaged_task)

| | <condition_variable> | 类:

  • condition_variable
  • condition_variable_any

功能:

  • notify_all_at_thread_exit

|

在上表中,我们可以看到每个报头提供的功能,以及 2014 年和 2017 年标准引入的特性。在接下来的部分中,我们将详细了解每个函数和类。

线程类

thread类是整个线程 API 的核心;它包装了底层操作系统的线程,并提供了我们启动和停止线程所需的功能。

通过包含<thread>标题,可以访问该功能。

基本用途

创建线程后,它会立即启动:

#include <thread> 

void worker() { 
   // Business logic. 
} 

int main () { 
   std::thread t(worker);
   return 0; 
} 

前面这段代码将启动线程,然后立即终止应用,因为我们没有等待新线程完成执行。

为了正确地做到这一点,我们需要等待线程完成,或者如下重新加入:

#include <thread> 

void worker() { 
   // Business logic. 
} 

int main () { 
   std::thread t(worker); 
   t.join(); 
   return 0; 
} 

最后这段代码将执行,等待新线程完成,然后返回。

传递参数

还可以将参数传递给新线程。这些参数值必须是可移动构造的,这意味着它是一个具有移动或复制构造函数的类型(对于右值引用被调用)。实际上,这是所有基本类型和大多数(用户定义的)类的情况:

#include <thread> 
#include <string> 

void worker(int n, std::string t) { 
   // Business logic. 
} 

int main () { 
   std::string s = "Test"; 
   int i = 1; 
   std::thread t(worker, i, s); 
   t.join(); 
   return 0; 
} 

在前面的代码中,我们将一个整数和字符串传递给thread函数。该函数将接收两个变量的副本。当传递引用或指针时,随着生命周期问题、数据竞争等成为潜在问题,事情变得更加复杂。

返回值

传递给thread类构造函数的函数返回的任何值都会被忽略。要向创建新线程的线程返回信息,必须使用线程间同步机制(如互斥体)和某种共享变量。

移动线程

2011 年标准在<utility>表头增加了std::move。使用这个模板方法,可以在对象之间移动资源。这意味着它还可以移动线程实例:

#include <thread> 
#include <string> 
#include <utility> 

void worker(int n, string t) { 
   // Business logic. 
} 

int main () { 
   std::string s = "Test"; 
   std::thread t0(worker, 1, s); 
   std::thread t1(std::move(t0)); 
   t1.join(); 
   return 0; 
} 

在这个版本的代码中,我们先创建一个线程,然后再将其移动到另一个线程。线程 0 因此不再存在(因为它立即结束),并且thread函数的执行在我们创建的新线程中恢复。

因此,我们不必等待第一个线程重新加入,而只需等待第二个线程。

线程标识

每个线程都有一个与之关联的标识符。该标识或句柄是由 STL 实现提供的唯一标识符。可以通过调用thread类实例的get_id()函数,或者通过调用std::this_thread::get_id()获取调用该函数的线程的 ID 来获得:

#include <iostream>
 #include <thread>
 #include <chrono>
 #include <mutex>

 std::mutex display_mutex;

 void worker() {
     std::thread::id this_id = std::this_thread::get_id();

     display_mutex.lock();
     std::cout << "thread " << this_id << " sleeping...\n";
     display_mutex.unlock();

     std::this_thread::sleep_for(std::chrono::seconds(1));
 }

 int main() {
    std::thread t1(worker);
    std::thread::id t1_id = t1.get_id();

    std::thread t2(worker);
    std::thread::id t2_id = t2.get_id();

    display_mutex.lock();
    std::cout << "t1's id: " << t1_id << "\n";
    std::cout << "t2's id: " << t2_id << "\n";
    display_mutex.unlock();

    t1.join();
    t2.join();

    return 0;
 } 

这段代码将产生类似如下的输出:

t1's id: 2
t2's id: 3
thread 2 sleeping...
thread 3 sleeping...

这里,可以看到相对于初始线程(ID 1),内部线程 ID 是一个整数(std::thread::id类型)。这与大多数本机线程标识(如 POSIX 的那些)相当。这些也可以使用native_handle()获得。该函数将返回底层本机线程句柄。当希望使用 STL 实现中不可用的非常特定的 PThread 或 Win32 线程功能时,它特别有用。

睡着的

可以使用两种方法中的任何一种来延迟线程的执行(休眠)。一个是sleep_for(),它至少延迟了指定的持续时间,但可能更长:

#include <iostream> 
#include <chrono> 
#include <thread> 
        using namespace std::chrono_literals;

        typedef std::chrono::time_point<std::chrono::high_resolution_clock> timepoint; 
int main() { 
         std::cout << "Starting sleep.\n"; 

         timepoint start = std::chrono::high_resolution_clock::now(); 

         std::this_thread::sleep_for(2s); 

         timepoint end = std::chrono::high_resolution_clock::now(); 
         std::chrono::duration<double, std::milli> elapsed = end - 
         start; 
         std::cout << "Slept for: " << elapsed.count() << " ms\n"; 
} 

上面这段代码展示了如何休眠大约 2 秒钟,在当前操作系统上使用最高精度的计数器测量精确的持续时间。

请注意,我们可以直接指定秒数,使用秒后修复。这是一个 C++ 14 特性,被添加到<chrono>头。对于 C++ 11 版本,必须创建一个 std::chrono::seconds 的实例,并将其传递给sleep_for()函数。

另一种方法是sleep_until(),取类型为std::chrono::time_point<Clock, Duration>的单个参数。使用此功能,可以将线程设置为睡眠,直到达到指定的时间点。由于操作系统的调度优先级,该唤醒时间可能不是指定的准确时间。

产量

可以向操作系统指示当前线程可以被重新调度,以便其他线程可以代替运行。为此,使用std::this_thread::yield()功能。该功能的确切结果取决于底层操作系统实现及其调度程序。在先进先出调度器的情况下,调用线程很可能会被放在队列的后面。

这是一个高度专门化的功能,有特殊的用例。在没有首先验证它对应用性能的影响之前,不应该使用它。

派遣

启动线程后,可以在线程对象上调用detach()。这有效地将新线程与调用线程分离,这意味着前者将继续执行,即使在调用线程退出之后。

交换

使用swap(),无论是作为独立的方法还是作为线程实例的函数,都可以交换线程对象的底层线程句柄:

#include <iostream> 
#include <thread> 
#include <chrono> 

void worker() { 
   std::this_thread::sleep_for(std::chrono::seconds(1)); 
} 

int main() { 
         std::thread t1(worker); 
         std::thread t2(worker); 

         std::cout << "thread 1 id: " << t1.get_id() << "\n"; 
         std::cout << "thread 2 id: " << t2.get_id() << "\n"; 

         std::swap(t1, t2); 

         std::cout << "Swapping threads..." << "\n"; 

         std::cout << "thread 1 id: " << t1.get_id() << "\n"; 
         std::cout << "thread 2 id: " << t2.get_id() << "\n"; 

         t1.swap(t2); 

         std::cout << "Swapping threads..." << "\n"; 

         std::cout << "thread 1 id: " << t1.get_id() << "\n"; 
         std::cout << "thread 2 id: " << t2.get_id() << "\n"; 

         t1.join(); 
         t2.join(); 
} 

这段代码的可能输出如下所示:

thread 1 id: 2
thread 2 id: 3
Swapping threads...
thread 1 id: 3
thread 2 id: 2
Swapping threads...
thread 1 id: 2
thread 2 id: 3

这样做的效果是,每个线程的状态与另一个线程的状态交换,本质上是交换它们的身份。

互斥(体)…

<mutex>头包含多种类型的互斥体和锁。互斥类型是最常用的类型,它提供了基本的锁定/解锁功能,没有任何进一步的复杂性。

基本用途

互斥锁的核心目标是排除同时访问的可能性,以防止数据损坏,并防止由于使用非线程安全例程而导致的崩溃。

需要使用互斥体的一个例子是下面的代码:

#include <iostream> 
#include <thread> 

void worker(int i) { 
         std::cout << "Outputting this from thread number: " << i << "\n"; 
} 

int main() { 
         std::thread t1(worker, 1);
         std::thread t2(worker, 2); 

         t1.join(); 
   t2.join(); 

   return 0; 
} 

如果您尝试按原样运行前面的代码,您会注意到来自两个线程的文本输出将被混合在一起,而不是一个接一个地输出。原因是标准输出(无论是 C 还是 C++ 风格)不是线程安全的。虽然应用不会崩溃,但输出将是一片混乱。

对此的解决方法很简单,如下所示:

#include <iostream> 
#include <thread> 
#include <mutex> 

std::mutex globalMutex; 

void worker(int i) { 
   globalMutex.lock(); 
         std::cout << "Outputting this from thread number: " << i << "\n"; 
   globalMutex.unlock(); 
} 

int main() { 
         std::thread t1(worker, 1);
         std::thread t2(worker, 2); 

         t1.join(); 
   t2.join(); 

   return 0; 
} 

在这种情况下,每个线程首先需要访问mutex对象。由于只有一个线程可以访问mutex对象,另一个线程将最终等待第一个线程完成对标准输出的写入,两个字符串将按照预期一个接一个地出现。

无阻塞锁定

有可能不希望线程阻塞并等待mutex对象变得可用:例如,当一个线程只想知道一个请求是否已经被另一个线程处理,等待它完成是没有用的。

为此,互斥体附带了try_lock()函数,该函数正是这么做的。

在下面的示例中,我们可以看到两个线程试图递增同一个计数器,但是当一个线程无法立即获得对共享计数器的访问时,它就会递增自己的计数器:

#include <chrono> 
#include <mutex> 
#include <thread> 
#include <iostream> 

std::chrono::milliseconds interval(50); 

std::mutex mutex; 
int shared_counter = 0;
int exclusive_counter = 0; 

void worker0() { 
   std::this_thread::sleep_for(interval);

         while (true) { 
               if (mutex.try_lock()) { 
                     std::cout << "Shared (" << job_shared << ")\n"; 
                     mutex.unlock(); 
                     return; 
               } 
         else { 
                     ++ exclusive_counter; 
                           std::cout << "Exclusive (" << exclusive_counter << ")\n"; 
                           std::this_thread::sleep_for(interval); 
               } 
         } 
} 

void worker1() { 
   mutex.lock(); 
         std::this_thread::sleep_for(10 * interval); 
         ++ shared_counter; 
         mutex.unlock(); 
} 

int main() { 
         std::thread t1(worker0); 
         std::thread t2(worker1); 

         t1.join(); 
         t2.join(); 
}

前面这个例子中的两个线程都运行不同的worker函数,但是它们都有一个共同的事实,那就是它们会休眠一段时间,并在醒来时尝试获取共享计数器的互斥量。如果他们这样做,他们会增加计数器,但只有第一个工人会输出这个事实。

第一个工作进程在没有获得共享计数器时也进行日志记录,但只增加了它的独占计数器。结果输出可能如下所示:

Exclusive (1)
Exclusive (2)
Exclusive (3)
Shared (1)
Exclusive (4)

定时互斥

定时互斥体是一种常规的互斥体类型,但是增加了一些函数,这些函数在试图获取锁的时间段内提供一种控制,即try_lock_fortry_lock_until

前者在返回结果(真或假)之前,试图在指定的时间段内获得锁(std::chrono对象)。后者会等到未来某个特定的时间点再返回结果。

这些函数的使用主要在于在常规互斥体的阻塞(lock)和非阻塞(try_lock)方法之间提供一条中间路径。人们可能希望仅使用单个线程来等待多个任务,而不知道某个任务何时变得可用,或者某个任务可能会在某个时间点过期,此时等待它不再有意义。

锁护板

锁保护是一个简单的互斥包装器,它处理在mutex对象上获取锁,以及当锁保护超出范围时释放锁。这是一个很有帮助的机制,可以确保人们不会忘记释放互斥锁,并且当人们不得不在多个位置释放同一个互斥锁时,有助于减少代码中的混乱。

例如,重构 big if/else 块可以减少需要释放互斥锁的情况,但是只使用锁保护包装器而不用担心这些细节会容易得多:

#include <thread> 
#include <mutex> 
#include <iostream> 

int counter = 0; 
std::mutex counter_mutex; 

void worker() { 
         std::lock_guard<std::mutex> lock(counter_mutex); 
   if (counter == 1) { counter += 10; } 
   else if (counter >= 10) { counter += 15; } 
   else if (counter >= 50) { return; } 
         else { ++ counter; } 

   std::cout << std::this_thread::get_id() << ": " << counter << '\n'; 
} 

int main() { 
    std::cout << __func__ << ": " << counter << '\n'; 

    std::thread t1(worker); 
    std::thread t2(worker); 

    t1.join(); 
    t2.join(); 

    std::cout << __func__ << ": " << counter << '\n'; 
} 

在前面的例子中,我们看到我们有一个小的 if/else 块,其中一个条件导致worker函数立即返回。如果没有锁保护,我们必须确保在从函数返回之前也在这种情况下解锁了互斥锁。

然而,有了锁卫士,我们就不必担心这些细节,这使得我们可以专注于业务逻辑,而不用担心互斥管理。

唯一锁

唯一锁是通用互斥包装器。它类似于定时互斥体,但具有额外的特性,主要是所有权的概念。与其他锁类型不同,唯一锁不一定拥有它所包装的互斥体,如果它包含任何互斥体的话。使用swap()功能,可以在唯一锁实例之间转移互斥锁以及所述互斥锁的所有权。

一个唯一的锁实例是否拥有其互斥体的所有权,以及它是否被锁定,首先是在创建锁时确定的,这可以从它的构造函数中看到。例如:

std::mutex m1, m2, m3; 
std::unique_lock<std::mutex> lock1(m1, std::defer_lock); 
std::unique_lock<std::mutex> lock2(m2, std::try_lock); 
std::unique_lock<std::mutex> lock3(m3, std::adopt_lock); 

最后一段代码中的第一个构造函数不会锁定所分配的互斥体(延迟)。第二次尝试使用try_lock()锁定互斥体。最后,第三个构造函数假设它已经拥有所提供的互斥体。

除此之外,其他构造函数允许定时互斥的功能。也就是说,它将等待一段时间,直到到达某个时间点,或者直到获得锁。

最后,使用release()函数打破锁和互斥之间的关联,并返回一个指向mutex对象的指针。然后调用者负责释放互斥体上任何剩余的锁,并进一步处理它。

这种类型的锁不会经常单独使用,因为它非常通用。大多数其他类型的互斥体和锁都没有那么复杂,并且可能在 99%的情况下满足所有需求。因此,独特锁的复杂性既是好处也是风险。

然而,它通常被 C++ 11 线程应用编程接口的其他部分使用,例如条件变量,我们稍后会看到。

唯一锁可能有用的一个方面是作为作用域锁,允许使用作用域锁,而不必依赖 C++ 17 标准中的本机作用域锁。请看这个例子:

#include <mutex>
std::mutex my_mutex
int count = 0;
int function() {
         std::unique_lock<mutex> lock(my_mutex);
   count++ ;
}  

当我们进入函数时,我们用全局互斥实例创建一个新的 unique_lock。互斥体此时被锁定,之后我们可以执行任何关键操作。

当函数作用域结束时,调用 unique_lock 的析构函数,这将导致互斥锁再次解锁。

作用域锁

在 2017 年标准中首次引入,作用域锁是一个互斥体包装器,它获取对所提供互斥体的访问(锁定),并确保当作用域锁超出作用域时,它被解锁。它不同于锁保护,因为它不是一个而是多个互斥锁的包装器。

当在一个作用域中处理多个互斥体时,这可能很有用。使用作用域锁的一个原因是为了避免意外引入死锁和其他令人不快的复杂情况,例如,一个互斥体被作用域锁锁定,另一个锁仍在等待,而另一个线程实例的情况正好相反。

作用域锁的一个属性是它试图避免这种情况,理论上使这种类型的锁死锁安全。

递归互斥

递归互斥是互斥的另一个子类型。即使它具有与常规互斥体完全相同的功能,它也允许最初锁定互斥体的调用线程重复锁定同一个互斥体。通过这样做,互斥体不会对其他线程可用,直到拥有它的线程解锁互斥体的次数与锁定它的次数一样多。

使用递归互斥体的一个很好的理由是,例如当使用递归函数时。对于常规互斥体,需要发明某种入口点,在进入递归函数之前锁定互斥体。

对于递归互斥体,递归函数的每次迭代都会再次锁定递归互斥体,并且在完成一次迭代后,它会解锁互斥体。结果,互斥体将被解锁,并且解锁的次数相同。

因此,一个潜在的复杂问题是,标准中没有定义递归互斥锁可以被锁定的最大次数。当达到实现的限制时,如果试图锁定它,将抛出std::system_error,或者在使用非阻塞try_lock函数时返回 false。

递归定时互斥

递归定时互斥体,顾名思义,是定时互斥体和递归互斥体功能的融合。因此,它允许使用定时条件函数递归锁定互斥体。

尽管这增加了确保互斥锁在线程锁定的时候被解锁的次数的挑战,但是它仍然为更复杂的算法提供了可能性,比如前面提到的任务处理程序。

共享互斥体

<shared_mutex>表头是 2014 年标准首次增加的,增加了shared_timed_mutex类。随着 2017 年的标准,还增加了shared_mutex类。

共享互斥头从 C++ 17 开始就存在了。除了通常的互斥访问之外,这个mutex类还增加了对互斥体提供共享访问的能力。例如,这允许多线程提供对资源的读访问,而写线程仍然能够获得独占访问。这类似于 Pthreads 的读写锁。

添加到该互斥类型的函数如下:

  • lock_shared()
  • try_lock_shared()
  • unlock_shared()

这个互斥体的共享功能的使用应该是不言自明的。理论上,无限数量的读取器可以获得对互斥体的读取权限,同时确保在任何时候只有一个线程可以写入资源。

共享定时互斥

这个头文件从 C++ 14 开始就有了。它通过以下功能向定时互斥体添加共享锁定功能:

  • lock_shared()
  • try_lock_shared()
  • try_lock_shared_for()
  • try_lock_shared_until()
  • unlock_shared()

顾名思义,这个类本质上是共享互斥体和定时互斥体的合并。有趣的是,它是在更基本的共享互斥体之前被添加到标准中的。

条件变量

本质上,条件变量提供了一种机制,通过这种机制,一个线程的执行可以被另一个线程控制。这是通过拥有一个共享变量来实现的,一个线程将等待这个变量,直到另一个线程发出信号。这是我们在第 4 章线程同步和通信中看到的调度器实现的重要部分。

对于 C++ 11 API,条件变量及其相关功能在<condition_variable>头中定义。

条件变量的基本用法可以从第 4 章线程同步和通信中的调度程序代码中总结出来。

 #include "abstract_request.h"

 #include <condition_variable>
 #include <mutex> 

using namespace std;

 class Worker {
    condition_variable cv;
    mutex mtx;
    unique_lock<mutex> ulock;
    AbstractRequest* request;
    bool running;
    bool ready;
    public:
    Worker() { running = true; ready = false; ulock = unique_lock<mutex>(mtx); }
    void run();
    void stop() { running = false; }
    void setRequest(AbstractRequest* request) { this->request = request; ready = true; }
    void getCondition(condition_variable* &cv);
 }; 

在构造函数中,如前面的Worker类声明中所定义的,我们看到了 C++ 11 API 中条件变量的初始化方式。步骤如下:

  1. 创建一个condition_variablemutex实例。
  2. 将互斥体分配给一个新的unique_lock实例。使用我们在这里用于锁的构造函数,分配的互斥体也在分配时被锁定。
  3. 条件变量现在可以使用了:
#include <chrono>
using namespace std;
void Worker::run() {
    while (running) {
        if (ready) {
            ready = false;
            request->process();
            request->finish();
        }
        if (Dispatcher::addWorker(this)) {
            while (!ready && running) {
                if (cv.wait_for(ulock, chrono::seconds(1)) == 
                cv_status::timeout) {
                    // We timed out, but we keep waiting unless the 
                    worker is
                    // stopped by the dispatcher.
                }
            }
        }
    }
} 

这里,我们使用条件变量的wait_for()函数,传递我们之前创建的唯一锁实例和我们想要等待的时间量。我们在这里等 1 秒钟。如果我们在这个等待中超时,我们可以在一个连续的循环中重新进入等待(就像这里所做的那样),或者继续执行。

也可以使用简单的wait()功能执行阻塞等待,或者使用wait_for()等待某个时间点。

如前所述,当我们第一次查看这段代码时,这个工作代码使用ready布尔变量的原因是为了检查它是否真的是另一个线程发出了条件变量的信号,而不仅仅是一个虚假的唤醒。大多数条件变量实现(包括 C++ 11)都很容易出现这种情况,这是一个不幸的复杂情况。

由于这些随机的唤醒事件,有必要有某种方法来确保我们确实是故意醒来的。在调度程序代码中,这是通过让唤醒工作线程的线程也设置一个工作线程可以唤醒的Boolean值来实现的。

我们是否超时,或被通知,或遭受虚假唤醒可以用cv_status枚举来检查。这个枚举知道这两种可能的情况:

  • timeout
  • no_timeout

信号或通知本身非常简单:

void Dispatcher::addRequest(AbstractRequest* request) {
    workersMutex.lock();
    if (!workers.empty()) {
          Worker* worker = workers.front();
          worker->setRequest(request);
          condition_variable* cv;
          worker->getCondition(cv);
          cv->notify_one();
          workers.pop();
          workersMutex.unlock();
    }
    else {
          workersMutex.unlock();
          requestsMutex.lock();
          requests.push(request);
          requestsMutex.unlock();
    }
          } 

在前面这个来自Dispatcher类的函数中,我们试图获得一个可用的工作线程实例。如果找到,我们将获得对工作线程条件变量的引用,如下所示:

void Worker::getCondition(condition_variable* &cv) {
    cv = &(this)->cv;
 } 

在工作线程上设置新请求也会将ready变量的值更改为 true,从而允许工作线程检查是否确实允许继续。

最后,条件变量被通知任何等待它的线程现在可以继续使用notify_one()。这个特殊的函数将向先进先出队列中的第一个线程发出信号,让这个条件变量继续。这里,只有一个线程会被通知,但是如果有多个线程在等待同一个条件变量,调用notify_all()将允许先进先出队列中的所有线程继续。

条件变量任意

condition_variable_any类是condition_variable类的推广。它与后者的不同之处在于,它允许在unique_lock<mutex>之外使用其他互斥机制。唯一的要求是使用的锁满足BasicLockable要求,也就是说提供了lock()unlock()功能。

线程退出时通知所有人

std::notify_all_at_thread_exit()函数允许(分离的)线程通知其他线程它已经完全完成,并且正在销毁其范围内(线程本地)的所有对象。它的功能是在发送所提供的条件变量之前,将所提供的锁移动到内部存储中。

结果就好像锁被解锁了,条件变量上调用了notify_all()

一个基本的(非功能性的)例子如下:

#include <mutex> 
#include <thread> 
#include <condition_variable> 
using namespace std; 

mutex m; 
condition_variable cv;
bool ready = false; 
ThreadLocal result;

void worker() { 
   unique_lock<mutex> ulock(m); 
   result = thread_local_method(); 
         ready = true; 
         std::notify_all_at_thread_exit(cv, std::move(ulock)); 
} 

int main() { 
         thread t(worker); 
         t.detach(); 

         // Do work here. 

         unique_lock<std::mutex> ulock(m); 
         while(!ready) { 
               cv.wait(ulock); 
         } 

         // Process result 
} 

这里,工作线程执行一个创建线程本地对象的方法。因此,主线程必须先等待分离的工作线程完成。如果后者在主线程完成任务时还没有完成,它将使用全局条件变量进入等待状态。在工作线程中,设置ready布尔后调用std::notify_all_at_thread_exit()

这实现了双重目的。调用函数后,不允许更多线程等待条件变量。它还允许主线程等待分离的工作线程的结果变得可用。

将来的

C++ 11 线程支持 API 的最后一部分在<future>中定义。它提供了一系列的类,这些类实现了更高级别的多线程概念,这些概念更倾向于简单的异步处理,而不是多线程体系结构的实现。

这里我们必须区分两个概念:未来和承诺。前者是读者/消费者将使用的最终结果(未来产品)。后者是编剧/制片人使用的。

未来的一个基本例子是:

#include <iostream>
#include <future>
#include <chrono>

bool is_prime (int x) {
  for (int i = 2; i < x; ++ i) if (x%i==0) return false;
  return true;
}

int main () {
  std::future<bool> fut = std::async (is_prime, 444444443);
  std::cout << "Checking, please wait";
  std::chrono::milliseconds span(100);
  while (fut.wait_for(span) == std::future_status::timeout) {               std::cout << '.' << std::flush;
   }

  bool x = fut.get();
  std::cout << "\n444444443 " << (x?"is":"is not") << " prime.\n";
  return 0;
}

这段代码异步调用一个函数,给它传递一个参数(潜在的质数)。然后,它进入一个活动循环,同时等待从异步函数调用接收到的未来完成。它在其等待功能上设置 100 ms 超时。

一旦未来结束(等待函数没有返回超时值),我们就获得了结果值,在这种情况下告诉我们,我们为函数提供的值实际上是一个质数。

在本章的异步部分,我们将更多地了解异步函数调用。

承诺

一个promise允许一个人在线程之间转移状态。例如:

#include <iostream> 
#include <functional>
#include <thread> 
#include <future> 

void print_int (std::future<int>& fut) {
  int x = fut.get();
  std::cout << "value: " << x << '\n';
}

int main () {
  std::promise<int> prom;
  std::future<int> fut = prom.get_future();
  std::thread th1 (print_int, std::ref(fut));
  prom.set_value (10);                            
  th1.join();
  return 0;

前面的代码使用传递给工作线程的promise实例将一个值传递给另一个线程,在本例中是一个整数。新线程等待着我们从承诺中创造的未来,以及它从主线程中收到的未来来完成。

当我们设定承诺的价值时,承诺就完成了。这就完成了未来并完成了工作线程。

在这个特殊的例子中,我们对future对象使用阻塞等待,但是也可以使用wait_for()wait_until(),分别等待一个时间段或一个时间点,就像我们在前面的例子中看到的未来一样。

共享未来

一个shared_future就像一个普通的future对象,但是可以复制,允许多个线程读取它的结果。

创建shared_future类似于常规的future.

std::promise<void> promise1; 
std::shared_future<void> sFuture(promise1.get_future()); 

最大的区别是正则future传递给了它的构造函数。

之后,所有可以访问future对象的线程都可以等待它,并获取它的值。这也可以用于以类似于条件变量的方式向线程发送信号。

打包的任务

packaged_task是任何可调用目标(函数、绑定、lambda 或其他函数对象)的包装器。它允许异步执行,结果在future对象中可用。它类似于std::function,但会自动将其结果传输到一个future对象。

例如:

#include <iostream> 
#include <future> 
#include <chrono>
#include <thread>

using namespace std; 

int countdown (int from, int to) { 
   for (int i = from; i != to; --i) { 
         cout << i << '\n'; 
         this_thread::sleep_for(chrono::seconds(1)); 
   } 

   cout << "Finished countdown.\n"; 
   return from - to; 
} 

int main () { 
   packaged_task<int(int, int)> task(countdown);
   future<int> result = task.get_future();
   thread t (std::move(task), 10, 0);

   //  Other logic. 

   int value = result.get(); 

   cout << "The countdown lasted for " << value << " seconds.\n"; 

   t.join(); 
   return 0; 
} 

上面这段代码实现了一个简单的倒计时功能,从 10 数到 0。创建任务并获得对其future对象的引用后,我们将其与worker函数的参数一起推送到线程上。

倒计时工作线程的结果一结束就变得可用。我们可以像使用promise一样使用future对象的等待功能。

异步ˌ非同步(asynchronous)

promisepackaged_task更直接的版本可以在std::async()找到。这是一个简单的函数,它接受一个可调用的对象(函数、bind、lambda 等)以及它的任何参数,并返回一个future对象。

以下是async()功能的基本示例:

#include <iostream>
#include <future>

using namespace std; 

bool is_prime (int x) { 
   cout << "Calculating prime...\n"; 
   for (int i = 2; i < x; ++ i) { 
         if (x % i == 0) { 
               return false; 
         } 
   } 

   return true; 
} 

int main () { 
   future<bool> pFuture = std::async (is_prime, 343321); 

   cout << "Checking whether 343321 is a prime number.\n"; 

   // Wait for future object to be ready. 

   bool result = pFuture.get(); 
   if (result) {
         cout << "Prime found.\n"; 
   } 
   else { 
         cout << "No prime found.\n"; 
   } 

   return 0; 
} 

前面代码中的worker函数确定提供的整数是否为素数。正如我们所看到的,结果代码比使用packaged_taskpromise简单得多。

启动策略

除了基本版本的std::async(),之外,还有第二个版本,允许用户指定启动策略作为其第一个参数。这是类型为std::launch的位掩码值,可能有以下值:

* launch::async 
* launch::deferred 

async标志意味着立即为worker函数创建一个新的线程和执行上下文。deferred标志表示推迟到wait()get()被调用到future对象上。指定这两个标志会导致函数根据当前系统情况自动选择方法。

std::async()版本没有明确指定位掩码值,默认为后者,自动方法。

原子学

对于多线程,原子的使用也非常重要。为此,C++ 11 STL 提供了一个<atomic>头。本主题在 C 第 8 章原子操作-使用硬件中有广泛的介绍。

摘要

在本章中,我们探讨了 C++ 11 API 中的全部多线程支持,以及 C++ 14 和 C++ 17 中添加的特性。

我们看到了如何使用描述和示例代码来使用每个特性。我们现在可以使用本机 C++ 多线程应用编程接口来实现多线程、线程安全的代码,以及使用异步执行功能来加速和并行执行功能。

在下一章中,我们将了解多线程代码实现中不可避免的下一步:调试和验证生成的应用。**