Skip to content

Latest commit

 

History

History
1021 lines (812 loc) · 56 KB

File metadata and controls

1021 lines (812 loc) · 56 KB

八、内核同步和锁定

内核地址空间由所有用户模式进程共享,这支持对内核服务和数据结构的并发访问。为了系统的可靠运行,核心服务的实现必须是可重入的。访问全局数据结构的内核代码路径需要同步,以确保共享数据的一致性和有效性。在这一章中,我们将详细介绍内核程序员可以用来同步内核代码路径和保护共享数据免受并发访问的各种资源。

本章将涵盖以下主题:

  • 原子操作
  • 自旋锁
  • 标准互斥体
  • 等待/缠绕互斥体
  • 旗语
  • 顺序锁
  • 完成

原子操作

如果一个计算操作在系统的其他部分看来是瞬间发生的,那么它就被认为是原子。原子性保证启动的操作不可分割且不间断地执行。大多数中央处理器指令集架构定义了指令操作码,可以在内存位置上执行原子读-修改-写操作。这些操作有一个成功或失败的定义,也就是说,它们要么成功地改变了内存位置的状态,要么失败了,没有明显的效果。这些操作对于在多线程场景中原子地操作共享数据来说很方便。它们也是实现排除锁的基础构造块,排除锁用于保护共享内存位置免受并行代码路径的并发访问。

Linux 内核代码对各种用例使用原子操作,例如共享数据结构中的引用计数器 ( 用于跟踪对各种内核数据结构的并发访问)、等待通知标志,以及启用特定代码路径的数据结构的独占所有权。为了确保直接处理原子操作的内核服务的可移植性,内核提供了一个丰富的体系结构中立的接口宏和内联函数库,作为依赖于处理器的原子指令的抽象。这些中性接口下相关的特定于 CPU 的原子指令由内核代码的架构分支实现。

原子整数运算

通用原子操作接口包括对整数和位操作的支持。整数运算被实现为对称为atomic_t (32 位整数)和atomic64_t (64 位整数)的特殊内核定义类型进行运算。这些类型的定义可以在通用内核标题<linux/types.h>中找到:

typedef struct {
        int counter;
} atomic_t;

#ifdef CONFIG_64BIT
typedef struct {
        long counter;
} atomic64_t;
#endif

实现提供两组整数运算;一组适用于 32 位,另一组适用于 64 位原子变量。这些接口操作被实现为一组宏和内联函数。以下是适用于atomic_t类型变量的操作汇总列表:

| 界面宏/内联功能 | 描述 | | ATOMIC_INIT(i) | 初始化原子计数器的宏 | | atomic_read(v) | 读取原子计数器的值v | | atomic_set(v, i) | 自动将计数器v设置为i中指定的值 | | atomic_add(int i, atomic_t *v) | 自动添加i到计数器v | | atomic_sub(int i, atomic_t *v) | 从计数器v中自动减去i | | atomic_inc(atomic_t *v) | 自动递增计数器v | | atomic_dec(atomic_t *v) | 自动递减计数器v |

以下是执行相关读-修改-写(【RMW】)操作并返回结果(即返回修改后写入内存地址的值)的函数列表:

| 操作 | 描述 | | bool atomic_sub_and_test(int i, atomic_t *v) | 自动从v中减去i,如果结果为零则返回true,否则返回false | | bool atomic_dec_and_test(atomic_t *v) | 自动将v减 1,如果结果为 0,则返回true,对于所有其他情况,则返回false | | bool atomic_inc_and_test(atomic_t *v) | 自动将i添加到v中,如果结果为 0,则返回true,对于所有其他情况,则返回false | | bool atomic_add_negative(int i, atomic_t *v) | 自动将i加到v上,如果结果为负,则返回true,如果结果大于或等于零,则返回false | | int atomic_add_return(int i, atomic_t *v) | 自动将i添加到v并返回结果 | | int atomic_sub_return(int i, atomic_t *v) | 自动从v中减去i并返回结果 | | int atomic_fetch_add(int i, atomic_t *v) | 自动将i加到v并在v返回预加值 | | int atomic_fetch_sub(int i, atomic_t *v) | 自动从v中减去i,并在v返回预减值 | | int atomic_cmpxchg(atomic_t *v, int old, int new) | 读取位置v处的值,检查是否等于old*;*如果true,则在v*new*交换值,并始终返回在v读取的值 | | int atomic_xchg(atomic_t *v, int new) | 将存储在位置v的旧值与new交换,并返回旧值v |

对于所有这些操作,存在 64 位变体用于atomic64_t;这些功能有命名约定atomic64_*()

原子逐位运算

内核提供的通用原子操作接口也包括按位操作。与整数运算不同,整数运算被实现为对atomic(64)_t类型进行操作,这些位运算可以应用于任何存储位置。这些操作的参数是位或位号的位置,以及一个具有有效地址的指针。32 位机器的位范围为 0-31,64 位机器的位范围为 0-63。以下是可用的按位运算的汇总列表:

| 操作界面 | 描述 | | set_bit(int nr, volatile unsigned long *addr) | 自动将位nr设置在从addr开始的位置 | | clear_bit(int nr, volatile unsigned long *addr) | 自动清除从addr开始的位置中的位nr | | change_bit(int nr, volatile unsigned long *addr) | 在从addr开始的位置自动翻转位nr | | int test_and_set_bit(int nr, volatile unsigned long *addr) | 自动将位nr设置在从addr开始的位置,并在nr<sup class="calibre47">th</sup>位返回旧值 | | int test_and_clear_bit(int nr, volatile unsigned long *addr) | 自动清除从addr开始的位置中的位nr,并在nr <sup class="calibre47">th</sup>位返回旧值 | | int test_and_change_bit(int nr, volatile unsigned long *addr) | 在从addr开始的位置自动翻转位nr,并在nr<sup class="calibre47">th</sup>位返回旧值 |

对于具有返回类型的所有操作,返回的值是在指定的修改发生之前从内存地址中读出的位的旧状态。这些操作的非原子版本也存在;对于可能需要从互斥关键块中的代码语句开始进行位操作的情况,它们是有效和有用的。这些在内核头<linux/bitops/non-atomic.h>中声明。

引入排除锁

硬件专用的原子指令只能对 CPU 字长和双字长的数据进行操作;它们不能直接应用于自定义大小的共享数据结构。对于大多数多线程场景,通常可以观察到共享数据具有定制的大小,例如,具有各种类型的 n 元素的结构。访问此类数据的并发代码路径通常包含一堆指令,这些指令被编程为访问和操作共享数据;这种访问操作必须自动执行以防止种族冲突。为了确保这些代码块的原子性,使用了互斥锁。所有多线程环境都提供基于排除协议的排除锁的实现。这些锁定实现建立在硬件特定的原子指令之上。

Linux 内核实现了标准排除机制的操作接口,例如相互排除和读写排除。它还包含对各种其他当代轻量级和无锁同步机制的支持。大多数内核数据结构和其他共享数据元素,如共享缓冲区和设备寄存器,都通过内核提供的适当的排除锁定接口来防止并发访问。在本节中,我们将探讨可用的排除项及其实施细节。

自旋锁

自旋锁是最简单和轻量级的互斥机制之一,被大多数并发编程环境广泛实现。自旋锁实现定义了锁结构和操作锁结构的操作。锁结构主要承载原子锁计数器和其他元素,操作接口包括:

  • 一个初始化例程,将自旋锁实例初始化为默认(解锁)状态
  • 一个锁定例程,试图通过自动改变锁计数器的状态来获取自旋锁
  • 一个解锁程序,通过改变计数器到解锁状态来释放旋转锁

当调用者上下文试图在锁定(或由另一个上下文持有)时获取自旋锁时,锁函数迭代地轮询或旋转锁,直到可用,导致调用者上下文占用中央处理器,直到获取锁。正是由于这个事实,这种排除机制被恰当地命名为自旋锁。因此,建议确保关键部分中的代码是原子的或非阻塞的,以便锁可以保持一段短暂的、确定性的时间,因为很明显,长时间保持自旋锁可能被证明是灾难性的。

如上所述,自旋锁是围绕处理器特定的原子操作构建的;内核的架构分支实现核心自旋锁操作(汇编编程)。内核通过可由内核服务直接使用的通用平台中立接口包装特定于架构的实现;这使得使用自旋锁保护共享资源的服务代码具有可移植性。

通用自旋锁接口可以在内核头<linux/spinlock.h>中找到,而特定于架构的定义是<asm/spinlock.h>的一部分。通用接口提供了一系列lock()unlock()操作,每个操作都是为特定的用例实现的。我们将在接下来的章节中讨论这些接口中的每一个;现在,让我们从界面提供的lock()unlock()操作的标准和最基本变体开始讨论。下面的代码示例显示了基本 spinlock 接口的用法:

DEFINE_SPINLOCK(s_lock);
spin_lock(&s_lock);
/* critical region ... */
spin_unlock(&s_lock);

让我们来看看这些功能在幕后的实现:

static __always_inline void spin_lock(spinlock_t *lock)
{
        raw_spin_lock(&lock->rlock);
}

...
...

static __always_inline void spin_unlock(spinlock_t *lock)
{
        raw_spin_unlock(&lock->rlock);
}

内核代码实现了自旋锁操作的两种变体;一个适用于 SMP 平台,另一个适用于单处理器平台。与构建的架构和类型(SMP 和 UP)相关的自旋锁数据结构和操作在内核源代码树的不同头中定义。让我们熟悉一下这些标题的作用和重要性:

<include/linux/spinlock.h>包含泛型 spinlock/rwlock 声明。

以下标题与 SMP 平台构建相关:

  • <asm/spinlock_types.h>包含arch_spinlock_t/arch_rwlock_t和初始值设定项
  • <linux/spinlock_types.h>定义泛型类型和初始值设定项
  • <asm/spinlock.h>包含arch_spin_*()和类似的低级操作实现
  • <linux/spinlock_api_smp.h>包含_spin_*()原料药的原型
  • <linux/spinlock.h>构建最终的spin_*()应用编程接口

以下标题与单处理器(UP)平台构建相关:

  • <linux/spinlock_type_up.h>包含通用、简化的 UP 自旋锁类型
  • <linux/spinlock_types.h>定义泛型类型和初始值设定项
  • <linux/spinlock_up.h>包含arch_spin_*()和类似版本的 UP 构建(非调试、非抢占构建上的 nop)
  • <linux/spinlock_api_up.h>构建_spin_*()应用编程接口
  • <linux/spinlock.h>构建最终的spin_*()应用编程接口

通用内核头<linux/spinlock.h>包含一个条件指令,用于决定要拉取的适当(SMP 或 UP) API。

/*
 * Pull the _spin_*()/_read_*()/_write_*() functions/declarations:
 */
#if defined(CONFIG_SMP) || defined(CONFIG_DEBUG_SPINLOCK)
# include <linux/spinlock_api_smp.h>
#else
# include <linux/spinlock_api_up.h>
#endif

raw_spin_lock()raw_spin_unlock()宏根据构建配置中选择的平台类型(SMP 或 UP)动态扩展到合适版本的自旋锁操作。对于 SMP 平台,raw_spin_lock()扩展到内核源文件kernel/locking/spinlock.c中实现的__raw_spin_lock()操作。以下是用宏定义的锁定操作代码:

/*
 * We build the __lock_function inlines here. They are too large for
 * inlining all over the place, but here is only one user per function
 * which embeds them into the calling _lock_function below.
 *
 * This could be a long-held lock. We both prepare to spin for a long
 * time (making _this_ CPU preemptable if possible), and we also signal
 * towards that other CPU that it should break the lock ASAP.
 */

#define BUILD_LOCK_OPS(op, locktype)                                    \
void __lockfunc __raw_##op##_lock(locktype##_t *lock)                   \
{                                                                       \
        for (;;) {                                                      \
                preempt_disable();                                      \
                if (likely(do_raw_##op##_trylock(lock)))                \
                        break;                                          \
                preempt_enable();                                       \
                                                                        \
                if (!(lock)->break_lock)                                \
                        (lock)->break_lock = 1;                         \
                while (!raw_##op##_can_lock(lock) && (lock)->break_lock)\
                        arch_##op##_relax(&lock->raw_lock);             \
        }                                                               \
        (lock)->break_lock = 0;                                         \
} 

该例程由嵌套循环构造、外部for循环构造和内部while循环组成,循环旋转直到满足指定条件。外环中的第一个代码块试图通过调用特定于架构的##_trylock()例程来自动获取锁。请注意,在本地处理器上禁用内核抢占的情况下调用该函数。如果锁被成功获取,它将脱离循环结构,并在抢占关闭的情况下返回调用。这确保了持有锁的调用者上下文在关键部分执行期间不可抢占。这种方法还确保了在当前所有者释放锁之前,没有其他上下文可以争夺本地 CPU 上的相同锁。

但是如果无法获取锁,通过preempt_enable()调用启用抢占,调用者上下文进入内循环。这个循环是通过一个条件while实现的,该条件旋转直到发现锁定可用。循环的每次迭代都会检查锁,当它检测到锁还不可用时,它会调用特定于架构的 relax 例程(执行特定于 CPU 的 nop 指令),然后再次旋转以检查锁。回想一下,在此期间,抢占被启用;这确保了调用者上下文是可抢占的,并且不会占用 CPU 很长时间,尤其是当锁被高度竞争时,这种情况可能会发生。它还允许在同一个中央处理器上调度的两个或多个线程争用同一个锁,这可能是通过抢占彼此来实现的。

当旋转上下文通过raw_spin_can_lock()检测到锁可用时,它会跳出while循环,导致调用者迭代回外部循环的开始(for循环),在那里它再次尝试通过禁用抢占来通过##_trylock()获取锁:

/*
 * In the UP-nondebug case there's no real locking going on, so the
 * only thing we have to do is to keep the preempt counts and irq
 * flags straight, to suppress compiler warnings of unused lock
 * variables, and to add the proper checker annotations:
 */
#define ___LOCK(lock) \
  do { __acquire(lock); (void)(lock); } while (0)

#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)

#define _raw_spin_lock(lock) __LOCK(lock)

与 SMP 变体不同,UP 平台的自旋锁实现非常简单;事实上,lock 例程只是禁用内核抢占,并将调用者放入一个关键部分。这是可行的,因为在抢占被暂停的情况下,没有另一个上下文争用锁的可能性。

备用自旋锁 API

到目前为止,我们讨论的标准自旋锁操作适用于保护只能从进程上下文内核路径访问的共享资源。然而,可能会有这样的场景:从内核服务的进程和中断上下文代码中访问特定的共享资源或数据。例如,考虑一个设备驱动程序服务,它可能包含进程上下文和中断上下文例程,这两个例程都被编程为访问共享驱动程序缓冲区以执行适当的输入/输出操作。

让我们假设使用了一个自旋锁来保护驱动程序的共享资源免受并发访问,并且使用标准的spin_lock()spin_unlock()操作用适当的关键部分对驱动程序服务的所有例程(进程和中断上下文)进行编程,以寻求对共享资源的访问。该策略将通过强制排除来确保共享资源的保护,但是会在随机时间在中央处理器上造成硬锁定条件,这是由于锁定由同一中央处理器上的中断路径代码竞争,其中锁定由进程上下文路径持有。为了进一步理解这一点,让我们假设以下事件以相同的顺序发生:

  1. 驱动程序的进程上下文例程使用标准的spin_lock()调用获取锁()。
  2. 当关键部分正在执行时,中断发生并被驱动到本地中央处理器,导致进程上下文例程抢占并释放中央处理器用于中断处理程序。
  3. 驱动程序(ISR)的中断上下文路径开始并尝试获取锁(使用标准的spin_lock()调用),然后开始旋转以使可用。

在 ISR 期间,进程上下文被抢占,永远无法恢复执行,导致永远无法释放的锁定,CPU 被永不屈服的旋转中断处理程序硬锁定。

为了防止这种情况发生,进程上下文代码需要在当前处理器锁定时禁用中断。这将确保中断永远不会抢占当前上下文,直到关键部分完成并锁定释放请注意,中断仍然会发生,但会被路由到其他可用的 CPU,中断处理程序可以在这些 CPU 上旋转,直到锁定变为可用。spinlock 接口提供了一个替代锁定例程spin_lock_irqsave(),该例程禁用当前处理器上的中断以及内核抢占。下面的代码片段显示了例程的底层代码:

unsigned long __lockfunc __raw_##op##_lock_irqsave(locktype##_t *lock)  \
{                                                                       \
        unsigned long flags;                                            \
                                                                        \
        for (;;) {                                                      \
                preempt_disable();                                      \
                local_irq_save(flags);                                  \
                if (likely(do_raw_##op##_trylock(lock)))                \
                        break;                                          \
                local_irq_restore(flags);                               \
                preempt_enable();                                       \
                                                                        \
                if (!(lock)->break_lock)                                \
                        (lock)->break_lock = 1;                         \
                while (!raw_##op##_can_lock(lock) && (lock)->break_lock)\
                        arch_##op##_relax(&lock->raw_lock);             \
        }                                                               \
        (lock)->break_lock = 0;                                         \
        return flags;                                                   \
} 

local_irq_save()被调用以禁用当前处理器的硬中断;注意在获取锁失败时,如何通过调用local_irq_restore()来启用中断。请注意,调用方使用spin_lock_irqsave()拍摄的lock需要使用spin_lock_irqrestore()解锁,这将在释放锁定之前为当前处理器启用内核抢占和中断。

类似于硬中断处理程序,软中断上下文例程,如软 irqs、小任务、和其他类似的下半部分也有可能争夺由同一处理器上的进程上下文代码持有的。这可以通过在进程上下文中获取时禁用下半部分的执行来防止。spin_lock_bh()是锁定例程的另一个变体,负责在本地 CPU 上暂停中断上下文下半部分的执行。

void __lockfunc __raw_##op##_lock_bh(locktype##_t *lock)                \
{                                                                       \
        unsigned long flags;                                            \
                                                                        \
        /* */                                                           \
        /* Careful: we must exclude softirqs too, hence the */          \
        /* irq-disabling. We use the generic preemption-aware */        \
        /* function: */                                                 \
        /**/                                                            \
        flags = _raw_##op##_lock_irqsave(lock);                         \
        local_bh_disable();                                             \
        local_irq_restore(flags);                                       \
} 

local_bh_disable()暂停本地 CPU 的下半部分执行。要释放spin_lock_bh()获得的,调用者上下文需要调用spin_unlock_bh(),这将释放本地 CPU 的自旋锁和 BH 锁。

以下是内核自旋锁应用编程接口的摘要列表:

| 功能 | 描述 | | spin_lock_init() | 初始化自旋锁 | | spin_lock() | 获得锁定,在竞争中旋转 | | spin_trylock() | 尝试获取锁,在争用时返回错误 | | spin_lock_bh() | 通过暂停本地处理器上的 BH 例程获取锁定,在争用时旋转 | | spin_lock_irqsave() | 通过保存当前中断状态暂停本地处理器上的中断来获取锁定,在争用时旋转 | | spin_lock_irq() | 通过暂停本地处理器上的中断获取锁定,在争用时旋转 | | spin_unlock() | 松开锁 | | spin_unlock_bh() | 释放锁定并启用本地处理器的下半部分 | | spin_unlock_irqrestore() | 释放锁定并将本地中断恢复到以前的状态 | | spin_unlock_irq() | 释放锁定并恢复本地处理器的中断 | | spin_is_locked() | 返回锁的状态,如果持有锁,返回非零值;如果有锁,返回零 |

读写器自旋锁

到目前为止讨论的 Spinlock 实现通过在为共享数据访问而竞争的并发代码路径之间强制执行标准互斥来保护共享数据。这种形式的排除不适用于保护共享数据,共享数据通常由并发代码路径读取,写入或更新不频繁。读取器-写入器锁强制读取器和写入器路径之间的排除;这允许并发读取器共享锁,当写入器拥有锁时,读取器任务需要等待锁。Rw 锁强制并发写入器之间的标准排除,这是所期望的。

Rw 锁由内核头<linux/rwlock_types.h>中声明的struct rwlock_t表示:

typedef struct {
        arch_rwlock_t raw_lock;
#ifdef CONFIG_GENERIC_LOCKBREAK
        unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
        unsigned int magic, owner_cpu;
        void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map dep_map;
#endif
} rwlock_t;

rwlocks 可以通过宏DEFINE_RWLOCK(v_rwlock)静态初始化,也可以通过rwlock_init(v_rwlock)在运行时动态初始化。

读取器代码路径将需要调用read_lock例程。

read_lock(&v_rwlock);
/* critical section with read only access to shared data */
read_unlock(&v_rwlock);

编写器代码路径使用以下内容:

write_lock(&v_rwlock);
/* critical section for both read and write */
write_unlock(&v_lock);

当争用锁时,读和写锁例程都会旋转。该界面还提供了称为read_trylock()write_trylock()的锁定功能的非旋转版本。它还提供了中断禁用版本的锁定调用,当读或写路径碰巧在中断或下半部分上下文中执行时,这很方便。

以下是接口操作的汇总列表:

| 功能 | 描述 | | read_lock() | 标准读锁定接口,争用旋转 | | read_trylock() | 尝试获取锁,如果锁不可用,则返回错误 | | read_lock_bh() | 试图通过暂停本地 CPU 的 BH 执行来获取锁定,在争用时旋转 | | read_lock_irqsave() | 试图通过保存本地中断的当前状态来暂停当前 CPU 的中断来获取锁定,在争用时旋转 | | read_unlock() | 释放读锁定 | | read_unlock_irqrestore() | 释放锁定,并将本地中断恢复到以前的状态 | | read_unlock_bh() | 释放读锁定并在本地处理器上启用 BH | | write_lock() | 标准写锁接口,争用时旋转 | | write_trylock() | 尝试获取锁,在争用时返回错误 | | write_lock_bh() | 试图通过挂起本地 CPU 的下半部分来获取写锁定,会因争用而旋转 | | wrtie_lock_irqsave() | 试图通过保存本地中断的当前状态来暂停本地 CPU 的中断,从而获取写锁定。在争论中旋转 | | write_unlock() | 释放写锁定 | | write_unlock_irqrestore() | 释放锁定并将本地中断恢复到以前的状态 | | write_unlock_bh() | 释放写锁定并在本地处理器上启用 BH |

所有这些操作的底层调用类似于自旋锁实现,可以在前面提到的自旋锁部分指定的头中找到。

互斥锁

自旋锁在设计上更适合于保持短的固定时间间隔的情况,因为忙等待无限期将对系统性能产生可怕的影响。然而,有大量的情况是被保持更长的、不确定的持续时间;睡眠锁正是为这种情况而设计的。内核互斥体是睡眠锁的一种实现:当调用者任务试图获取一个不可用的互斥体(已经被另一个上下文所拥有)时,它会进入睡眠状态,并被移出到等待队列中,迫使上下文切换,从而允许 CPU 运行其他生产性任务。当互斥体变得可用时,等待队列中的任务被互斥体的解锁路径唤醒并移动,然后互斥体可以尝试锁定互斥体。

互斥由struct mutex表示,在include/linux/mutex.h中定义,在源文件kernel/locking/mutex.c中实现相应的操作:

 struct mutex {
          atomic_long_t owner;
          spinlock_t wait_lock;
 #ifdef CONFIG_MUTEX_SPIN_ON_OWNER
          struct optimistic_spin_queue osq; /* Spinner MCS lock */
 #endif
          struct list_head wait_list;
 #ifdef CONFIG_DEBUG_MUTEXES
          void *magic;
 #endif
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
          struct lockdep_map dep_map;
 #endif
 }; 

在其基本形式中,每个互斥体包含一个 64 位atomic_long_t计数器(owner),该计数器用于保持锁状态,并存储对拥有锁的当前任务的任务结构的引用。每个互斥体包含一个等待队列(wait_list)和一个旋转锁(wait_lock)来序列化对wait_list的访问。

互斥 API 接口提供了一组用于初始化、锁定、解锁和访问互斥状态的宏和函数。这些操作界面在<include/linux/mutex.h>中定义。

互斥体可以用宏DEFINE_MUTEX(name)声明和初始化。

还有一个通过mutex_init(mutex)动态初始化有效互斥体的选项。

如前所述,在争用时,锁定操作会使调用线程进入睡眠状态,这要求调用线程在进入互斥等待列表之前进入TASK_INTERRUPTIBLETASK_UNINTERRUPTIBLETASK_KILLABLE状态。为了支持这一点,互斥实现提供了锁操作的两种变体,一种用于不间断,另一种用于可中断睡眠。以下是标准互斥操作列表,并对每个操作进行了简短描述:

/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, Put caller into Uninterruptible sleep until mutex 
 * is available.
 */
    void mutex_lock(struct mutex *lock);

/**
 * mutex_lock_interruptible - acquire the mutex, interruptible
 * @lock: the mutex to be acquired
 *
 * Lock the mutex like mutex_lock(), and return 0 if the mutex has
 * been acquired else put caller into interruptible sleep until the mutex  
 * until mutex is available. Return -EINTR if a signal arrives while sleeping
 * for the lock.                               
 */
 int __must_check mutex_lock_interruptible(struct mutex *lock); /**
 * mutex_lock_Killable - acquire the mutex, interruptible
 * @lock: the mutex to be acquired
 *
 * Similar to mutex_lock_interruptible(),with a difference that the call
 * returns -EINTR only when fatal KILL signal arrives while sleeping for the     
 * lock.                              
 */
 int __must_check mutex_lock_killable(struct mutex *lock); /**
 * mutex_trylock - try to acquire the mutex, without waiting
 * @lock: the mutex to be acquired
 *
 * Try to acquire the mutex atomically. Returns 1 if the mutex
 * has been acquired successfully, and 0 on contention.
 *
 */
    int mutex_trylock(struct mutex *lock); /**
 * atomic_dec_and_mutex_lock - return holding mutex if we dec to 0,
 * @cnt: the atomic which we are to dec
 * @lock: the mutex to return holding if we dec to 0
 *
 * return true and hold lock if we dec to 0, return false otherwise. Please 
 * note that this function is interruptible.
 */
    int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock); 
/**
 * mutex_is_locked - is the mutex locked
 * @lock: the mutex to be queried
 *
 * Returns 1 if the mutex is locked, 0 if unlocked.
 */
 static inline int mutex_is_locked(struct mutex *lock); /**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock the mutex owned by caller task.
 *
 */
 void mutex_unlock(struct mutex *lock);

尽管可能会阻塞调用,但互斥锁函数已经针对性能进行了极大的优化。它们被编程为在尝试锁定捕获时采用快速和慢速路径方法。让我们探索一下锁定调用的代码,以便更好地理解快速路径和慢速路径。以下代码节选自<kernel/locking/mutex.c>mutex_lock()例程:

void __sched mutex_lock(struct mutex *lock)
{
  might_sleep();

  if (!__mutex_trylock_fast(lock))
    __mutex_lock_slowpath(lock);
}

首先通过调用非阻塞快速路径调用__mutex_trylock_fast()来尝试获取锁。如果由于争用而无法获得锁定,则通过调用__mutex_lock_slowpath()进入慢速路径:

static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
  unsigned long curr = (unsigned long)current;

  if (!atomic_long_cmpxchg_acquire(&lock->owner, 0UL, curr))
    return true;

  return false;
}

该功能被编程为自动获取锁(如果可用)。它调用atomic_long_cmpxchg_acquire()宏,该宏试图将当前线程分配为互斥体的所有者;如果互斥体可用,该操作将成功,在这种情况下,函数返回true。如果其他线程拥有互斥锁,这个函数将失败并返回false。失败时,调用线程将进入慢速路径例程。

按照惯例,慢路径的概念一直是让调用者任务进入睡眠状态,同时等待锁变得可用。然而,随着多核 CPU 的出现,对可伸缩性和改进性能的需求越来越大,因此,为了实现可伸缩性,互斥体慢速路径实现已经通过一种称为乐观旋转的优化进行了返工,也称为中间路径,这可以显著提高性能*。*

乐观旋转的核心思想是,当发现互斥体所有者正在运行时,将竞争任务推进轮询或旋转,而不是休眠。一旦互斥体变得可用(预计会更快,因为发现拥有者正在运行),假设与互斥体等待列表中的挂起或休眠任务相比,旋转任务总是能够更快地获取互斥体。然而,只有在就绪状态下没有其他更高优先级的任务时,这种旋转才有可能。有了这个特性,旋转任务更有可能是高速缓存热的,从而导致确定性的执行,产生显著的性能改进:

static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
       struct lockdep_map *nest_lock, unsigned long ip)
{
  return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL,     false);
}

...
...
...

static noinline void __sched __mutex_lock_slowpath(struct mutex *lock) 
{
        __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_); 
}

static noinline int __sched
__mutex_lock_killable_slowpath(struct mutex *lock)
{
  return __mutex_lock(lock, TASK_KILLABLE, 0, NULL, _RET_IP_);
}

static noinline int __sched
__mutex_lock_interruptible_slowpath(struct mutex *lock)
{
  return __mutex_lock(lock, TASK_INTERRUPTIBLE, 0, NULL, _RET_IP_);
}

__mutex_lock_common()函数包含一个带有乐观旋转的慢速路径实现;这个例程由互斥锁函数的所有休眠变体调用,并以适当的标志作为参数。该函数首先尝试通过乐观旋转获取互斥体,乐观旋转是通过与互斥体相关联的可取消 mcs 自旋锁(【互斥体结构中的 T1】字段)实现的。当调用者任务无法通过乐观旋转获取互斥体时,作为最后的手段,该功能切换到常规的慢速路径,导致调用者任务进入睡眠状态,并排队进入互斥体wait_list,直到被解锁路径唤醒。

调试检查和验证

互斥操作的不正确使用会导致死锁、排除失败等等。为了检测和防止这种可能发生的情况,互斥子系统配备了适当的检查或验证工具,用于互斥操作。这些检查在默认情况下是禁用的,并且可以通过在内核构建期间选择配置选项CONFIG_DEBUG_MUTEXES=y来启用。

以下是由检测调试代码强制执行的检查列表:

  • 在给定的时间点,互斥体可以由一个任务拥有
  • 互斥体只能由有效的所有者释放(解锁),不拥有锁的上下文释放互斥体的尝试将失败
  • 递归锁定或解锁尝试将失败
  • 互斥体只能通过初始化器调用来初始化,任何对记忆集互斥体的尝试都不会成功
  • 持有互斥锁时,调用方任务可能不会退出
  • 不得释放持有锁所在的动态内存区域
  • 互斥体可以初始化一次,任何重新初始化已经初始化的互斥体的尝试都将失败
  • 互斥体不能用于硬/软中断上下文例程

死锁可能由于许多原因而触发,例如内核代码的执行模式和对锁定调用的粗心使用。例如,让我们考虑一种情况,其中并发代码路径需要通过嵌套锁定函数来获得 L 1L 2 锁的所有权。必须确保所有需要这些锁的内核函数都被编程为以相同的顺序获取它们。当没有严格执行这样的顺序时,总是有两个不同的函数试图以相反的顺序锁定 L1L2 ,当这些函数同时执行时,这可能会触发锁定反转死锁。

内核锁验证器基础设施已经实现,以检查并证明在内核运行时观察到的锁定模式都不会导致死锁。该基础结构打印与锁定模式相关的数据,例如:

  • 获取点跟踪、函数名的符号查找以及系统中所有锁的列表
  • 所有者跟踪
  • 检测自递归锁并打印出所有相关信息
  • 检测锁反转死锁并打印出所有受影响的锁和任务

可以通过在内核构建期间选择CONFIG_PROVE_LOCKING=y来启用锁验证器。

等待/缠绕互斥体

如前一节所述,内核函数中无序的嵌套锁定可能会带来锁反转死锁的风险,内核开发人员通过定义嵌套锁排序规则并通过锁验证器基础结构执行运行时检查来避免这种情况。然而,在有些情况下,锁的排序是动态的,嵌套的锁调用不能被硬编码或者按照预想的规则强加。

一个这样的用例是关于 GPU 缓冲区的;这些缓冲区将由各种系统实体拥有和访问,例如 GPU 硬件、GPU 驱动程序、用户模式应用和其他视频相关驱动程序。用户模式上下文可以以任意顺序提交 dma 缓冲区进行处理,GPU 硬件可以在任意时间处理它们。如果使用锁定来控制缓冲区的所有权,并且必须同时操作多个缓冲区,则无法避免死锁。等待/缠绕互斥体旨在促进嵌套锁的动态排序,而不会导致锁反转死锁。这是通过强制竞争中的上下文绕过来实现的,这意味着强制它释放保持锁。

例如,让我们假设两个缓冲区,每个缓冲区用一个锁保护,并进一步考虑两个线程,比如T<sub class="calibre76">1</sub>T 2通过以相反的顺序尝试锁来寻求缓冲区的所有权:

Thread T1       Thread T2
===========    ==========
lock(bufA);     lock(bufB);
lock(bufB);     lock(bufA);
 ....            ....
 ....            ....
unlock(bufB);   unlock(bufA);
unlock(bufA);   unlock(bufB);

并发执行T<sub class="calibre76">1</sub>T<sub class="calibre76">2</sub>可能会导致每个线程等待另一个线程持有的锁,从而导致死锁。等待/缠绕互斥体通过让首先抓住锁的线程保持睡眠,等待嵌套锁可用,来防止这种情况。另一根线被缠绕,使其松开握持锁,重新开始。假设T<sub class="calibre76">1</sub>T<sub class="calibre76">2</sub>能够锁定bufB之前锁定了bufAT<sub class="calibre76">1</sub>将被认为是最先到达并被bufB锁定而休眠的线,T<sub class="calibre76">2</sub>将被缠绕,导致其释放bufB上的锁定并重新开始。这避免了死锁,T<sub class="calibre76">2</sub>将在T<sub class="calibre76">1</sub>释放锁定时重新开始。

操作界面:

等待/缠绕互斥体通过标题<linux/ww_mutex.h>中定义的struct ww_mutex表示:

struct ww_mutex {
       struct mutex base;
       struct ww_acquire_ctx *ctx;
# ifdef CONFIG_DEBUG_MUTEXES
       struct ww_class *ww_class;
#endif
};

使用等待/缠绕互斥的第一步是定义一个类,,这是一种表示一组锁的机制。当并发任务争用相同的锁时,它们必须通过指定这个类来争用。

可以使用宏定义一个类:

static DEFINE_WW_CLASS(bufclass);

每个声明的类都是类型struct ww_class的一个实例,并包含一个原子计数器stamp,用于保存一个序列号,该序列号记录哪个竞争任务先到达那里。内核的锁验证器使用其他字段来验证等待/缠绕机制的正确使用。

struct ww_class {
       atomic_long_t stamp;
       struct lock_class_key acquire_key;
       struct lock_class_key mutex_key;
       const char *acquire_name;
       const char *mutex_name;
};

每个竞争线程必须在尝试嵌套锁定调用之前调用ww_acquire_init()。这通过为跟踪锁分配一个序列号来设置上下文。

/**
 * ww_acquire_init - initialize a w/w acquire context
 * @ctx: w/w acquire context to initialize
 * @ww_class: w/w class of the context
 *
 * Initializes a context to acquire multiple mutexes of the given w/w class.
 *
 * Context-based w/w mutex acquiring can be done in any order whatsoever 
 * within a given lock class. Deadlocks will be detected and handled with the
 * wait/wound logic.
 *
 * Mixing of context-based w/w mutex acquiring and single w/w mutex locking 
 * can result in undetected deadlocks and is so forbidden. Mixing different
 * contexts for the same w/w class when acquiring mutexes can also result in 
 * undetected deadlocks, and is hence also forbidden. Both types of abuse will 
 * will be caught by enabling CONFIG_PROVE_LOCKING.
 *
 */
   void ww_acquire_init(struct ww_acquire_ctx *ctx, struct ww_clas *ww_class);

一旦设置并初始化了上下文,任务就可以通过ww_mutex_lock()ww_mutex_lock_interruptible()调用开始获取锁:

/**
 * ww_mutex_lock - acquire the w/w mutex
 * @lock: the mutex to be acquired
 * @ctx: w/w acquire context, or NULL to acquire only a single lock.
 *
 * Lock the w/w mutex exclusively for this task.
 *
 * Deadlocks within a given w/w class of locks are detected and handled with 
 * wait/wound algorithm. If the lock isn't immediately available this function
 * will either sleep until it is(wait case) or it selects the current context
 * for backing off by returning -EDEADLK (wound case).Trying to acquire the
 * same lock with the same context twice is also detected and signalled by
 * returning -EALREADY. Returns 0 if the mutex was successfully acquired.
 *
 * In the wound case the caller must release all currently held w/w mutexes  
 * for the given context and then wait for this contending lock to be 
 * available by calling ww_mutex_lock_slow. 
 *
 * The mutex must later on be released by the same task that
 * acquired it. The task may not exit without first unlocking the mutex.Also,
 * kernel memory where the mutex resides must not be freed with the mutex 
 * still locked. The mutex must first be initialized (or statically defined) b
 * before it can be locked. memset()-ing the mutex to 0 is not allowed. The
 * mutex must be of the same w/w lock class as was used to initialize the 
 * acquired context.
 * A mutex acquired with this function must be released with ww_mutex_unlock.
 */
    int ww_mutex_lock(struct ww_mutex *lock, struct ww_acquire_ctx *ctx);

/**
 * ww_mutex_lock_interruptible - acquire the w/w mutex, interruptible
 * @lock: the mutex to be acquired
 * @ctx: w/w acquire context
 *
 */
   int  ww_mutex_lock_interruptible(struct ww_mutex *lock, 
                                             struct  ww_acquire_ctx *ctx);

当一个任务抓取与一个类相关联的所有嵌套锁(使用这些锁定例程中的任何一个)时,它需要使用函数ww_acquire_done()通知所有权的获取。该调用标志着采集阶段的结束,任务可以继续处理共享数据:

/**
 * ww_acquire_done - marks the end of the acquire phase
 * @ctx: the acquire context
 *
 * Marks the end of the acquire phase, any further w/w mutex lock calls using
 * this context are forbidden.
 *
 * Calling this function is optional, it is just useful to document w/w mutex
 * code and clearly designated the acquire phase from actually using the 
 * locked data structures.
 */
 void ww_acquire_done(struct ww_acquire_ctx *ctx);

当任务完成对共享数据的处理后,它可以开始释放所有持有的锁,调用ww_mutex_unlock(例程。一旦所有锁被释放,上下文必须通过调用ww_acquire_fini()来释放:

/**
 * ww_acquire_fini - releases a w/w acquire context
 * @ctx: the acquire context to free
 *
 * Releases a w/w acquire context. This must be called _after_ all acquired 
 * w/w mutexes have been released with ww_mutex_unlock.
 */
    void ww_acquire_fini(struct ww_acquire_ctx *ctx);

旗语

在 2.6 内核版本发布之前,信号量是睡眠锁的主要形式。典型的信号量实现包括计数器、等待队列和一组可以自动递增/递减计数器的操作。

当信号量用于保护共享资源时,其计数器被初始化为大于零的数字,这被认为是解锁状态。寻求访问共享资源的任务从调用信号量上的减量操作开始。这个调用检查信号量计数器;如果发现大于零,计数器递减,函数返回成功。但是,如果发现计数器为零,减量操作将调用方任务置于睡眠状态,直到发现计数器增加到大于零的数字。

这种简单的设计提供了极大的灵活性,允许信号量在不同情况下的适应性和应用。例如,对于资源需要在任何时间点被特定数量的任务访问的情况,信号量计数可以被初始化为需要访问的任务数量,比如 10,这允许在任何时间最多 10 个任务访问共享资源。对于其他情况,例如许多需要互斥访问共享资源的任务,信号量计数可以初始化为 1,导致在任何给定时间点最多有一个任务访问资源。

信号量结构及其接口操作在内核头<include/linux/semaphore.h>中声明:

struct semaphore {
        raw_spinlock_t     lock;
        unsigned int       count;
        struct list_head   wait_list;
};

自旋锁(T0)字段用作count的保护,也就是说,信号量操作(inc/dec)被编程为在操纵count之前获取lockwait_list用于在任务等待信号量计数增加到零以上时,将任务排队等待睡眠。

信号量可以通过一个宏来声明和初始化为 1:DEFINE_SEMAPHORE(s)

信号量也可以通过以下方式动态初始化为任意正数:

void sema_init(struct semaphore *sem, int val)

以下是操作界面列表,并对每个界面进行了简要描述。具有命名约定down_xxx()的例程试图减少信号量,并且可能阻塞调用(除了down_trylock(),而例程up()增加信号量并且总是成功:

/**
 * down_interruptible - acquire the semaphore unless interrupted
 * @sem: the semaphore to be acquired
 *
 * Attempts to acquire the semaphore.  If no more tasks are allowed to
 * acquire the semaphore, calling this function will put the task to sleep.
 * If the sleep is interrupted by a signal, this function will return -EINTR.
 * If the semaphore is successfully acquired, this function returns 0.
 */
 int down_interruptible(struct semaphore *sem); /**
 * down_killable - acquire the semaphore unless killed
 * @sem: the semaphore to be acquired
 *
 * Attempts to acquire the semaphore.  If no more tasks are allowed to
 * acquire the semaphore, calling this function will put the task to sleep.
 * If the sleep is interrupted by a fatal signal, this function will return
 * -EINTR.  If the semaphore is successfully acquired, this function returns
 * 0.
 */
 int down_killable(struct semaphore *sem); /**
 * down_trylock - try to acquire the semaphore, without waiting
 * @sem: the semaphore to be acquired
 *
 * Try to acquire the semaphore atomically.  Returns 0 if the semaphore has
 * been acquired successfully or 1 if it it cannot be acquired.
 *
 */
 int down_trylock(struct semaphore *sem); /**
 * down_timeout - acquire the semaphore within a specified time
 * @sem: the semaphore to be acquired
 * @timeout: how long to wait before failing
 *
 * Attempts to acquire the semaphore.  If no more tasks are allowed to
 * acquire the semaphore, calling this function will put the task to sleep.
 * If the semaphore is not released within the specified number of jiffies,
 * this function returns -ETIME.  It returns 0 if the semaphore was acquired.
 */
 int down_timeout(struct semaphore *sem, long timeout); /**
 * up - release the semaphore
 * @sem: the semaphore to release
 *
 * Release the semaphore.  Unlike mutexes, up() may be called from any
 * context and even by tasks which have never called down().
 */
 void up(struct semaphore *sem);

与互斥实现不同,信号量操作不支持调试检查或验证;这种限制是由于它们固有的通用设计,允许它们用作排除锁、事件通知计数器等。自从互斥体进入内核(2.6.16)以来,信号量不再是排除的首选,信号量作为锁的使用已经大大减少,为了其他目的,内核有了替代接口。大多数使用信号量的内核代码都被转换成了互斥体,只有少数例外。然而信号量仍然存在,并且可能会一直存在,至少直到所有使用它们的内核代码都被转换成互斥体或其他合适的接口。

读写信号量

这个接口是睡眠读写排除的一个实现,作为旋转读写排除的替代。读写信号量由struct rw_semaphore表示,在内核头<linux/rwsem.h>中声明:

struct rw_semaphore {
        atomic_long_t count;
        struct list_head wait_list;
        raw_spinlock_t wait_lock;
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
       struct optimistic_spin_queue osq; /* spinner MCS lock */
       /*
       * Write owner. Used as a speculative check to see
       * if the owner is running on the cpu.
       */
      struct task_struct *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
     struct lockdep_map dep_map;
#endif
};

该结构与互斥体相同,设计支持osq乐观旋转;它还包括通过内核的锁定程序的调试支持。Count用作排除计数器,设置为 1,允许一个时间点最多有一个写入程序拥有锁。这是可行的,因为互斥只在竞争的作者之间执行,任何数量的读者都可以同时共享读锁。wait_lock是一个保护信号量wait_list的自旋锁。

一个rw_semaphore可以通过DECLARE_RWSEM(name)进行静态实例化和初始化,也可以通过init_rwsem(sem)进行动态初始化。

与 rw-spin lock 的情况一样,该接口也为读取器和写入器路径中的锁获取提供了不同的例程。以下是接口操作列表:

/* reader interfaces */
   void down_read(struct rw_semaphore *sem);
   void up_read(struct rw_semaphore *sem);
/* trylock for reading -- returns 1 if successful, 0 if contention */
   int down_read_trylock(struct rw_semaphore *sem);
   void up_read(struct rw_semaphore *sem);

/* writer Interfaces */
   void down_write(struct rw_semaphore *sem);
   int __must_check down_write_killable(struct rw_semaphore *sem);

/* trylock for writing -- returns 1 if successful, 0 if contention */
   int down_write_trylock(struct rw_semaphore *sem); 
   void up_write(struct rw_semaphore *sem);
/* downgrade write lock to read lock */
   void downgrade_write(struct rw_semaphore *sem); 

/* check if rw-sem is currently locked */  
   int rwsem_is_locked(struct rw_semaphore *sem);

这些操作在源文件<kernel/locking/rwsem.c>中实现;该代码是非常不言自明的,我们不会进一步讨论它。

顺序锁

传统的读取器-写入器锁被设计为具有读取器优先级,并且它们可能导致写入器任务等待非确定性的持续时间,这可能不适合具有时间敏感更新的共享数据。这就是顺序锁派上用场的地方,因为它旨在提供对共享资源的快速和无锁访问。当需要保护的资源既小又简单,写访问又快又不频繁时,顺序锁是最好的,因为内部顺序锁依赖于自旋锁原语。

顺序锁引入了一个特殊的计数器,每当编写器获得一个顺序锁和一个自旋锁时,该计数器就会递增。写入程序完成后,它会释放自旋锁并再次递增计数器,并为其他写入程序打开访问权限。对于 read,有两种类型的读取器:序列读取器和锁定读取器。序列读取器在计数器进入临界区之前检查计数器,然后在计数器结束时再次检查,而不会阻塞任何写入器。如果计数器保持不变,则意味着在读取期间没有写入程序访问过该部分,但是如果该部分末尾的计数器增加,则表明写入程序已经访问过该部分,这要求读取器重新读取关键部分以获取更新的数据。A 锁定阅读器,顾名思义,在进行中会获得一个锁,并屏蔽其他阅读器和作者;当另一个锁定读取器或写入器正在进行时,它也会等待。

序列锁由以下类型表示:

typedef struct {
        struct seqcount seqcount;
        spinlock_t lock;
} seqlock_t;

我们可以使用以下宏静态初始化序列锁:

#define DEFINE_SEQLOCK(x) \
               seqlock_t x = __SEQLOCK_UNLOCKED(x)

实际初始化使用__SEQLOCK_UNLOCKED(x)完成,这里定义为:

#define __SEQLOCK_UNLOCKED(lockname)                 \
       {                                               \
               .seqcount = SEQCNT_ZERO(lockname),     \
               .lock = __SPIN_LOCK_UNLOCKED(lockname)   \
       }

要动态初始化序列锁,我们需要使用seqlock_init宏,定义如下:

  #define seqlock_init(x)                                     \
       do {                                                   \
               seqcount_init(&(x)->seqcount);                 \
               spin_lock_init(&(x)->lock);                    \
       } while (0)

应用接口

Linux 提供了很多使用序列锁的 API,在</linux/seqlock.h>中有定义。这里列出了一些重要的问题:

static inline void write_seqlock(seqlock_t *sl)
{
        spin_lock(&sl->lock);
        write_seqcount_begin(&sl->seqcount);
}

static inline void write_sequnlock(seqlock_t *sl)
{
        write_seqcount_end(&sl->seqcount);
        spin_unlock(&sl->lock);
}

static inline void write_seqlock_bh(seqlock_t *sl)
{
        spin_lock_bh(&sl->lock);
        write_seqcount_begin(&sl->seqcount);
}

static inline void write_sequnlock_bh(seqlock_t *sl)
{
        write_seqcount_end(&sl->seqcount);
        spin_unlock_bh(&sl->lock);
}

static inline void write_seqlock_irq(seqlock_t *sl)
{
        spin_lock_irq(&sl->lock);
        write_seqcount_begin(&sl->seqcount);
}

static inline void write_sequnlock_irq(seqlock_t *sl)
{
        write_seqcount_end(&sl->seqcount);
        spin_unlock_irq(&sl->lock);
}

static inline unsigned long __write_seqlock_irqsave(seqlock_t *sl)
{
        unsigned long flags;

        spin_lock_irqsave(&sl->lock, flags);
        write_seqcount_begin(&sl->seqcount);
        return flags;
}

以下两个功能用于开始和结束阅读部分的阅读:

static inline unsigned read_seqbegin(const seqlock_t *sl)
{
        return read_seqcount_begin(&sl->seqcount);
}

static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start)
{
        return read_seqcount_retry(&sl->seqcount, start);
}

完成锁

完成锁如果需要一个或多个执行线程来等待某个事件的完成,比如等待另一个进程到达某个点或状态,那么完成锁是实现代码同步的有效方法。完成锁可能比信号量更受欢迎,原因有二:多个执行线程可以等待完成,使用complete_all(),它们可以同时被释放。这比信号量唤醒多个线程要好得多。其次,如果等待线程释放同步对象,信号量会导致竞争条件;使用 completion 时不存在这个问题。

可以通过包含<linux/completion.h>和创建类型为struct completion的变量来使用完成,这是一种用于保持完成状态的不透明结构。它使用先进先出来排队等待完成事件的线程:

struct completion {
        unsigned int done;
        wait_queue_head_t wait;
};

完成基本上包括初始化完成结构,等待通过wait_for_completion()调用的任何变体,最后通过complete()complete_all()调用发出完成信号。还有一些函数可以在生命周期内检查完成的状态。

初始化

以下宏可用于完成结构的静态声明和初始化:

#define DECLARE_COMPLETION(work) \
       struct completion work = COMPLETION_INITIALIZER(work)

以下内联函数将初始化动态创建的完成结构:

static inline void init_completion(struct completion *x)
{
        x->done = 0;
        init_waitqueue_head(&x->wait);
}

如果需要重用完成结构,将使用以下内联函数来重新初始化它。这可以在complete_all()之后使用:

static inline void reinit_completion(struct completion *x)
{
        x->done = 0;
}

等待完成

如果任何线程需要等待任务完成,它会在初始化的完成结构上调用wait_for_completion()。如果wait_for_completion操作发生在调用complete()complete_all()之后,线程简单地继续,因为它想要等待的原因已经满足;否则,它将等待complete()发出信号。wait_for_completion()呼叫有多种变体:

extern void wait_for_completion_io(struct completion *);
extern int wait_for_completion_interruptible(struct completion *x);
extern int wait_for_completion_killable(struct completion *x);
extern unsigned long wait_for_completion_timeout(struct completion *x,
                                                   unsigned long timeout);
extern unsigned long wait_for_completion_io_timeout(struct completion *x,
                                                    unsigned long timeout);
extern long wait_for_completion_interruptible_timeout(
        struct completion *x, unsigned long timeout);
extern long wait_for_completion_killable_timeout(
        struct completion *x, unsigned long timeout);
extern bool try_wait_for_completion(struct completion *x);
extern bool completion_done(struct completion *x);

extern void complete(struct completion *);
extern void complete_all(struct completion *);

信令完成

想要发出预期任务完成信号的执行线程调用complete()到一个等待线程,以便它可以继续。线程将以排队的相同顺序被唤醒。在多个服务员的情况下,它调用complete_all():

void complete(struct completion *x)
{
        unsigned long flags;

        spin_lock_irqsave(&x->wait.lock, flags);
        if (x->done != UINT_MAX)
                x->done++;
        __wake_up_locked(&x->wait, TASK_NORMAL, 1);
        spin_unlock_irqrestore(&x->wait.lock, flags);
}
EXPORT_SYMBOL(complete);
void complete_all(struct completion *x)
{
        unsigned long flags;

        spin_lock_irqsave(&x->wait.lock, flags);
        x->done = UINT_MAX;
        __wake_up_locked(&x->wait, TASK_NORMAL, 0);
        spin_unlock_irqrestore(&x->wait.lock, flags);
}
EXPORT_SYMBOL(complete_all);

摘要

在这一章中,我们不仅理解了内核提供的各种保护和同步机制,还对这些选项的有效性及其各种功能和缺点进行了潜在的尝试。我们从这一章中得到的启示是内核在提供数据保护和同步时处理这些不同复杂性的坚韧。另一个值得注意的事实是,在处理这些问题时,内核保持了编码的简易性和设计的华丽。

在下一章中,我们将研究内核如何处理中断的另一个重要方面。