Skip to content

Commit 86debba

Browse files
committed
implement linearizable wakeups
1 parent e7b5922 commit 86debba

File tree

5 files changed

+323
-91
lines changed

5 files changed

+323
-91
lines changed

crossbeam-channel/src/flavors/array.rs

Lines changed: 92 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ pub(crate) struct Channel<T> {
8787
receivers: SyncWaker,
8888
}
8989

90+
/// The state of the channel after calling `start_recv` or `start_send`.
91+
#[derive(PartialEq, Eq)]
92+
enum Status {
93+
/// The channel is ready to read or write to.
94+
Ready,
95+
/// There is currently a send or receive in progress holding up the queue.
96+
/// All operations must block to preserve linearizability.
97+
InProgress,
98+
/// The channel is empty.
99+
Empty,
100+
}
101+
90102
impl<T> Channel<T> {
91103
/// Creates a bounded channel of capacity `cap`.
92104
pub(crate) fn with_capacity(cap: usize) -> Self {
@@ -135,7 +147,7 @@ impl<T> Channel<T> {
135147
}
136148

137149
/// Attempts to reserve a slot for sending a message.
138-
fn start_send(&self, token: &mut Token) -> bool {
150+
fn start_send(&self, token: &mut Token) -> Status {
139151
let backoff = Backoff::new();
140152
let mut tail = self.tail.load(Ordering::Relaxed);
141153

@@ -144,7 +156,7 @@ impl<T> Channel<T> {
144156
if tail & self.mark_bit != 0 {
145157
token.array.slot = ptr::null();
146158
token.array.stamp = 0;
147-
return true;
159+
return Status::Ready;
148160
}
149161

150162
// Deconstruct the tail.
@@ -179,7 +191,7 @@ impl<T> Channel<T> {
179191
// Prepare the token for the follow-up call to `write`.
180192
token.array.slot = slot as *const Slot<T> as *const u8;
181193
token.array.stamp = tail + 1;
182-
return true;
194+
return Status::Ready;
183195
}
184196
Err(t) => {
185197
tail = t;
@@ -193,7 +205,14 @@ impl<T> Channel<T> {
193205
// If the head lags one lap behind the tail as well...
194206
if head.wrapping_add(self.one_lap) == tail {
195207
// ...then the channel is full.
196-
return false;
208+
return Status::Empty;
209+
}
210+
211+
// The head was advanced but the stamp hasn't been updated yet,
212+
// meaning a receive is in-progress. Spin for a bit waiting for
213+
// the receive to complete before falling back to parking.
214+
if backoff.is_completed() {
215+
return Status::InProgress;
197216
}
198217

199218
backoff.spin();
@@ -225,7 +244,7 @@ impl<T> Channel<T> {
225244
}
226245

227246
/// Attempts to reserve a slot for receiving a message.
228-
fn start_recv(&self, token: &mut Token) -> bool {
247+
fn start_recv(&self, token: &mut Token) -> Status {
229248
let backoff = Backoff::new();
230249
let mut head = self.head.load(Ordering::Relaxed);
231250

@@ -262,7 +281,7 @@ impl<T> Channel<T> {
262281
// Prepare the token for the follow-up call to `read`.
263282
token.array.slot = slot as *const Slot<T> as *const u8;
264283
token.array.stamp = head.wrapping_add(self.one_lap);
265-
return true;
284+
return Status::Ready;
266285
}
267286
Err(h) => {
268287
head = h;
@@ -280,13 +299,20 @@ impl<T> Channel<T> {
280299
// ...then receive an error.
281300
token.array.slot = ptr::null();
282301
token.array.stamp = 0;
283-
return true;
302+
return Status::Ready;
284303
} else {
285304
// Otherwise, the receive operation is not ready.
286-
return false;
305+
return Status::Empty;
287306
}
288307
}
289308

309+
// The tail was advanced but the stamp hasn't been updated yet,
310+
// meaning a send is in-progress. Spin for a bit waiting for
311+
// the send to complete before falling back to parking.
312+
if backoff.is_completed() {
313+
return Status::InProgress;
314+
}
315+
290316
backoff.spin();
291317
head = self.head.load(Ordering::Relaxed);
292318
} else {
@@ -317,11 +343,13 @@ impl<T> Channel<T> {
317343

318344
/// Attempts to send a message into the channel.
319345
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
320-
let token = &mut Token::default();
321-
if self.start_send(token) {
322-
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
323-
} else {
324-
Err(TrySendError::Full(msg))
346+
match self.send_blocking(msg, None, false) {
347+
Ok(None) => Ok(()),
348+
Ok(Some(msg)) => Err(TrySendError::Full(msg)),
349+
Err(SendTimeoutError::Disconnected(msg)) => Err(TrySendError::Disconnected(msg)),
350+
Err(SendTimeoutError::Timeout(_)) => {
351+
unreachable!("called recv_blocking with deadline: None")
352+
}
325353
}
326354
}
327355

@@ -331,14 +359,30 @@ impl<T> Channel<T> {
331359
msg: T,
332360
deadline: Option<Instant>,
333361
) -> Result<(), SendTimeoutError<T>> {
362+
self.send_blocking(msg, deadline, true)
363+
.map(|value| assert!(value.is_none(), "called send_blocking with block: true"))
364+
}
365+
366+
/// Sends a message into the channel.
367+
pub(crate) fn send_blocking(
368+
&self,
369+
msg: T,
370+
deadline: Option<Instant>,
371+
block: bool,
372+
) -> Result<Option<T>, SendTimeoutError<T>> {
334373
let token = &mut Token::default();
374+
let mut state = self.senders.start();
335375
loop {
336376
// Try sending a message several times.
337377
let backoff = Backoff::new();
338378
loop {
339-
if self.start_send(token) {
340-
let res = unsafe { self.write(token, msg) };
341-
return res.map_err(SendTimeoutError::Disconnected);
379+
match self.start_send(token) {
380+
Status::Ready => {
381+
let res = unsafe { self.write(token, msg) };
382+
return res.map(|_| None).map_err(SendTimeoutError::Disconnected);
383+
}
384+
Status::Empty if !block => return Ok(Some(msg)),
385+
_ => {}
342386
}
343387

344388
if backoff.is_completed() {
@@ -357,7 +401,7 @@ impl<T> Channel<T> {
357401
Context::with(|cx| {
358402
// Prepare for blocking until a receiver wakes us up.
359403
let oper = Operation::hook(token);
360-
self.senders.register(oper, cx);
404+
self.senders.register2(oper, cx, &state);
361405

362406
// Has the channel become ready just now?
363407
if !self.is_full() || self.is_disconnected() {
@@ -375,30 +419,47 @@ impl<T> Channel<T> {
375419
Selected::Operation(_) => {}
376420
}
377421
});
422+
423+
state.unpark();
378424
}
379425
}
380426

381427
/// Attempts to receive a message without blocking.
382428
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
383-
let token = &mut Token::default();
384-
385-
if self.start_recv(token) {
386-
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
387-
} else {
388-
Err(TryRecvError::Empty)
429+
match self.recv_blocking(None, false) {
430+
Ok(Some(value)) => Ok(value),
431+
Ok(None) => Err(TryRecvError::Empty),
432+
Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected),
433+
Err(RecvTimeoutError::Timeout) => {
434+
unreachable!("called recv_blocking with deadline: None")
435+
}
389436
}
390437
}
391438

392439
/// Receives a message from the channel.
393440
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
441+
self.recv_blocking(deadline, true)
442+
.map(|value| value.expect("called recv_blocking with block: true"))
443+
}
444+
445+
pub(crate) fn recv_blocking(
446+
&self,
447+
deadline: Option<Instant>,
448+
block: bool,
449+
) -> Result<Option<T>, RecvTimeoutError> {
394450
let token = &mut Token::default();
451+
let mut state = self.receivers.start();
395452
loop {
396453
// Try receiving a message several times.
397454
let backoff = Backoff::new();
398455
loop {
399-
if self.start_recv(token) {
400-
let res = unsafe { self.read(token) };
401-
return res.map_err(|_| RecvTimeoutError::Disconnected);
456+
match self.start_recv(token) {
457+
Status::Ready => {
458+
let res = unsafe { self.read(token) };
459+
return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected);
460+
}
461+
Status::Empty if !block => return Ok(None),
462+
_ => {}
402463
}
403464

404465
if backoff.is_completed() {
@@ -417,7 +478,7 @@ impl<T> Channel<T> {
417478
Context::with(|cx| {
418479
// Prepare for blocking until a sender wakes us up.
419480
let oper = Operation::hook(token);
420-
self.receivers.register(oper, cx);
481+
self.receivers.register2(oper, cx, &mut state);
421482

422483
// Has the channel become ready just now?
423484
if !self.is_empty() || self.is_disconnected() {
@@ -437,6 +498,8 @@ impl<T> Channel<T> {
437498
Selected::Operation(_) => {}
438499
}
439500
});
501+
502+
state.unpark();
440503
}
441504
}
442505

@@ -568,7 +631,7 @@ pub(crate) struct Sender<'a, T>(&'a Channel<T>);
568631

569632
impl<T> SelectHandle for Receiver<'_, T> {
570633
fn try_select(&self, token: &mut Token) -> bool {
571-
self.0.start_recv(token)
634+
self.0.start_recv(token) == Status::Ready
572635
}
573636

574637
fn deadline(&self) -> Option<Instant> {
@@ -604,7 +667,7 @@ impl<T> SelectHandle for Receiver<'_, T> {
604667

605668
impl<T> SelectHandle for Sender<'_, T> {
606669
fn try_select(&self, token: &mut Token) -> bool {
607-
self.0.start_send(token)
670+
self.0.start_send(token) == Status::Ready
608671
}
609672

610673
fn deadline(&self) -> Option<Instant> {

0 commit comments

Comments
 (0)