|
1 | 1 | # cond
|
| 2 | + |
| 3 | +`sync.Cond` 是 Go 标准库中的条件变量,它是唯一一个需要手动初始化的同步工具。与其他同步原语不同,`sync.Cond` 需要传入一个互斥锁 (`sync.Mutex`) 来保护共享资源的访问。它允许协程在某个条件满足之前进入等待状态,并在条件满足时被唤醒。 |
| 4 | + |
| 5 | +## 示例代码 |
| 6 | + |
| 7 | +```go |
| 8 | +package main |
| 9 | + |
| 10 | +import ( |
| 11 | + "fmt" |
| 12 | + "sync" |
| 13 | + "time" |
| 14 | +) |
| 15 | + |
| 16 | +var i = 0 |
| 17 | + |
| 18 | +func main() { |
| 19 | + var mu sync.Mutex |
| 20 | + var wg sync.WaitGroup |
| 21 | + |
| 22 | + // 创建一个条件变量,并传入互斥锁 |
| 23 | + cd := sync.NewCond(&mu) |
| 24 | + |
| 25 | + // 添加 4 个待处理的协程 |
| 26 | + wg.Add(4) |
| 27 | + |
| 28 | + // 创建 3 个协程,每个协程都会等待条件满足 |
| 29 | + for j := range 3 { |
| 30 | + go func() { |
| 31 | + defer wg.Done() |
| 32 | + |
| 33 | + mu.Lock() |
| 34 | + for i <= 100 { |
| 35 | + // 条件不满足时,协程会被阻塞在此 |
| 36 | + cd.Wait() |
| 37 | + } |
| 38 | + fmt.Printf("%d wake up\n", j) |
| 39 | + mu.Unlock() |
| 40 | + }() |
| 41 | + } |
| 42 | + |
| 43 | + // 创建一个协程,用来更新条件并唤醒其他协程 |
| 44 | + go func() { |
| 45 | + defer wg.Done() |
| 46 | + for { |
| 47 | + mu.Lock() |
| 48 | + i++ // 更新共享变量 |
| 49 | + mu.Unlock() |
| 50 | + if i > 100 { |
| 51 | + cd.Broadcast() // 条件满足时唤醒所有等待的协程 |
| 52 | + break |
| 53 | + } |
| 54 | + time.Sleep(time.Millisecond * 10) // 模拟工作负载 |
| 55 | + } |
| 56 | + }() |
| 57 | + |
| 58 | + // 等待所有协程完成 |
| 59 | + wg.Wait() |
| 60 | +} |
| 61 | +``` |
| 62 | + |
| 63 | +在上面的示例中,共享变量 `i` 被多个协程并发访问和修改。通过互斥锁 `mu` 来确保在并发条件下,访问 `i` 的操作是安全的。然后,通过 `sync.NewCond(&mu)` 创建了一个条件变量 `cd`,它依赖于 `mu` 锁来保证在等待时对共享资源的访问是同步的。 |
| 64 | + |
| 65 | +- **三个等待的协程**:每个协程通过 `cd.Wait()` 阻塞自己,直到条件满足(`i > 100`)。这些协程会在共享资源 `i` 的值更新之前一直处于阻塞状态。 |
| 66 | +- **一个更新条件并唤醒其他协程的协程**:当条件满足时(即 `i > 100`),这个协程通过 `cd.Broadcast()` 唤醒所有等待的协程,让它们继续执行。 |
| 67 | + |
| 68 | + |
| 69 | + |
| 70 | +## 结构 |
| 71 | + |
| 72 | +```go |
| 73 | +type Cond struct { |
| 74 | + // L is held while observing or changing the condition |
| 75 | + L Locker |
| 76 | + |
| 77 | + notify notifyList |
| 78 | +} |
| 79 | + |
| 80 | +type notifyList struct { |
| 81 | + // wait is the ticket number of the next waiter. It is atomically |
| 82 | + // incremented outside the lock. |
| 83 | + wait atomic.Uint32 |
| 84 | + |
| 85 | + notify uint32 |
| 86 | + |
| 87 | + // List of parked waiters. |
| 88 | + lock mutex |
| 89 | + head *sudog |
| 90 | + tail *sudog |
| 91 | +} |
| 92 | +``` |
| 93 | + |
| 94 | +其结构并不复杂: |
| 95 | + |
| 96 | +- `L`,互斥锁,这里的类型是`Locker`接口,而不是具体的锁类型 |
| 97 | +- `notify`,等待协程的通知链表 |
| 98 | + |
| 99 | +比较重要的是`runtime.notifyList`结构 |
| 100 | + |
| 101 | +- `wait`,原子值,记录了有多少个等待协程 |
| 102 | +- `notify`,指向下一个将要被唤醒的协程,从0开始递增 |
| 103 | +- `lock`,互斥锁,并不是我们传入的锁,而是`runtime`内部实现的一个锁 |
| 104 | +- `head`,`tail`,链表指针 |
| 105 | + |
| 106 | +它总共就三个方法 |
| 107 | + |
| 108 | +- `Wait`, 阻塞等待 |
| 109 | +- `Signal` ,唤醒一个等待协程 |
| 110 | +- `Broadcast`,唤醒所有等待协程 |
| 111 | + |
| 112 | +它的大部分实现都被隐藏在了`runtime`库下,这些实现位于`runtime/sema.go`文件中,以至于在标准库中它的代码非常简短,其基本原理就是一个加了锁的阻塞队列。 |
| 113 | + |
| 114 | + |
| 115 | + |
| 116 | +## Wait |
| 117 | + |
| 118 | +`Wait`方法会让协程自身陷入阻塞等待,直到被唤醒。 |
| 119 | + |
| 120 | +```go |
| 121 | +func (c *Cond) Wait() { |
| 122 | + t := runtime_notifyListAdd(&c.notify) |
| 123 | + c.L.Unlock() |
| 124 | + runtime_notifyListWait(&c.notify, t) |
| 125 | + c.L.Lock() |
| 126 | +} |
| 127 | +``` |
| 128 | + |
| 129 | +它首先会将自身加入`notifyList`中,但其实只是将`notifyList.wait`加一而已,这里的操作就相当于`len(notifyList)-1`,得到了最后一个元素的下标 |
| 130 | + |
| 131 | +```go |
| 132 | +func notifyListAdd(l *notifyList) uint32 { |
| 133 | + return l.wait.Add(1) - 1 |
| 134 | +} |
| 135 | +``` |
| 136 | + |
| 137 | +真正的加入操作是在`notifyListWait`函数中完成 |
| 138 | + |
| 139 | +```go |
| 140 | +func notifyListWait(l *notifyList, t uint32) { |
| 141 | + ... |
| 142 | +} |
| 143 | +``` |
| 144 | + |
| 145 | +在该函数中,它首先会对链表进行上锁,然后快速判断当前协程是否已经被唤醒了,如果已经唤醒了就直接返回,不需要阻塞等待。 |
| 146 | + |
| 147 | +```go |
| 148 | +lockWithRank(&l.lock, lockRankNotifyList) |
| 149 | +// Return right away if this ticket has already been notified. |
| 150 | +if less(t, l.notify) { |
| 151 | + unlock(&l.lock) |
| 152 | + return |
| 153 | +} |
| 154 | +``` |
| 155 | + |
| 156 | +如果没有被唤醒,则构造成`sudog`加入队列,然后通过`gopark`挂起。 |
| 157 | + |
| 158 | +```go |
| 159 | +s := acquireSudog() |
| 160 | +s.g = getg() |
| 161 | +s.ticket = t |
| 162 | +s.releasetime = 0 |
| 163 | +if l.tail == nil { |
| 164 | + l.head = s |
| 165 | +} else { |
| 166 | + l.tail.next = s |
| 167 | +} |
| 168 | +l.tail = s |
| 169 | +goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3) |
| 170 | +``` |
| 171 | + |
| 172 | +被唤醒后释放`sudog`结构 |
| 173 | + |
| 174 | +```go |
| 175 | +releaseSudog(s) |
| 176 | +``` |
| 177 | + |
| 178 | + |
| 179 | + |
| 180 | +## Signal |
| 181 | + |
| 182 | +`Signal`会按照队列先入先出的顺序唤醒阻塞的协程 |
| 183 | + |
| 184 | +```go |
| 185 | +func (c *Cond) Signal() { |
| 186 | + runtime_notifyListNotifyOne(&c.notify) |
| 187 | +} |
| 188 | +``` |
| 189 | + |
| 190 | +它的流程如下 |
| 191 | + |
| 192 | +1. 不加锁直接判断,`l.wait`是否等于`l.notify`,相等则表示所有协程都已经唤醒 |
| 193 | + |
| 194 | + ```go |
| 195 | + if l.wait.Load() == atomic.Load(&l.notify) { |
| 196 | + return |
| 197 | + } |
| 198 | + ``` |
| 199 | + |
| 200 | +2. 加锁后,再判断一次是否都已经被唤醒 |
| 201 | + |
| 202 | + ```go |
| 203 | + lockWithRank(&l.lock, lockRankNotifyList) |
| 204 | + t := l.notify |
| 205 | + if t == l.wait.Load() { |
| 206 | + unlock(&l.lock) |
| 207 | + return |
| 208 | + } |
| 209 | + ``` |
| 210 | + |
| 211 | +3. `l.notify`加一 |
| 212 | + |
| 213 | + ```go |
| 214 | + atomic.Store(&l.notify, t+1) |
| 215 | + ``` |
| 216 | + |
| 217 | +4. 循环遍历链表,找到需要被唤醒的协程,最后通过`runtime.goready`来唤醒协程。 |
| 218 | + |
| 219 | + ```go |
| 220 | + for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { |
| 221 | + if s.ticket == t { |
| 222 | + n := s.next |
| 223 | + if p != nil { |
| 224 | + p.next = n |
| 225 | + } else { |
| 226 | + l.head = n |
| 227 | + } |
| 228 | + if n == nil { |
| 229 | + l.tail = p |
| 230 | + } |
| 231 | + unlock(&l.lock) |
| 232 | + s.next = nil |
| 233 | + readyWithTime(s, 4) |
| 234 | + return |
| 235 | + } |
| 236 | + } |
| 237 | + unlock(&l.lock) |
| 238 | + ``` |
| 239 | + |
| 240 | + |
| 241 | + |
| 242 | + |
| 243 | + |
| 244 | +## Broadcast |
| 245 | + |
| 246 | +`Broadcast`会唤醒所有阻塞的协程 |
| 247 | + |
| 248 | +```go |
| 249 | +func (c *Cond) Broadcast() { |
| 250 | + runtime_notifyListNotifyAll(&c.notify) |
| 251 | +} |
| 252 | +``` |
| 253 | + |
| 254 | +它的流程基本上是一致的 |
| 255 | + |
| 256 | +1. 无锁检查,是否都已经被唤醒了 |
| 257 | + |
| 258 | + ```go |
| 259 | + // Fast-path: if there are no new waiters since the last notification |
| 260 | + // we don't need to acquire the lock. |
| 261 | + if l.wait.Load() == atomic.Load(&l.notify) { |
| 262 | + return |
| 263 | + } |
| 264 | + ``` |
| 265 | + |
| 266 | +2. 加锁,清空链表,然后释放锁,后续新到达的协程会被添加到链表头部 |
| 267 | + |
| 268 | + ```go |
| 269 | + lockWithRank(&l.lock, lockRankNotifyList) |
| 270 | + s := l.head |
| 271 | + l.head = nil |
| 272 | + l.tail = nil |
| 273 | + atomic.Store(&l.notify, l.wait.Load()) |
| 274 | + unlock(&l.lock) |
| 275 | + ``` |
| 276 | + |
| 277 | +3. 遍历链表,唤醒所有协程 |
| 278 | + |
| 279 | + ```go |
| 280 | + for s != nil { |
| 281 | + next := s.next |
| 282 | + s.next = nil |
| 283 | + readyWithTime(s, 4) |
| 284 | + s = next |
| 285 | + } |
| 286 | + ``` |
| 287 | + |
| 288 | + |
| 289 | + |
| 290 | +## 小结 |
| 291 | + |
| 292 | +`sync.Cond` 最常见的使用场景是需要在多个协程之间同步某些条件,通常应用于生产者-消费者模型、任务调度等场景。在这些场景中,多个协程需要等待某些条件满足才能继续执行,或者需要在条件改变时通知多个协程。它提供了一种灵活且高效的方式来管理协程间的同步。通过与互斥锁配合使用,`sync.Cond` 可以确保共享资源的访问安全,并且可以在特定条件满足时控制协程的执行顺序。理解其内部实现原理有助于我们更好地掌握并发编程的技巧,尤其是在涉及复杂条件同步时。 |
0 commit comments