Skip to content

Latest commit

 

History

History
673 lines (486 loc) · 46.8 KB

File metadata and controls

673 lines (486 loc) · 46.8 KB

四、C++ 中的异步和无锁编程

在前一章中,我们研究了现代 C++ 引入的线程库以及创建、管理和同步线程的各种方法。用线程编写代码的方式相当低级,容易出现与并发代码相关的潜在错误(死锁、实时锁等)。尽管现代 C++ 语言没有被许多程序员注意到,但它提供了一个标准的内存模型,有助于更好地编写并发代码。作为一种从基础开始的并发编程语言,一种语言必须向开发人员提供关于内存访问和运行时执行顺序的某些保证。如果我们使用诸如互斥体、条件变量和未来这样的结构来表示事件,就不需要知道内存模型。但是对内存模型及其保证的了解将帮助我们使用无锁编程技术编写更快的并发代码。可以使用称为原子操作的东西来模拟锁,我们将深入研究这种技术。

正如我们在第 2 章现代 C++ 及其关键习惯用法之旅中所讨论的,零成本抽象仍然是 C++ 编程语言最基本的原则之一。C++ 一直是系统程序员的语言,标准委员会设法在该语言支持的高级抽象机制和访问低级资源编写系统程序的能力之间取得了良好的平衡。C++ 公开了原子类型和一组关联的操作,以对程序的执行进行细粒度控制。标准委员会已经公布了内存模型的详细语义,该语言有一套库来帮助程序员利用它们。

在前一章中,我们学习了如何使用条件变量同步不同线程中的操作。本章讨论标准库提供的使用期货执行基于任务的并行的工具。在本章中,我们将介绍:

  • C++ 中基于任务的并行性
  • C++ 内存模型
  • 原子类型和原子操作
  • 同步操作和内存排序
  • 如何编写无锁数据结构

C++ 中基于任务的并行性

任务是一种可能与其他计算同时执行的计算。线程是任务的系统级表示。在前一章中,我们学习了如何通过构造一个std::thread对象来同时执行一个任务和其他任务,该对象以任务作为构造函数的参数。任务可以是任何可调用的对象,如函数、Lambda 或函子。但是这种同时使用std::thread执行函数的方法被称为基于线程的方法。并发执行的首选是基于任务的方法,这将在本章中讨论。与基于线程的方法相比,基于任务的方法的优势是在任务的(较高)概念级别上操作,而不是直接在线程和锁的较低级别上操作。基于任务的并行是通过以下标准库功能实现的:

  • 未来和承诺从与单独线程相关联的任务中返回值
  • packaged_task帮助启动任务并提供返回结果的机制
  • async()用于启动类似于函数调用的任务

未来和承诺

C++ 任务通常表现得像某种数据通道。发送端,通常称为承诺,将数据发送到接收端,通常称为未来。关于未来和承诺的重要概念是,它们能够在两个任务之间传递价值,而无需明确使用锁。值的传递由系统(运行时)本身处理。未来承诺背后的基本概念很简单;当一个任务想要将一个值传递给另一个任务时,它会将该值放入承诺中。

标准库确保与这个承诺相关的未来得到这个值。另一个任务可以从这个未来的读取这个值(下图必须从右向左读取):

如果一个调用线程需要等待一个特定的一次性事件,未来就派上用场了。表示此事件的未来使其自身对调用线程可用,并且一旦未来就绪(当值被设置为相应的承诺时),调用线程就可以访问该值。在执行过程中,未来可能会有与之相关的数据,也可能没有。一旦事件发生,数据将在未来可用,并且无法重置。

与基于任务的并行性相关联的模板类在库标题<future>中声明。标准库中有两种期货可供选择:唯一期货(std::future<>)和共享期货(std::shared_future<>)。您可以将这些分别与智能指针std::unique_ptr<>std::shared_ptr<> *、*关联起来。std::future实例是指关联事件的唯一实例。相反,std::shared_future的多个实例可能指向同一个事件。在shared_future的情况下,与公共事件相关联的所有实例将同时就绪,并且它们可以访问与该事件相关联的数据。模板参数为关联数据,没有关联数据的应使用std::future<void>std::shared_future<void>模板规范。即使线程之间的数据通信由 futures 内部管理,但 future 对象本身并不提供同步访问。如果多个线程需要访问单个std::future对象,则必须使用互斥或其他同步机制来保护它们。

std::futurestd::promise成对工作,分离任务调用并等待结果。对于一个std::future<T>对象f,我们可以使用std::future类函数get()访问与之关联的值T。同样的对于一个std::promise<T>,有两个 put 操作功能可用(set_value()set_exception())来匹配未来的get()。对于承诺对象,可以使用set_value()给它赋值,也可以使用set_exception()给它传递一个异常。例如,下面的伪代码可以帮助您了解如何在承诺中设置值(在func1中)以及如何在调用future<T>:: get()的函数中消耗东西(在func2中):

// promise associated with the task launched 
void func1(std::promise<T>& pr) 
{ 
    try 
    { 
        T val; 
        process_data(val); 
        pr.set_value(val); // Can be retrieved by future<T>::get() 
    } 
    catch(...) 
    { 
        // Can be retrieved by future<T>::get() 
        // At the future level, when we call get(), the  
        // get will propagate the exception  
        pr.set_exception(std::current_exception()); 
    } 
} 

在前一种情况下,T类型的在处理并获得结果后被设置为承诺 pr 。如果在执行过程中发生任何异常,该异常也被设置为 promise。现在,让我们看看如何访问您设置的值:

// future corresponding to task already launched 
void func2(std::future<T>& ft) 
{ 
    try 
    { 
        // An exception will be thrown here, if the corresponding  
        // promise had set an exception ..otherwise, retrieve the  
        // value sets by the promise.  
        T result = ft.get() 
    } 
    catch(...)
    { 
        // Handle exception  
    } 
} 

这里,使用作为参数传递的将来值来访问相应承诺中设置的值。与std::future()关联的get()函数检索任务执行期间存储的值。对get()的调用必须准备好捕捉通过未来传输的异常并处理它。在解释完std::packaged_task之后,我们将展示一个完整的例子,期货和承诺在行动中协同工作。

标准::打包任务

现在,让我们讨论如何在需要结果的代码中获得与未来相关的返回值。std::packaged_task是一个模板类,可以在标准库中使用,在期货和承诺的帮助下实现基于任务的并行。通过在线程中设置未来和承诺,它简化了任务的设置,而没有任何共享结果的显式锁。一个packaged_task实例为std::thread提供了一个包装器,将返回值或异常捕获到一个承诺中。std::packaged_task中的会员功能get_future()会给你对应承诺关联的未来实例。让我们看一个例子,它使用一个打包的任务来寻找一个向量中所有元素的总和(promise 的工作深入到packaged_task的实现中):

// Function to calculate the sum of elements in an integer vector 
int calc_sum(std::vector<int> v) 
{ 
    int sum = std::accumulate(v.begin(), v.end(), 0); 
    return sum; 
} 

int main() 
{ 
    // Creating a packaged_task encapsulates a function 
    std::packaged_task<int(std::vector<int>)> task(calc_sum); 

    // Fetch associated future from packaged_task 
    std::future<int> result = task.get_future(); 

    std::vector<int> nums{1,2,3,4,5,6,7,8,9,10}; 

    // Pass packaged_task to thread to run asynchronously 
    std::thread t(std::move(task), std::move(nums)); 

    t.join();
    // Fetch the result of packaged_task, the value returned by calc_sum() 
    int sum = result.get(); 

    std::cout << "Sum = " << sum << std::endl; 
    return 0; 
}

packaged_task对象以任务的类型作为其模板参数,以函数指针(calc_sum)作为构造函数参数。未来实例通过调用任务对象的get_future()函数获得。由于无法复制packaged_task实例,因此使用了显式的std::move()。这是因为它是一个资源句柄,负责它的任务可能拥有的任何资源。然后,对get()函数的调用从任务中提取结果并打印出来。

现在,让我们看看packaged_task如何与 Lambdas 一起使用:

    std::packaged_task<int(std::vector<int>)> task([](std::vector<int> 
    v) { 
        return std::accumulate(v.begin(), v.end(), 0); 
    }); 

这里,不是函数指针,而是将一个 Lambda 传递到packaged_task的构造函数中。正如您在前面几章中已经看到的,对于一小段并发运行的代码,Lambdas 会派上用场。期货背后的主要概念是能够获得结果,而不必担心管理沟通的机制。此外,这两个操作在两个不同的线程中运行,因此是并行的。

标准::异步

现代 C++ 提供了一种执行任务的机制,比如一个可能并行执行也可能不并行执行的函数。这里指的是std::async,内部管理线程细节。std::async以一个可调用对象为参数,返回一个std::future,用于存储已启动任务的结果或异常。让我们重写前面的示例,使用std::async计算向量中所有元素的总和:

// Function to calculate the sum of elements in a vector 
int calc_sum(std::vector<int> v) 
{ 
   int sum = std::accumulate(v.begin(), v.end(), 0); 
   return sum; 
} 

int main() 
{ 
   std::vector<int> nums{1,2,3,4,5,6,7,8,9,10}; 

   // task launch using std::async 
   std::future<int> result(std::async(std::launch::async, calc_sum,    std::move(nums))); 

   // Fetch the result of async, the value returned by calc_sum() 
   int sum = result.get(); 

   std::cout << "Sum = " << sum << std::endl; 
   return 0; 
} 

首先,当使用std::async进行基于任务的并行时,任务的启动和从任务中获取结果遵循简单的语法,并且与任务执行很好地分离。在前面的代码中,std::async接受三个参数:

  • async标志决定了async任务和std::launch::async的启动策略,意味着async在新的执行线程上执行任务。std::launch::deferred标志不会产生新的线程,但是会执行懒惰评估。如果两个标志都设置为std::launch::asyncstd::launch::deferred,则由实现决定是执行异步执行还是延迟评估。如果明确没有将任何启动策略传递到std::async中,那么选择执行方式还是要看实现。
  • std::async的第二个参数是一个可调用对象,它可以是函数指针、函数对象或 Lambda。在本例中,calc_sum函数是在单独线程中执行的任务。
  • 第三个参数是任务的输入参数。通常,这是一个变量参数,它可以传递任务可调用对象所需的参数数量。

现在,让我们看看async和 Lambda 是如何在同一个例子中走到一起的:

// Fetch associated future from async
std::future<int> result( async([](std::vector<int> v) {
return std::accumulate(v.begin(), v.end(), 0); 
}, std::move(nums))); 

在这个例子中,可调用对象参数内部有一个 Lambda 函数,它返回std::accumulate()的结果。像往常一样,简单的操作和 Lambda 美化了代码的整体外观,提高了可读性。

使用async,不用考虑线程和锁。但是,只要考虑异步执行计算的任务,您不知道将使用多少线程,因为这取决于内部实现,根据调用时可用的系统资源来决定。在决定使用多少线程之前,它会检查可用的空闲内核(处理器)。这指出了async的明显局限性,因为它需要用于共享需要锁的资源的任务。

C++ 内存模型

经典的 C++ 本质上是一种单线程语言。即使人们用 C++ 编写多线程程序,他们也在使用各自的平台线程工具来编写它们。现代 C++ 可以被认为是一种并发编程语言。语言标准在标准库的帮助下提供了标准的线程和任务机制(正如我们已经看到的)。因为它是标准库的一部分,所以语言规范已经定义了事情应该如何以精确的方式在平台上运行。对于线程、任务等,拥有一致的平台无关行为是一个巨大的挑战,标准委员会处理得非常好。该委员会设计并指定了一个标准的内存模型,用于在程序运行时实现一致的行为。记忆模型由两个方面组成:

  • 结构方面,涉及数据在内存中的布局方式
  • 并发方面,处理内存的并发访问

对于一个 C++ 程序,所有的数据都是由对象组成的。该语言将一个对象定义为一个存储区域,该区域用其类型和寿命来定义。对象可以是基本类型(如 int 或 double)的实例,也可以是用户定义类型的实例。有些对象可能有子对象,但有些没有。关键是每个变量都是一个对象,包括其他对象的成员对象,每个对象都至少占用一些内存位置。现在,让我们看看这与并发性有什么关系。

内存访问和并发性

对于多线程应用,所有东西都挂在这些内存位置上。如果多个线程访问不同的内存位置,一切正常。但是如果两个线程访问同一个内存位置,那么你必须非常小心。正如您在第 3 章c++ 中的语言级并发和并行中所看到的,多个线程试图从同一个内存位置读取不会带来任何麻烦,但是只要有任何线程试图修改公共内存位置中的数据,就会出现竞争条件的机会。

有问题的竞争条件只能通过在多个线程中的访问之间强制排序来避免。正如在第 3 章c++ 语言级并发和并行中所讨论的,使用互斥锁的基于锁的内存访问是一个流行的选择。另一种方法是通过强制执行两个线程中的访问顺序来利用原子操作的同步属性。在本章的后面部分,您将看到使用原子操作来强制排序。

Atomic operation appears to the rest of the system and occurs at once without being interrupted (no task switch happens during atomic operation) in concurrent programming. Atomicity is a guarantee of isolation from interrupts, signals, concurrent processes, and threads. More can be read on this topic at the Wikipedia article at https://en.wikipedia.org/wiki/Linearizability.

如果不同线程对单个内存位置的多次访问之间没有强制排序,则一次或两次访问都不是原子的。如果涉及写操作,则可能导致数据竞争,并可能导致未定义的行为。数据竞赛是一个严重的错误,必须不惜一切代价避免它。未定义的行为可以通过原子操作来避免,但这并不能防止竞争情况。原子操作确保当操作进行时线程切换永远不会发生。这是防止交叉存取内存的保证。原子操作保证了交叉存取存储器的排除(串行排序),但不能防止竞争条件(因为有可能覆盖更新)。

修改合同

当程序或进程正在执行时,系统中的所有线程应该就修改顺序(对于内存)达成一致。每个程序都是在一个环境中执行的,这个环境涉及到指令流、内存、寄存器、堆、栈、缓存、虚拟内存等等。这个修改命令是程序员和系统之间的契约,由内存模型定义。该系统包括将程序变形为可执行代码的编译器(和链接器)、执行流中指定的指令集的处理器、高速缓存和程序的相关状态。合同要求要求程序员遵守某些规则,这使得系统能够生成完全优化的程序。程序员在编写代码访问内存时必须遵守的这一组规则(或试探法)是在标准库中引入的原子类型和原子操作的帮助下实现的。

这些操作不仅是原子的,而且它们对程序的执行产生同步和顺序约束。与在第 3 章中讨论的更高级别的基于锁的同步原语(互斥体和条件变量)相比,C++ 中的语言级并发和并行可以根据您的需求定制同步和顺序约束。C++ 内存模型的重要之处在于:尽管该语言采用了许多现代编程习惯和语言特性,但 C++ 作为系统程序员的语言,对您的内存资源给予了更多的低级控制,以根据您的需要优化代码。

C++ 中的原子操作和类型

通常,非原子操作可能被其他线程视为完成了一半。如第 3 章c++ 语言级并发和并行中所讨论的,在这种情况下,与共享数据结构相关的不变性将被打破。当对共享数据结构的修改需要修改多个值时,就会发生这种情况。最好的例子是二叉树的一个部分移除的节点。如果另一个线程试图同时从这个数据结构中读取,不变量将被破坏,并可能导致未定义的行为。

使用原子操作,您不能从系统中的任何线程观察到一个半完成的操作,因为原子操作是不可分割的。如果与对象相关联的任何操作(如读取)都是原子的,那么对对象的所有修改也是原子的。C++ 提供了原子类型,因此您可以根据需要使用原子性。

原子类型

标准库定义的所有原子类型都可以在<atomic>头库找到。系统保证这些类型的原子性以及与这些类型相关的所有操作。有些操作可能不是原子的,但在这种情况下,系统会产生原子性的错觉。标准的原子类型使用成员函数is_lock_free(),允许用户确定对给定类型的操作是直接使用原子指令完成的(is_lock_free()返回true)还是由编译器和库使用内部锁完成的(is_lock_free()返回false)。

std::atomic_flag在所有原子类型中是不同的。根据标准,这种类型的操作需要是原子的。因此,这不提供is_lock_free()成员功能。这是一个非常简单的类型,允许的操作最少,例如test_and_set()(可以查询或设置)或clear()(清除值)。

根据std::atomic<>类模板的规范,其余原子类型遵循类似的签名。与std::atomic_flag相比,这些类型功能更全面,但并非所有操作都是原子的。运营的原子性也高度依赖于平台。在流行的平台上,内置类型的原子变体确实是无锁的,但这并不能保证在所有地方都是如此。

不使用std::atomic<>模板类,可以使用实现提供的直接类型,如下表所示:

| 原子型 | 对应专精 | | atomic_bool | std::atomic<bool> | | atomic_char | std::atomic<char> | | atomic_schar | std::atomic<signed char> | | atomic_uchar | std::atomic<unsigned char> | | atomic_int | std::atomic<int> | | atomic_uint | std::atomic<unsigned> | | atomic_short | std::atomic<short> | | atomic_ushort | std::atomic<unsigned short> | | atomic_long | std::atomic<long> | | atomic_ulong | std::atomic<unsigned long> | | atomic_llong | std::atomic<long long> | | atomic_ullong | std::atomic<unsigned long long> | | atomic_char16_t | std::atomic<char16_t> | | atomic_char32_t | std::atomic<char32_t> | | atomic_wchar_t | std::atomic<wchar_t> |

除了所有这些基本的原子类型,C++ 标准库还为原子类型提供了一组typedefs,与标准库中可用的typedefs相比,如std::size_t。有一个简单的模式来识别相应的原子版本typedefs:对于任何标准typedef T,使用atomic_ prefix : atomic_T。下表列出了标准原子typedefs及其相应的内置typedefs:

| 原子T0】 | 标准库 typedef | | atomic_size_t | size_t | | atomic_intptr_t | intptr_t | | atomic_uintptr_t | uintptr_t | | atomic_ptrdiff_t | ptrdiff_t | | atomic_intmax_t | intmax_t | | atomic_uintmax_t | uintmax_t | | atomic_int_least8_t | int_least8_t | | atomic_uint_least8_t | uint_least8_t | | atomic_int_least16_t | int_least16_t | | atomic_uint_least16_t | uint_least16_t | | atomic_int_least32_t | int_least32_t | | atomic_uint_least32_t | uint_least32_t | | atomic_int_least64_t | int_least64_t | | atomic_uint_least64_t | uint_least64_t | | atomic_int_fast8_t | int_fast8_t | | atomic_uint_fast8_t | uint_fast8_t | | atomic_int_fast16_t | int_fast16_t | | atomic_uint_fast16_t | uint_fast16_t | | atomic_int_fast32_t | int_fast32_t | | atomic_uint_fast32_t | uint_fast32_t | | atomic_int_fast64_t | int_fast64_t | | atomic_uint_fast64_t | uint_fast64_t |

std::atomic<>类模板不仅仅是一组专门化;它们有一个要扩展的主模板和一个用户定义类型的原子变体。作为通用模板类,支持的操作仅限于load()store()exchange()compare_exchange_weak()compare_exchange_strong()。原子类型上的每个操作都有一个可选的参数来指定所需的内存排序语义。内存排序的概念将在本章的后面部分详细介绍。现在,请记住,所有原子操作都可以分为三类:

  • **店铺运营:**这些运营可以有memory_order_relaxedmemory_order_release或者memory_order_seq_cst下单
  • **装载操作:**这些可以有memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_seq_cst命令
  • **读-修改-写操作:**这些操作可以有memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst顺序

所有原子操作的默认内存排序是memory_order_seq_cst

与传统的标准 C++ 类型相比,标准原子类型不是可复制的可分配的。这意味着它们没有复制构造函数或复制赋值运算符。除了直接成员函数之外,它们还支持从和到相应内置类型的隐式转换。对原子类型的所有操作都被定义为原子的,赋值和复制构造涉及两个对象。涉及两个不同对象的操作不能是原子的。在这两种操作中,值必须从一个对象读取,然后写入另一个对象。因此,这些操作不能被认为是原子的。

现在,让我们看看您实际上可以对每个标准原子类型执行的操作,从std::atomic_flag开始。

标准::原子标志

std::atomic_flag代表一个布尔标志,是标准库中所有原子类型中最简单的。这是每个平台上所有操作都要求无锁的唯一类型。这种类型是非常基本的,因此它仅用作构建模块。

必须始终用ATOMIC_FLAG_INIT初始化一个std::atomic_flag对象,以将状态设置为清除:

std::atomic_flag flg = ATOMIC_FLAG_INIT;

这是唯一需要这种初始化的原子类型,不管其声明的范围如何。一旦初始化,这种类型只允许三种操作:销毁它、清除它或设置对前一个值的查询。这些分别对应析构函数、clear()成员函数和test_and_set()成员函数。clear()是一个商店操作,而test_and_set()是一个读-修改-写操作,如前一节所述:

flg.clear()
bool val = flg.test_and_set(std::memory_order_relaxed);

在前面的代码片段中,clear()函数调用请求用默认的内存顺序清除标志,即std:: memory_order_seq_cst,而对test_and set()的调用使用了宽松的语义(在宽松的顺序中有更多关于这一点的内容),明确用于设置标志和检索旧值。

std::atomic_flag的原始实现使它成为自旋锁互斥的理想选择。让我们看一个自旋锁的例子:

class spin_lock
{
    std::atomic_flag flg;
    public:
    spin_lock() : flg(ATOMIC_FLAG_INIT){}
    void lock() {
        // simulates a lock here... and spin
        while (flg.test_and_set(std::memory_order_acquire));
        //----- Do some action here
        //----- Often , the code to be guarded will be sequenced as
        // sp.lock() ...... Action_to_Guard() .....sp.unlock()
    }
    void unlock() {
        //------ End of Section to be guarded
        flg.clear(std::memory_order_release); // release lock
    }
};

在前面的代码片段中,实例变量flg(属于std::atomic_flag类型)最初被清除。在锁定方法中,它试图通过测试flg来设置标志,以查看该值是否被清除。

如果该值被清除,该值将被设置,我们将退出循环。只有通过unlock()方法清除标志时,标志中的值才会复位。换句话说,该实现通过在lock()中的繁忙等待来实现互斥。

由于其局限性,std::atomic_ flag不能作为布尔原子类型,不支持任何非修改查询操作。所以,让我们看看std::atomic<bool>来补偿原子布尔标志的需求。

标准::原子

std::atomic<bool>std::atomic_flag相比是全功能原子布尔类型。但是对于这种类型,复制构造和赋值都是不可能的。std::atomic<bool>对象的值最初可以是truefalse。这种类型的对象可以从非原子bool构造或赋值:

std::atomic<bool> flg(true);
flg = false;

关于原子类型的赋值运算符,有一点需要注意,那就是运算符返回非原子类型的值,而不是返回引用的常规方案。如果返回的是一个引用而不是一个值,那么就会产生这样一种情况:赋值的结果得到了另一个线程修改的结果,也就是说,如果它依赖于赋值运算符的结果。在将赋值运算符的结果作为非原子值返回时,可以避免这种额外的加载,并且您可以推断获得的值是实际存储的值。

现在,让我们继续进行std::atomic<bool>支持的操作。首先,在std::atomic<bool>中可用的store()成员功能用于写操作(或者true或者false,它取代了std::atomic_flag的相应限制性clear()功能。此外,store()功能是原子存储操作。类似地,test_and_set()函数已被更通用的exchange()成员函数有效替代,该函数允许您用选定的新值替换存储的值并检索原始值。这是原子读-修改-写操作。然后,std::atomic<bool>通过对load()的显式调用支持一个简单的值的非修改查询,这是一个原子加载操作:

std::atomic<bool> flg;
flg.store(true);
bool val = flg.load(std::memory_order_acquire);
val = flg.exchange(false, std::memory_order_acq_rel);

除了exchange()之外,std::atomic<bool>引入了执行读-修改-写操作的操作,该操作执行流行的原子比较-交换 ( CAS )指令。如果当前值等于预期值,此操作将存储一个新值。这称为比较/交换操作。标准库原子类型中有两种操作实现:compare_exchange_weak()compare_exchange_strong()。此操作将原子变量的值与提供的期望值进行比较,如果它们相等,则存储提供的值。如果这些值不相等,预期值将更新为原子变量的实际值。比较/交换功能的返回类型为 bool ,如果执行了存储,则为true;否则,就是false

对于compare_exchange_weak(),即使期望值和原值相等,店铺也可能不成功。在这种情况下,价值交换不会发生,功能将返回false。这通常发生在缺少单个比较和交换指令的平台上,这意味着处理器不能保证操作会自动执行。在这样的机器中,执行操作的线程可能在执行与操作相关联的指令序列的中途被切换出来,并且另一个线程将被操作系统调度在它的位置上,在给定的条件下,运行的线程多于可用处理器的数量。这种情况被称为乱真故障

由于compare_exchange_weak()会导致虚假故障,因此应在循环中使用:

bool expected = false;
atomic<bool> flg;
...
while(!flg.compare_exchange_weak(expected, true));

在前面的代码中,只要预期是false,循环就继续迭代,这表示compare_exchange_weak()调用发生了虚假故障。相反,如果实际值不等于期望值,compare_exchange_strong()保证返回false。这可以避免像前面的情况那样需要循环,在前面的情况中,您想要知道与正在运行的线程相关的变量的状态。

比较/交换函数可以采用两个内存排序参数,以便在成功和失败的情况下允许内存排序语义不同。这些内存排序语义只对存储操作有效,不能用于故障情况,因为存储操作不会发生:

bool expected;
std::atomic<bool> flg;
b.compare_exchange_weak(expected, true, std::memory_order_acq_rel, std::memory_order_acquire);
b.compare_exchange_weak(expected, true, std::memory_order_release);

如果您不指定任何内存排序语义,成功和失败的情况都将采用默认的memory_order_seq_cst。如果您没有为失败指定任何顺序,那么它被假定为与成功相同,除了顺序的发布部分被省略。memory_order_acq_rel变成memory_order_acquire``memory_order_release变成memory_order_relaxed

内存排序的规格和结果将在本章的内存排序部分详细讨论。现在,让我们来看看原子积分类型作为一个组的使用。

标准原子积分类型

std::atomic<bool>类似,标准的原子整型既不能复制构造,也不能复制赋值。然而,它们可以从相应的非原子标准变体中构造和分配。除了强制的is_lock_free()成员函数外,标准的原子积分类型,如std::atomic<int>std::atomic<unsigned long long>,也有load()store()exchange()compare_exchange_weak()compare_exchange_strong()成员函数,语义与std::atomic<bool>相似。

原子类型的整型变量确实支持数学运算,如fetch_add()fetch_sub()fetch_and()fetch_or()fetch_xor()、复合赋值运算符(+=-=&=|=^=)以及带有++ --的前后递增和递减运算符。

命名函数,如fetch_add()fetch_sub(),自动执行它们的操作并返回旧值,但是复合赋值运算符返回新值。前后递增/递减按照通常的 C/C++ 约定工作:后递增/递减执行操作,但返回旧值,前递增/递减运算符执行操作并返回新值。以下简单的示例可以轻松演示这些操作的规范:

int main() 
{ 
std::atomic<int> value; 

std::cout << "Result returned from Operation: " << value.fetch_add(5) << 'n'; 
std::cout << "Result after Operation: " << value << 'n'; 

std::cout << "Result returned from Operation: " << value.fetch_sub(3) << 'n'; 
std::cout << "Result after Operation: " << value << 'n'; 

std::cout << "Result returned from Operation: " << value++ << 'n'; 
std::cout << "Result after Operation: " << value << 'n'; 

std::cout << "Result returned from Operation: " << ++ value << 'n'; 
std::cout << "Result after Operation: " << value << 'n'; 

value += 1; 
std::cout << "Result after Operation: " << value << 'n'; 

value -= 1; 
std::cout << "Result after Operation: " << value << 'n'; 
} 

这段代码的输出应该如下所示:

Result returned from Operation: 0 
Result after Operation: 5 
Result returned from Operation: 5 
Result after Operation: 2 
Result returned from Operation: 2 
Result after Operation: 3 
Result returned from Operation: 4 
Result after Operation: 4 
Result after Operation: 5 
Result after Operation: 4 

除了std::atomic_flagstd::atomic<bool>之外,第一个表中列出的所有其他原子类型都是原子积分类型。现在,让我们来看看原子指针特殊化,std::atomic<T*>

标准::原子–指针算法

除了通常的一组操作如load()store()exchange()compare_exchange_weak()compare_exchange_strong()之外,原子指针类型还加载了指针算术操作。成员函数fetch_add()fetch_sub()为类型提供操作支持,对存储的地址进行原子加法和减法,操作符+=-=,以及前后递增/递减都使用++ --操作符。

运算符的工作方式与标准非原子指针算法相同。如果objstd::atomic<some_class*>,则一个对象指向一组some_class对象的第一个条目。obj+=2将其更改为指向数组中的第三个元素,并返回指向数组中第三个元素的some_class*的原始指针。如标准原子整数类型部分所述,命名函数如fetch_add()fetch_sub对原子类型执行操作,但将指针返回到数组中的第一个元素。

原子操作的函数形式还允许在函数调用的附加参数中指定内存排序语义:

obj.fetch_add(3, std::memory_order_release);

由于fetch_add()fetch_sub都是读-修改-写操作,它们可以使用标准原子库中的任何内存排序语义。但是,对于运算符形式,无法指定内存排序,因此这些运算符将始终具有memory_order_seq_cst语义。

std::原子<>初级类模板

标准库中的主类模板允许用户创建用户定义类型 ( UDT )的原子变体。要将用户定义的类型用作原子类型,您必须在实现类之前遵循一些标准。对于一个用户定义的类 UDT,如果这个类型有一个简单的复制赋值操作符,std::atomic<UDT>是可能的。这意味着用户定义的类不应包含任何虚拟函数或虚拟基类,并且必须使用编译器生成的默认复制分配运算符。此外,用户定义类的每个基类和非静态数据成员都必须有一个简单的复制赋值运算符。这允许编译器执行memcpy() 或赋值操作的等效操作,因为没有用户编写的代码要执行。

除了赋值操作符的要求,用户定义的类型必须是位相等可比的。这意味着您必须能够使用memcmp()比较相等的实例。这种保证是确保比较/交换操作正常进行所必需的。

对于具有用户定义类型T,即std::atomic<T>的标准原子类型的实例,界面仅限于std::atomic<bool> : load()store()exchange()compare_exchange_weak()compare_exchange_strong()可用的操作,以及从类型T实例的赋值和转换。

内存排序

我们已经了解了标准库中可用的原子类型和原子操作符。在对原子类型执行操作时,我们需要为某些操作指定内存排序。现在,我们将讨论不同内存排序语义的意义和用例。原子操作背后的关键思想是跨多个线程提供数据访问的同步,这是通过强制执行顺序来实现的。例如,如果对数据的写入发生在对数据的读取之前,那么一切都会好起来。否则,你就麻烦了!标准库有六个内存排序选项,可应用于原子类型的操作:memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst。对于原子类型上的所有原子操作,默认情况下,memory_order_seq_cst是内存顺序,除非您指定其他内容。

这六个选项可以分为三类:

  • 顺序一致排序 : memory_order_seq_cst
  • 获取-发布订单 : memory_order_consumememory_order_releasememory_order_acquirememory_order_acq_rel
  • 轻松点餐 : memory_order_relaxed

对于不同的内存排序模型,执行成本因不同的 CPU 而异。与阻止顺序一致的排序相比,不同的内存排序模型的可用性允许专家利用更细粒度的排序关系的更高性能,但是要根据需要选择合适的内存模型,应该了解这些选项如何影响程序的行为。让我们先看看顺序一致的模型。

顺序一致性

顺序一致性的概念是由莱斯利·兰波特在 1979 年定义的。顺序一致性为程序的执行提供了两个保证。首先也是最重要的,内存排序程序的指令是按源代码顺序执行的,否则编译器会保证源代码顺序的假象。然后,所有线程中的所有原子操作都有一个全局顺序。

对于程序员来说,顺序一致性的全局排序行为(所有线程中的所有操作都发生在全局时钟中)是一个有趣的高地,但也是一个缺点。

顺序一致性的有趣之处在于,代码按照我们对多个并发线程的直觉工作,但是系统需要做大量的后台工作。下面的程序是一个简单的例子,让我们了解顺序一致性:

std::string result; 
std::atomic<bool> ready(false); 

void thread1() 
{ 
    while(!ready.load(std::memory_order_seq_cst)); 
    result += "consistency"; 
} 

void thread2() 
{ 
    result = "sequential "; 
    ready=true; 
} 

int main() 
{ 
    std::thread t1(thread1); 
    std::thread t2(thread2); 
    t1.join(); 
    t2.join(); 

    std::cout << "Result : " << result << 'n'; 
} 

前面的程序在顺序一致性的帮助下同步线程thread1thread2。由于顺序一致性,执行完全是确定性的,所以这个程序的输出总是如下:

Result : sequential consistency 

这里,thread1在 while 循环中等待,直到原子变量readytrue。一旦thread2中的就绪变为true,则thread1继续执行,因此结果总是以相同的顺序用字符串更新。顺序一致性的使用允许两个线程以相同的顺序查看其他线程中的操作,因此两个线程遵循相同的全局时钟。loop 语句还有助于保持两个线程同步的时间时钟。

获取-发布语义的细节将在下一节中介绍。

获取-发布顺序

现在,让我们深入研究 C++ 标准库提供的内存排序语义。这是程序员对多线程代码排序的直觉开始消退的地方,因为在原子操作的获取-释放语义中,线程之间没有全局同步。这些语义只允许同一原子变量上的原子操作之间的同步。详细来说,在一个线程中执行的对原子变量的加载操作可以与在另一个线程中对同一原子变量进行的存储操作同步。程序员必须提取这个特征,在原子变量之间建立一个先于的关系,以便在线程之间同步。这使得使用 acquire-release 模型有点困难,但同时也更令人兴奋。acquire-release 语义缩短了实现无锁编程的旅程,因为您不需要担心线程的同步,但是不同线程中相同原子变量的同步是我们需要思考的问题。

正如我们之前解释的,获取-发布语义的关键思想是发布操作与对同一原子变量的获取操作之间的同步,以及除此之外建立一个排序常数。现在,顾名思义,获取操作包括获取锁,锁包括用于读取原子变量的操作,例如load()test_and_set()函数。因此,锁的释放是一个释放操作,包括原子操作,如store()clear()

换句话说,锁定互斥体是一个获取操作,而解锁是一个释放操作。因此,在临界区中,变量的操作不能在两个方向上进行。但是,变量可以在临界区内移动,因为变量从不受保护的区域移动到受保护的区域。如下图所示:

临界区包含单向屏障:获取屏障和释放屏障。同样的推理可以应用于启动一个线程和在一个线程上发出一个连接调用,以及与标准库可用的所有其他同步原语相关的操作。

由于同步发生在原子变量级别,而不是线程级别,让我们重新回顾一下使用std::atomic_flag实现的自旋锁:

class spin_lock 
{ 
    std::atomic_flag flg; 

public: 
    spin_lock() : flg(ATOMIC_FLAG_INIT) 
    {} 

    void lock() 
    { 
        // acquire lock and spin 
        while (flg.test_and_set(std::memory_order_acquire)); 
    } 

    void unlock() 
    { 
        // release lock 
        flg.clear(std::memory_order_release); 
    } 
}; 

在本代码中,lock()功能是一个acquire操作。现在使用显式的获取内存排序标志,而不是使用上一个示例中使用的默认顺序一致内存排序。此外,unlock()函数也是一个使用默认内存顺序的发布操作,现在已经被显式的发布语义所取代。因此,具有两个线程顺序一致性的重量级同步被轻量级和高性能的获取-释放语义所取代。

随着使用spin_lock的线程数量增加到两个以上,使用std::memory_order_acquire的一般获取语义将是不够的,因为锁定方法变成了获取-释放操作。因此,记忆模式必须改为std::memory_order_acq_rel

到目前为止,我们已经看到顺序一致的排序确保了线程之间的同步,而获取-释放排序则建立了多个线程上对同一原子变量的读写操作之间的排序。现在,让我们看看宽松内存排序的规范。

宽松排序

使用标签std::memory_order_relaxed以宽松的内存排序对原子类型执行的操作不是同步操作。与标准库中可用的其他排序选项相比,它们不会在并发内存访问之间强加顺序。宽松的内存排序语义只保证同一线程内同一原子类型的操作不能被重新排序,这种保证称为修改顺序一致性。事实上,宽松的排序只能保证原子性和修改顺序的一致性。因此,其他线程可以以不同的顺序看到这些操作。

在不需要同步或排序的地方,可以有效地使用宽松的内存排序,原子性可以成为性能提升的额外优势。一个典型的例子是递增计数器,例如 std::shared_ptr 的引用计数器,它们只需要原子性。但是减少引用计数需要与这个模板类的析构函数进行获取-释放同步。

让我们看一个简单的例子来计算以宽松排序产生的线程数:

std::atomic<int> count = {0}; 

void func() 
{ 
    count.fetch_add(1, std::memory_order_relaxed); 
} 

int main() 
{ 
    std::vector<std::thread> v; 
    for (int n = 0; n < 10; ++ n) 
    { 
        v.emplace_back(func); 
    } 
    for (auto& t : v) 
    { 
        t.join(); 
    } 

    std::cout << "Number of spawned threads : " << count << 'n'; 
} 

在这段代码中,十个线程由main()函数和一个线程函数func()产生,其中在每个线程上,原子整数值使用原子操作fetch_add()增加 1。与std::atomic<int>提供的复合赋值运算符以及后递增和前递增运算符相比,fetch_add()函数可以接受内存排序参数,它就是std::memory_order_relaxed

程序打印程序中产生的线程数,如下所示:

Number of spawned threads : 10 

对于任何其他相关的内存排序标签,程序的输出保持不变,但是宽松的内存排序确保了原子性,从而确保了性能。

到目前为止,我们已经讨论了不同内存模型的级别,以及它们对原子和非原子操作的影响。现在,让我们深入研究一个使用原子操作的无锁数据结构的实现。

无锁数据结构队列

正如我们已经知道的,实际系统中的数据通常以数据结构的形式表示,当涉及到数据结构上的并发操作时,性能是一个大问题。在第 3 章、*c++*中的语言级并发和并行,我们学习了如何编写线程安全的堆栈。然而,我们使用锁和条件变量来实现它。为了解释如何编写无锁数据结构,让我们使用生产者/消费者范例编写一个非常基本的队列系统,而不使用锁或条件变量。这肯定会提高代码的性能。我们将从头开始推广它,而不是在标准数据类型上使用包装器。在这种情况下,我们假设只有一个生产者和一个消费者:

template<typename T> 
class Lock_free_Queue 
{ 
private: 
    struct Node 
    { 
        std::shared_ptr<T> my_data; 
        Node* my_next_node; 
        Node() : my_next_node(nullptr) 
        {} 
    }; 

    std::atomic<Node*> my_head_node; 
    std::atomic<Node*> my_tail_node; 

    Node* pop_head_node() 
    { 
        Node* const old_head_node = my_head_node.load(); 
        if(old_head_node == my_tail_node.load()) 
        { 
            return nullptr; 
        } 
        my_head_node.store(old_head_node->my_next_node); 
        return old_head_node; 
    } 

Lock_free_stack类包含一个表示队列节点(名为Node)的结构,其中数据成员表示节点(my_data)的数据,指针指向下一个节点。然后,该类包含两个指向用户定义结构Node的原子指针实例,该结构已经在该类中定义。一个实例存储指向队列头节点的指针,而另一个实例指向尾节点。最后,使用private pop_head_node()函数通过调用原子存储操作来检索队列的头节点,但前提是队列至少包含一个元素。这里,原子操作遵循默认的顺序一致的内存排序语义:

public: 
Lock_free_Queue() : my_head_node(new Node), my_tail_node(my_head_node.load()) 
    {} 
    Lock_free_Queue(const Lock_free_Queue& other) = delete; 
    Lock_free_Queue& operator= (const Lock_free_Queue& other) = delete; 

    ~Lock_free_Queue() 
    { 
        while(Node* const old_head_node = my_head_node.load()) 
        { 
            my_head_node.store(old_head_node->my_next_node); 
            delete old_head_node; 
        } 
    }

构造队列对象时,头节点被实例化,尾节点指向该内存。复制构造函数和复制赋值运算符被标记为已删除,以防止它们被使用。在析构函数中,队列中的所有元素都被迭代删除:

    std::shared_ptr<T> dequeue() 
    { 
        Node* old_head_node = pop_head_node(); 
        if(!old_head_node) 
        { 
            return std::shared_ptr<T>(); 
        } 
        std::shared_ptr<T> const result(old_head_node->my_data); 
        delete old_head_node; 
        return result; 
    } 

    void enqueue(T new_value) 
    { 
        std::shared_ptr<T> new_data(std::make_shared<T>(new_value)); 
        Node* p = new Node; 
        Node* const old_tail_node = my_tail_node.load(); 
        old_tail_node->my_data.swap(new_data); 
        old_tail_node->my_next_node = p; 
        my_tail_node.store(p); 
    } 
}; 

前面的代码片段实现了标准的队列操作,即入队和出队。在这里,我们已经使用交换和存储原子操作确保了在入队和出队之间的关系之前有发生。

摘要

在本章中,我们讨论了标准库提供的编写基于任务的并行性的工具。我们看到了如何使用std::packaged_taskstd::async的期货和承诺。我们讨论了现代 C++ 语言提供的新的多线程感知内存模型。之后,我们介绍了原子类型以及与之相关的操作。我们学到的最重要的东西是语言的各种记忆排序语义。简而言之,这一章和前一章将使我们能够对反应式编程模型的并发方面进行推理。

在下一章中,我们将把注意力从语言和并发转移到反应式编程模型的标准接口上。我们将报道天文台!*