Skip to content

Commit 79c8f8b

Browse files
committed
WIP make channels parallelism-safe
1 parent 09a87d8 commit 79c8f8b

File tree

5 files changed

+100
-30
lines changed

5 files changed

+100
-30
lines changed

src/runtime/chan.go

+93-29
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
//go:build !scheduler.threads
2-
31
package runtime
42

53
// This file implements the 'chan' type and send/receive/select operations.
@@ -32,11 +30,12 @@ package runtime
3230
// non-select operations) so that the select operation knows which case did
3331
// proceed.
3432
// The value is at the same time also a way that goroutines can be the first
35-
// (and only) goroutine to 'take' a channel operation to change it from
36-
// 'waiting' to any other value. This is important for the select statement
37-
// because multiple goroutines could try to let different channels in the
38-
// select statement proceed at the same time. By using Task.Data, only a
39-
// single channel operation in the select statement can proceed.
33+
// (and only) goroutine to 'take' a channel operation using an atomic CAS
34+
// operation to change it from 'waiting' to any other value. This is important
35+
// for the select statement because multiple goroutines could try to let
36+
// different channels in the select statement proceed at the same time. By
37+
// using Task.Data, only a single channel operation in the select statement
38+
// can proceed.
4039
// - It is possible for the channel queues to contain already-processed senders
4140
// or receivers. This can happen when the select statement managed to proceed
4241
// but the goroutine doing the select has not yet cleaned up the stale queue
@@ -51,15 +50,17 @@ import (
5150

5251
// The runtime implementation of the Go 'chan' type.
5352
type channel struct {
54-
closed bool
55-
elementSize uintptr
56-
bufCap uintptr // 'cap'
57-
bufLen uintptr // 'len'
58-
bufHead uintptr
59-
bufTail uintptr
60-
senders chanQueue
61-
receivers chanQueue
62-
buf unsafe.Pointer
53+
closed bool
54+
selectLocked bool
55+
elementSize uintptr
56+
bufCap uintptr // 'cap'
57+
bufLen uintptr // 'len'
58+
bufHead uintptr
59+
bufTail uintptr
60+
senders chanQueue
61+
receivers chanQueue
62+
lock task.PMutex
63+
buf unsafe.Pointer
6364
}
6465

6566
const (
@@ -75,7 +76,8 @@ type chanQueue struct {
7576

7677
// Pus the next channel operation to the queue. All appropriate fields must have
7778
// been initialized already.
78-
// This function must be called with interrupts disabled.
79+
// This function must be called with interrupts disabled and the channel lock
80+
// held.
7981
func (q *chanQueue) push(node *channelOp) {
8082
node.next = q.first
8183
q.first = node
@@ -101,16 +103,17 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
101103
newDataValue := chanOp | popped.index<<2
102104

103105
// Try to be the first to proceed with this goroutine.
104-
if popped.task.DataUint32() == chanOperationWaiting {
105-
popped.task.SetDataUint32(newDataValue)
106+
swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
107+
if swapped {
106108
return popped
107109
}
108110
}
109111
}
110112

111113
// Remove the given to-be-removed node from the queue if it is part of the
112114
// queue. If there are multiple, only one will be removed.
113-
// This function must be called with interrupts disabled.
115+
// This function must be called with interrupts disabled and the channel lock
116+
// held.
114117
func (q *chanQueue) remove(remove *channelOp) {
115118
n := &q.first
116119
for *n != nil {
@@ -161,8 +164,8 @@ func chanCap(c *channel) int {
161164
}
162165

163166
// Push the value to the channel buffer array, for a send operation.
164-
// This function may only be called when interrupts are disabled and it is known
165-
// there is space available in the buffer.
167+
// This function may only be called when interrupts are disabled, the channel is
168+
// locked and it is known there is space available in the buffer.
166169
func (ch *channel) bufferPush(value unsafe.Pointer) {
167170
elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
168171
ch.bufLen++
@@ -176,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {
176179

177180
// Pop a value from the channel buffer and store it in the 'value' pointer, for
178181
// a receive operation.
179-
// This function may only be called when interrupts are disabled and it is known
180-
// there is at least one value available in the buffer.
182+
// This function may only be called when interrupts are disabled, the channel is
183+
// locked and it is known there is at least one value available in the buffer.
181184
func (ch *channel) bufferPop(value unsafe.Pointer) {
182185
elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
183186
ch.bufLen--
@@ -193,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
193196
}
194197

195198
// Try to proceed with this send operation without blocking, and return whether
196-
// the send succeeded. Interrupts must be disabled when calling this function.
199+
// the send succeeded. Interrupts must be disabled and the lock must be held
200+
// when calling this function.
197201
func (ch *channel) trySend(value unsafe.Pointer) bool {
198202
// To make sure we send values in the correct order, we can only send
199203
// directly to a receiver when there are no values in the buffer.
@@ -232,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
232236
}
233237

234238
mask := interrupt.Disable()
239+
ch.lock.Lock()
235240

236241
// See whether we can proceed immediately, and if so, return early.
237242
if ch.trySend(value) {
243+
ch.lock.Unlock()
238244
interrupt.Restore(mask)
239245
return
240246
}
@@ -246,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
246252
op.index = 0
247253
op.value = value
248254
ch.senders.push(op)
255+
ch.lock.Unlock()
249256
interrupt.Restore(mask)
250257

251258
// Wait until this goroutine is resumed.
259+
// It might be resumed after Unlock() and before Pause(). In that case,
260+
// because we use semaphores, the Pause() will continue immediately.
252261
task.Pause()
253262

254263
// Check whether the sent happened normally (not because the channel was
@@ -260,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
260269
}
261270

262271
// Try to proceed with this receive operation without blocking, and return
263-
// whether the receive operation succeeded. Interrupts must be disabled when
264-
// calling this function.
272+
// whether the receive operation succeeded. Interrupts must be disabled and the
273+
// lock must be held when calling this function.
265274
func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
266275
// To make sure we keep the values in the channel in the correct order, we
267276
// first have to read values from the buffer before we can look at the
@@ -305,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
305314
}
306315

307316
mask := interrupt.Disable()
317+
ch.lock.Lock()
308318

309319
if received, ok := ch.tryRecv(value); received {
320+
ch.lock.Unlock()
310321
interrupt.Restore(mask)
311322
return ok
312323
}
@@ -319,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
319330
op.task = t
320331
op.index = 0
321332
ch.receivers.push(op)
333+
ch.lock.Unlock()
322334
interrupt.Restore(mask)
323335

324336
// Wait until the goroutine is resumed.
@@ -337,9 +349,11 @@ func chanClose(ch *channel) {
337349
}
338350

339351
mask := interrupt.Disable()
352+
ch.lock.Lock()
340353

341354
if ch.closed {
342355
// Not allowed by the language spec.
356+
ch.lock.Unlock()
343357
interrupt.Restore(mask)
344358
runtimePanic("close of closed channel")
345359
}
@@ -372,14 +386,56 @@ func chanClose(ch *channel) {
372386

373387
ch.closed = true
374388

389+
ch.lock.Unlock()
375390
interrupt.Restore(mask)
376391
}
377392

393+
// We currently use a global select lock to avoid deadlocks while locking each
394+
// individual channel in the select. Without this global lock, two select
395+
// operations that have a different order of the same channels could end up in a
396+
// deadlock. This global lock is inefficient if there are many select operations
397+
// happening in parallel, but gets the job done.
398+
//
399+
// If this becomes a performance issue, we can see how the Go runtime does this.
400+
// I think it does this by sorting all states by channel address and then
401+
// locking them in that order to avoid this deadlock.
402+
var chanSelectLock task.PMutex
403+
404+
// Lock all channels (taking care to skip duplicate channels).
405+
func lockAllStates(states []chanSelectState) {
406+
if !hasParallelism {
407+
return
408+
}
409+
for _, state := range states {
410+
if state.ch != nil && !state.ch.selectLocked {
411+
state.ch.lock.Lock()
412+
state.ch.selectLocked = true
413+
}
414+
}
415+
}
416+
417+
// Unlock all channels (taking care to skip duplicate channels).
418+
func unlockAllStates(states []chanSelectState) {
419+
if !hasParallelism {
420+
return
421+
}
422+
for _, state := range states {
423+
if state.ch != nil && state.ch.selectLocked {
424+
state.ch.lock.Unlock()
425+
state.ch.selectLocked = false
426+
}
427+
}
428+
}
429+
378430
// chanSelect implements blocking or non-blocking select operations.
379431
// The 'ops' slice must be set if (and only if) this is a blocking select.
380432
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
381433
mask := interrupt.Disable()
382434

435+
// Lock everything.
436+
chanSelectLock.Lock()
437+
lockAllStates(states)
438+
383439
const selectNoIndex = ^uint32(0)
384440
selectIndex := selectNoIndex
385441
selectOk := true
@@ -411,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
411467
// return early.
412468
blocking := len(ops) != 0
413469
if selectIndex != selectNoIndex || !blocking {
470+
unlockAllStates(states)
471+
chanSelectLock.Unlock()
414472
interrupt.Restore(mask)
415473
return selectIndex, selectOk
416474
}
@@ -419,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
419477
// become more complicated.
420478
// We add ourselves as a sender/receiver to every channel, and wait for the
421479
// first one to complete. Only one will successfully complete, because
422-
// senders and receivers will check t.Data for the state so that only one
423-
// will be able to "take" this select operation.
480+
// senders and receivers use a compare-and-exchange atomic operation on
481+
// t.Data so that only one will be able to "take" this select operation.
424482
t := task.Current()
425483
t.Ptr = recvbuf
426484
t.SetDataUint32(chanOperationWaiting)
@@ -440,13 +498,17 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
440498
}
441499

442500
// Now we wait until one of the send/receive operations can proceed.
501+
unlockAllStates(states)
502+
chanSelectLock.Unlock()
443503
interrupt.Restore(mask)
444504
task.Pause()
445505

446506
// Resumed, so one channel operation must have progressed.
447507

448508
// Make sure all channel ops are removed from the senders/receivers
449509
// queue before we return and the memory of them becomes invalid.
510+
chanSelectLock.Lock()
511+
lockAllStates(states)
450512
for i, state := range states {
451513
if state.ch == nil {
452514
continue
@@ -460,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
460522
}
461523
interrupt.Restore(mask)
462524
}
525+
unlockAllStates(states)
526+
chanSelectLock.Unlock()
463527

464528
// Pull the return values out of t.Data (which contains two bitfields).
465529
selectIndex = t.DataUint32() >> 2

src/runtime/chan2.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//go:build scheduler.threads
1+
//go:build none
22

33
package runtime
44

src/runtime/scheduler_cooperative.go

+2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
// queue a new scheduler invocation using setTimeout.
2222
const asyncScheduler = GOOS == "js"
2323

24+
const hasParallelism = false
25+
2426
// Queues used by the scheduler.
2527
var (
2628
runqueue task.Queue

src/runtime/scheduler_none.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import "internal/task"
66

77
const hasScheduler = false
88

9+
const hasParallelism = false
10+
911
// run is called by the program entry point to execute the go program.
1012
// With the "none" scheduler, init and the main function are invoked directly.
1113
func run() {

src/runtime/scheduler_threads.go

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import "internal/task"
66

77
const hasScheduler = false // not using the cooperative scheduler
88

9+
const hasParallelism = true
10+
911
var (
1012
timerQueueLock task.PMutex
1113
timerQueueStarted bool

0 commit comments

Comments
 (0)