Skip to content

Latest commit

 

History

History
565 lines (426 loc) · 29.1 KB

09.md

File metadata and controls

565 lines (426 loc) · 29.1 KB

九、设计并发数据结构

在前一章中,我们谈到了 C++ 中并发和多线程的基础。并行代码设计中最大的挑战之一是正确处理数据竞争。线程同步和编排不是一个容易掌握的话题,尽管我们可能认为它是最重要的话题。虽然我们可以在对数据竞争有丝毫怀疑的任何地方使用同步原语,如互斥体,但这不是我们建议的最佳实践。

设计并发代码的更好方法是不惜一切代价避免锁。这不仅会提高应用的性能,还会使它比以前更安全。说起来容易做起来难——无锁编程是我们在本章中介绍的一个具有挑战性的主题。特别是,我们将进一步深入设计无锁算法和数据结构的基础。这是许多优秀开发人员不断研究的一个难题。我们将触及无锁编程的基础,这将让您了解如何以有效的方式构造代码。阅读本章后,您将能够更好地描绘数据竞赛的问题,并获得设计并发算法和数据结构所需的基本知识。这也可能有助于您的一般设计技能来构建容错系统。

本章将涵盖以下主题:

  • 了解数据竞争和基于锁的解决方案
  • 在 C++ 代码中使用原子
  • 设计无锁数据结构

技术要求

带有-std=c++ 2a选项的 g++ 编译器用于编译本章中的示例。你可以在https://github.com/PacktPublishing/Expert-CPP找到本章使用的源文件。

仔细看看数据竞赛

正如已经多次指出的,数据竞争是程序员不惜一切代价试图避免的情况。在前一章中,我们讨论了死锁和避免死锁的方法。我们在前一章中使用的最后一个例子是创建一个线程安全的单例模式。假设我们使用一个类来创建数据库连接(一个经典的例子)。

下面是跟踪数据库连接的模式的简单实现。每次需要访问数据库时保持单独的连接不是一个好的做法。相反,我们重用现有的连接来从程序的不同部分查询数据库:

namespace Db {
  class ConnectionManager 
  {
  public:
    static std::shared_ptr<ConnectionManager> get_instance()
 {
 if (instance_ == nullptr) {
 instance_.reset(new ConnectionManager());
 }
 return instance_;
 }

    // Database connection related code omitted
  private:
    static std::shared_ptr<ConnectionManager> instance_{nullptr};
  };
}

让我们更详细地讨论这个例子。在前一章中,我们引入了锁定来保护get_instance()函数免受数据竞争的影响。让我们详细说明我们这样做的原因。为了简化示例,我们对以下四条线感兴趣:

get_instance()
  if (_instance == nullptr)
    instance_.reset(new)
  return instance_;

现在,假设我们运行一个访问get_instance()函数的线程。我们将其命名为Thread A,它执行的第一行是条件语句,如图所示:

get_instance()
  if (_instance == nullptr)   <--- Thread A
    instance_.reset(new)
  return instance_;

它将一行行地执行指令。我们更感兴趣的是第二个线程(标记为Thread B),它开始执行与Thread A并发的功能。在函数的并发执行过程中,可能会出现以下情况:

get_instance()
  if (_instance == nullptr)   <--- Thread B (checking)
    instance_.reset(new)      <--- Thread A (already checked)
  return instance_;

Thread B对比instance_nullptr得到肯定的结果。Thread A通过了相同的检查,并将instance_设置为新对象。而从Thread A的角度来看一切都很好,只是通过了条件检查,重置instances,将继续下一行返回instance_。然而,就在Thread B的价值发生变化之前,它与instance_进行了比较。因此,Thread B也继续设置instance_的值:

get_instance()
  if (_instance == nullptr)   
    instance_.reset(new)      <--- Thread B (already checked)
  return instance_;           <--- Thread A (returns)

前面的问题是Thread B在已经设置好之后重置instance_。同样,我们将get_instance()视为单一操作;它由几个指令组成,每个指令由一个线程顺序执行。为了使两个线程不相互干扰,操作不应该由一个以上的指令组成。

我们关注数据竞争的原因是前面代码块中显示的差距。线与线之间的间隙会让线相互干扰。当您使用同步原语(如互斥体)设计解决方案时,您应该描绘出您错过的所有间隙,因为解决方案可能不是正确的。以下修改使用了互斥体和前一章中讨论的double-checked锁定模式:

static std::shared_ptr<ConnectionManager> get_instance()
{
  if (instance_ == nullptr) {
    // mutex_ is declared in the private section
 std::lock_guard lg{mutex_};
 if (instance_ == nullptr) { // double-checking
 instance_.reset(new ConnectionManager());
 }
  }
  return instance_;
}

以下是当两个线程试图访问instance_对象时发生的情况:

get_instance()
  if (instance_ == nullptr)     <--- Thread B
    lock mutex                  <--- Thread A (locks the mutex)
    if (instance_ == nullptr)
      instance_.reset(new)
    unlock mutex
  return instance_

现在,即使两个线程都通过了第一次检查,其中一个也会锁定互斥体。当其中一个线程试图锁定互斥体时,另一个线程将重置实例。为了确保它没有被设置,我们使用了第二个检查(这就是为什么它被称为双重检查锁定):

get_instance()
  if (instance_ == nullptr)
    lock mutex                  <--- Thread B (tries to lock, waits)
    if (instance_ == nullptr)   <--- Thread A (double check)
      instance_.reset(new)      
    unlock mutex
  return instance_

Thread A完成设置instance_后,它会解锁互斥体,这样Thread B就可以继续锁定和重置instance_:

get_instance()
  if (instance_ == nullptr)
    lock mutex                  <--- Thread B (finally locks the mutex)
    if (instance_ == nullptr)   <--- Thread B (check is not passed)
      instance_.reset(new)      
    unlock mutex                <--- Thread A (unlocked the mutex)
  return instance_              <--- Thread A (returns)  

根据经验,您应该始终从代码的字里行间寻找答案。两个语句之间总是有一个间隙,这个间隙会使两个或多个线程相互干扰。下一节将详细讨论一个递增数字的经典例子。

同步增量

几乎每本涉及线程同步主题的书都使用递增数字的经典示例作为数据竞赛示例。这本书也不例外。示例如下:

#include <thread>

int counter = 0;

void foo()
{
 counter++ ;
}

int main()
{
  std::jthread A{foo};
  std::jthread B{foo};
  std::jthread C{[]{foo();}};
  std::jthread D{
    []{
      for (int ix = 0; ix < 10; ++ ix) { foo(); }
    }
  };
}

我们增加了几个线程,使示例更加复杂。前面的代码只不过是使用四个不同的线程增加counter变量。乍一看,在任何时间点,只有一个线程递增counter。然而,正如我们在上一节中提到的,我们应该注意并寻找代码中的漏洞。foo()功能似乎少了一个。增量运算符的行为如下(作为伪代码):

auto res = counter;
counter = counter + 1;
return res;

现在,我们已经发现了本不该存在的空白。所以现在,在任何时间点,只有一个线程执行前面三条指令中的一条。也就是说,类似下面这样的事情是可能的:

auto res = counter;     <--- thread A
counter = counter + 1;  <--- thread B
return res;             <--- thread C

因此,例如,thread B可能会修改counter的值,而thread A会读取其先前的值。这意味着thread A将在thread B完成后为counter分配一个新的增量值。混乱带来了混乱,我们的大脑迟早会爆炸,试图理解操作的顺序。作为一个经典的例子,我们将继续使用线程锁定机制来解决这个问题。这里有一个流行的解决方案:

#include <thread>
#include <mutex>

int counter = 0;
std::mutex m;

void foo()
{
 std::lock_guard g{m};
  counter++ ;
}

int main()
{
  // code omitted for brevity
}

到达lock_guard的线程首先锁定mutex,如下图所示:

lock mutex;             <--- thread A, B, D wait for the locked mutex 
auto res = counter;     <--- thread C has locked the mutex
counter = counter + 1;
unlock mutex;           *<--- A, B, D are blocked until C reaches here*
return res;             

使用锁定的问题在于性能。理论上,我们使用线程来加速程序执行,更具体地说,数据处理。在大数据集合的情况下,使用多线程可能会显著提高程序的性能。但是,在多线程环境中,我们首先处理并发访问,因为用多个线程访问集合可能会导致集合损坏。例如,让我们看看线程安全的堆栈实现。

实现线程安全堆栈

第 6 章中调用堆栈数据结构适配器,深入了解 STL 中的数据结构和算法。我们将使用锁实现堆栈的线程安全版本。堆栈有两个基本操作,pushpop。它们都修改了容器的状态。如您所知,堆栈本身不是容器;它是一个适配器,包装了一个容器,并提供了一个合适的接口来访问。我们将通过引入线程安全将std::stack包装在一个新的类中。除了建造和破坏功能外,std::stack还提供以下功能:

  • top():访问栈顶元素
  • empty():如果堆栈为空,则返回真
  • size():返回当前堆栈的大小
  • push():在堆栈中插入一个新项目(在顶部)
  • emplace():在堆栈顶部的适当位置构造一个元素
  • pop():移除堆栈的顶部元素
  • swap():用另一个堆栈交换内容

我们将保持简单,专注于线程安全的思想,而不是构建一个强大的全功能堆栈。这里主要关注的是修改底层数据结构的函数。我们的兴趣在于push()pop()功能。如果几个线程相互干扰,这些函数可能会破坏数据结构。因此,下面的声明是表示线程安全堆栈的类:

template <typename T>
class safe_stack
{
public:
  safe_stack();
  safe_stack(const safe_stack& other);
  void push(T value); // we will std::move it instead of copy-referencing
  void pop();
  T& top();
  bool empty() const;

private:
  std::stack<T> wrappee_;
  mutable std::mutex mutex_;
};

请注意,我们将mutex_声明为可变的,因为我们将其锁定在empty()常量函数中。可以说,这是一个比去掉empty()元素更好的设计选择。但是,现在您应该知道,对任何数据成员使用可变参数都表明我们做出了错误的设计选择。反正safe_stack的客户端代码不会太在意实现的内在细节;它甚至不知道堆栈使用互斥来同步并发访问。

现在让我们看看它的成员函数的实现以及一个简短的描述。让我们从复制构造函数开始:

safe_stack::safe_stack(const safe_stack& other)
{
  std::lock_guard<std::mutex> lock(other.mutex_);
  wrappee_ = other.wrappee_;
}

请注意,我们锁定了另一个堆栈的互斥体。尽管看起来不公平,但我们需要确保在复制另一个堆栈时,它的底层数据不会被修改。

接下来,我们来看看push()功能的实现。很明显很简单;我们锁定互斥体并将数据推入底层堆栈:

void safe_stack::push(T value)
{
  std::lock_guard<std::mutex> lock(mutex_);
  // note how we std::move the value
  wrappee_.push(std::move(value));
}

几乎所有的函数都以相同的方式合并线程同步:锁定互斥体、执行任务和解锁互斥体。这确保了在任何时候只有一个线程在访问数据。也就是说,为了保护数据不受竞争条件的影响,我们必须确保函数不变量不会被破坏。

If you are not a fan of typing long C++ type names such as std::lock_guard<std::mutex>, use the using keyword to make short aliases for types, for example, using locker = std::guard<std::mutex>;.

现在,转到pop()函数,我们可以修改类声明,使pop()直接返回栈顶的值。我们这样做主要是因为我们不希望有人访问栈顶(带有引用),然后从另一个线程中弹出数据。因此,我们将修改pop()函数来创建一个共享对象,然后返回堆栈元素:

std::shared_ptr<T> pop()
{
  std::lock_guard<std::mutex> lock(mutex_);
  if (wrappee_.empty()) {
    throw std::exception("The stack is empty");
  }
  std::shared_ptr<T> top_element{std::make_shared<T>(std::move(wrappee_.top()))};
  wrappee_.pop();
  return top_element;
}

注意safe_stack类的声明也应该根据pop()函数的修改而改变。而且,我们不再需要top()了。

设计无锁数据结构

如果保证至少有一个线程能够取得进展,那么我们就说它是一个无锁函数。与基于锁的函数相比,在基于锁的函数中,一个线程可以阻塞另一个线程,并且它们都可能在取得进展之前等待某种条件,无锁状态确保至少一个线程取得进展。我们说使用数据同步原语的算法和数据结构是阻塞的,也就是说,一个线程被挂起,直到另一个线程执行一个操作。这意味着线程在块被移除之前无法取得进展(通常是解锁互斥)。我们的兴趣在于不使用阻塞函数的数据结构和算法。我们称其中一些为无锁的,尽管我们应该区分非阻塞算法和数据结构的类型。

使用原子类型

在本章的前面,我们介绍了源代码行之间的差距是数据竞争的原因。每当你有一个包含多个指令的操作时,你的大脑应该提醒你一个可能的问题。然而,你如何努力使操作独立和单一并不重要;大多数情况下,如果不将操作分解为涉及多个指令的步骤,就无法实现任何目标。C++ 通过提供原子类型来拯救人类。

首先,让我们理解为什么使用原子这个词。一般来说,我们把原子理解为不能分解成更小部分的东西。也就是说,原子操作是一个不能半途而废的操作:要么完成,要么不完成。原子操作的一个例子可能是整数的简单赋值:

num = 37;

如果两个线程访问这一行代码,它们都不会遇到这种半途而废的情况。换句话说,作业之间没有间隙。当然,如果num用用户定义的赋值运算符表示一个复杂的对象,同样的语句可能会有很多空白。

An atomic operation is an indivisible operation.

另一方面,一个非原子的操作可能会被视为半途而废。经典的例子是我们前面讨论的增量操作。在 C++ 中,对原子类型的所有操作也是原子的。这意味着我们可以通过使用原子类型来避免行间的空白。在使用原子之前,我们可以通过使用互斥来创建原子操作。例如,我们可以考虑以下原子函数:

void foo()
{
  mutex.lock();
  int a{41};
  int b{a + 1};
  mutex.unlock();
}

真正的原子操作和我们刚才做的假操作的区别在于原子操作不需要锁。这实际上是一个很大的区别,因为同步机制,如互斥体,包含了开销和性能损失。更准确地说,原子类型利用低级机制来确保指令的独立和原子执行。标准原子类型在<atomic>标题中定义。然而,标准原子类型也可能使用内部锁定。为了确保它们不使用内部锁定,标准库中的所有原子类型都公开了is_lock_free()函数。

The only atomic type that doesn't have the is_lock_free() member function is std::atomic_flag. The operations on this type are required to be lock-free. It's a Boolean flag and most of the time it is used as a base to implement other lock-free types.

也就是说,如果对obj的操作是用原子指令直接完成的,则obj.is_lock_free()返回true。如果返回 false,则表示使用了内部锁定。还有:如果原子类型对所有支持的硬件都是无锁的,则static constexpr功能is_always_lock_free()返回true。由于函数是constexpr,它允许我们在编译时定义类型是否是无锁的。这是一个很大的进步,并且很好地影响了代码的组织和执行。例如,std::atomic<int>::is_always_lock_free()返回true,因为std::atomic<int>很可能总是无锁的。

In Greek, a means not and tomo means cut. The word atom comes from the Greek atomos, which translates to uncuttable. That is, by atomic we consider indivisible smallest units. We use atomic types and operations to avoid gaps between instructions.

我们对原子类型使用专门化,例如,std::atomic<long>;但是,您可以参考下表来获得原子类型的更方便的名称。表的左侧列包含原子类型,右侧列包含其专门化:

| 原子型 | 专精 | | atomic_bool | std::atomic<bool> | | atomic_char | std::atomic<char> | | atomic_schar | std::atomic<signed char> | | atomic_uchar | std::atomic<unsigned char> | | atomic_int | std::atomic<int> | | atomic_uint | std::atomic<unsigned> | | atomic_short | std::atomic<short> | | atomic_ushort | std::atomic<unsigned short> | | atomic_long | std::atomic<long> | | atomic_ulong | std::atomic<unsigned long> | | atomic_llong | std::atomic<long long> | | atomic_ullong | std::atomic<unsigned long long> | | atomic_char16_t | std::atomic<char16_t> | | atomic_char32_t | std::atomic<char32_t> | | atomic_wchar_t | std::atomic<wchar_t> |

上表代表基本原子类型。常规类型和原子类型之间的根本区别是我们可以对它们应用的操作类型。现在让我们更详细地讨论原子操作。

对原子类型的操作

回想一下我们在上一节中讨论的差距。原子类型的目标是要么消除指令之间的间隙,要么提供负责将几条指令组合在一起并包装成一条指令的操作。以下是对原子类型的操作:

  • load()
  • store()
  • exchange()
  • compare_exchange_weak()
  • compare_exchange_strong()
  • wait()
  • notify_one()
  • notify_all()

load()操作自动加载并返回原子变量的值。store()用提供的非原子参数自动替换原子变量的值。

load()store()都类似于非原子变量的常规读取和赋值操作。每当我们访问一个对象的值时,我们就执行一个读指令。例如,以下代码打印了double变量的内容:

double d{4.2}; // "store" 4.2 into "d"
std::cout << d; // "read" the contents of "d"

在原子类型的情况下,类似的读取操作转换为:

atomic_int m;
m.store(42);             // atomically "store" the value
std::cout << m.load();   // atomically "read" the contents 

虽然前面的代码没有任何意义,但是我们包含了这个例子来表示处理原子类型的差异。访问原子变量应该通过原子操作来完成。以下代码表示load()store()exchange()功能的定义:

T load(std::memory_order order = std::memory_order_seq_cst) const noexcept;
void store(T value, std::memory_order order = 
            std::memory_order_seq_cst) noexcept;
T exchange(T value, std::memory_order order = 
            std::memory_order_seq_cst) noexcept;

可以看到,还有一个名为order的类型为std::memory_order的附加参数。我们将很快描述它。exchange()函数由store()load()函数组成,以原子方式用提供的参数替换该值,并原子方式获得前一个值。

compare_exchange_weak()compare_exchange_strong()功能的工作原理相似。以下是它们的定义:

bool compare_exchange_weak(T& expected_value, T target_value, 
                           std::memory_order order = 
                            std::memory_order_seq_cst) noexcept;
bool compare_exchange_strong(T& expected_value, T target_value,
                            std::memory_order order =
                             std::memory_order_seq_cst) noexcept;

他们将第一个参数(expected_value)与原子变量进行比较,如果它们相等,则用第二个参数(target_value)替换该变量。否则,它们会自动将值加载到第一个参数中(这就是它被引用传递的原因)。弱交易所和强交易所的区别在于compare_exchange_weak()被允许虚假失败(称为虚假失败,也就是说,即使expected_value等于基础值,函数也会将它们视为不相等。这样做是因为在某些平台上,它可以提高性能。

从 C++ 20 开始增加了wait()notify_one()notify_all()功能。wait()函数阻塞线程,直到原子对象的值修改。它需要一个参数来与原子对象的值进行比较。如果这些值相等,它将阻塞线程。要手动解锁线程,我们可以调用notify_one()notify_all()。两者的区别在于notify_one()至少解除一个被阻塞的操作,而notify_all()解除所有这样的操作。

现在,让我们讨论一下在前面声明的原子类型成员函数中遇到的内存顺序。std::memory_order定义围绕原子操作的内存访问顺序。当多个线程同时读取和写入变量时,一个线程可以以不同于另一个线程存储它们的顺序读取这些变化。原子操作的默认顺序是顺序一致的顺序,这就是std::memory_order_seq_cst的作用。订单有几种类型,包括memory_order_relaxedmemory_order_consumememory_order_acquirememory_order_releasememory_order_acq_relmemory_order_seq_cst。在下一节中,我们将设计一个使用原子类型和默认内存顺序的无锁堆栈。

设计无锁堆栈

设计堆栈时要记住的一个关键点是确保推送的值可以安全地从另一个线程返回。同样重要的是确保只有一个线程返回值。

在前几节中,我们实现了一个基于锁的堆栈,它包装了std::stack。我们知道堆栈不是真正的数据结构,而是适配器。通常,在实现堆栈时,我们选择向量或链表作为其底层数据结构。让我们看一个基于链表的无锁栈的例子。将新元素推入堆栈包括创建一个新的列表节点,将其next指针设置为当前的head节点,然后将head节点设置为指向新插入的节点。

If you are confused by the terms head or next pointer, revisit Chapter 6, Digging into Data Structures and Algorithms in STL, where we discussed linked lists in detail.

在单线程环境中,所描述的步骤很好;但是,如果有多个线程修改堆栈,我们应该开始担心了。让我们找到push()操作的陷阱。当一个新元素被推入堆栈时,有三个主要步骤:

  1. node* new_elem = new node(data);
  2. new_elem->next = head_;
  3. head_ = new_elem;

在第一步中,我们声明将要插入到基础链表中的新节点。第二步描述我们将它插入到列表的前面——这就是为什么新节点的next指针指向head_。最后,由于head_指针代表列表的起点,我们应该重置它的值以指向新添加的节点,如步骤 3 中所做的。

节点类型是我们在堆栈中用来表示列表节点的内部结构。以下是它的定义:

template <typename T>
class lock_free_stack
{
private:
 struct node {
 T data;
 node* next;
 node(const T& d) : data(d) {}
 }  node* head_;
// the rest of the body is omitted for brevity
};

我们建议您做的第一件事是在代码中寻找间隙——不是在前面的代码中,而是在我们描述的将新元素推入堆栈的步骤中。仔细看看。假设两个线程同时添加节点。步骤 2 中的一个线程设置新元素的下一个指针指向head_。另一个线程使head_指向另一个新元素。很明显,这可能会导致数据损坏。对于第 2 步和第 3 步来说,一个线程拥有相同的head_是至关重要的。为了解决步骤 2 和 3 之间的竞争情况,我们应该使用原子比较/交换操作来保证head_在我们之前读取其值时没有被修改。由于我们需要原子地访问头部指针,下面是我们如何修改lock_free_stack类中的head_ member:

template <typename T>
class lock_free_stack
{
private:
  // code omitted for brevity
 std::atomic<node*> head_;  // code omitted for brevity
};

以下是我们如何围绕原子head_指针实现无锁push():

void push(const T& data)
{
  node* new_elem = new node(data);
  new_elem->next = head_.load();
  while (!head_.compare_exchange_weak(new_elem->next, new_elem));
}

我们使用compare_exchange_weak()来确保head_指针的值与我们存储在new_elem->next中的值相同。如果是,我们设置为new_elem。一旦compare_exchange_weak()成功,我们确定该节点已经成功插入到列表中。

看看我们如何通过使用原子操作来访问节点。类型为T - std::atomic<T*>的指针的原子形式提供了相同的接口。除此之外,std::atomic<T*>还提供了指向算术运算fetch_add()fetch_sub()的指针。他们对存储的地址进行原子加法和减法运算。这里有一个例子:

struct some_struct {};
any arr[10];
std::atomic<some_struct*> ap(arr);
some_struct* old = ap.fetch_add(2);
// now old is equal to arr
// ap.load() is equal to &arr[2]

我们有意将指针命名为old,因为fetch_add()将数字加到指针的地址上,并返回old值。这就是为什么old指向的地址与arr指向的地址相同。

在下一节中,我们将介绍更多关于原子类型的操作。现在,让我们回到我们的无锁堆栈。到pop()一个元素,也就是去掉一个节点,我们需要读取head_设置到head_的下一个元素,如下图:

void pop(T& popped_element)
{
  node* old_head = head_;
  popped_element = old_head->data;
  head_ = head_->next;
  delete old_head;
}

现在,好好看看前面的代码。想象几个线程同时执行它。如果从堆栈中移除项目的两个线程读取相同的head_值会怎么样?这一点和其他一些比赛条件使我们实现了以下目标:

void pop(T& popped_element)
{
  node* old_head = head_.load();
  while (!head_.compare_exchange_weak(old_head, old_head->next));
  popped_element = old_head->data;
}

我们在前面的代码中应用了与push()函数几乎相同的逻辑。前面的代码并不完美;它应该得到加强。我们建议您努力修改它以消除内存泄漏。

我们已经看到,无锁实现非常依赖原子类型和操作。我们在上一节中讨论的操作不是最终的。现在让我们发现更多的原子操作。

更多原子操作

在前一节中,我们在指向用户定义类型的指针上使用了std::atomic<>。也就是说,我们为列表节点声明了以下结构:

// the node struct is internal to 
// the lock_free_stack class defined above
struct node
{
  T data;
  node* next;
};

节点结构是用户定义的类型。虽然在上一节中我们实例化了std::atomic<node*>,但是以同样的方式,我们可以为几乎任何用户定义的类型实例化std::atomic<>,也就是std::atomic<T>。但是需要注意的是std::atomic<T>的界面仅限于以下功能:

  • load()
  • store()
  • exchange()
  • compare_exchange_weak()
  • compare_exchange_strong()
  • wait()
  • notify_one()
  • notify_all()

现在,让我们根据底层类型的具体情况,查看原子类型上可用操作的完整列表。

std::atomic<>用整数类型(如整数或指针)实例化,除了前面列出的操作外,还有以下操作:

  • fetch_add()
  • fetch_sub()
  • fetch_or()
  • fetch_and()
  • fetch_xor()

另外,除了增量(++ )和减量(--)外,还有以下运算符:+=-=|=&=^=

最后,还有一种称为atomic_flag的特殊原子类型,有两种可用操作:

  • clear()
  • test_and_set()

你应该考虑一下原子操作。clear()功能清除,而test_and_set()将值更改为true并返回前一个值。

摘要

在本章中,我们介绍了一个相当简单的堆栈设计示例。还有更复杂的例子需要研究和效仿。当我们讨论设计并发堆栈时,我们看了两个版本,其中一个代表无锁堆栈。与基于锁的解决方案相比,无锁数据结构和算法是程序员的最终目标,因为它们提供了避免数据竞争的机制,甚至无需同步资源。

我们还介绍了原子类型和操作,您可以在项目中使用它们来确保指令不可分割。正如您已经知道的,如果一个指令是原子的,就不需要担心它的同步。我们强烈建议您继续研究该主题,并构建更加健壮和复杂的无锁数据结构。在下一章中,我们将看到如何设计全球通用的应用。

问题

  1. 为什么我们要在多线程单例实现中检查两次实例?
  2. 在基于锁的堆栈的复制构造函数的实现中,我们锁定了另一个堆栈的互斥体。为什么呢?
  3. 什么是原子类型和原子操作?
  4. 为什么我们对原子类型使用load()store()
  5. std::atomic<T*>支持哪些附加操作?

进一步阅读