Version
│ │ │ │ ├── tokio v1.48.0
│ │ │ │ │ └── tokio-macros v2.6.0 (proc-macro)
│ │ │ │ ├── tokio-util v0.7.16
│ │ │ │ │ └── tokio v1.48.0 (*)
│ │ │ ├── tokio v1.48.0 (*)
│ │ │ ├── tokio v1.48.0 (*)
│ │ ├── tokio v1.48.0 (*)
│ │ │ ├── tokio v1.48.0 (*)
│ ├── tokio v1.48.0 (*)
│ │ ├── tokio v1.48.0 (*)
│ │ ├── tokio-native-tls v0.3.1
│ │ │ └── tokio v1.48.0 (*)
│ ├── tokio v1.48.0 (*)
│ ├── tokio-native-tls v0.3.1 (*)
├── tokio v1.48.0 (*)
Platform
Linux [private hostname] 6.14.0-33-generic #33~24.04.1-Ubuntu SMP PREEMPT_DYNAMIC Fri Sep 19 17:02:30 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
Description
When a receiver holds a send permit, e.g. to send the next received item back to the channel in case of a retryable error, other senders are deadlocked, even when the channel has multiple free slots and only one outstanding permit.
I tried this code:
use tokio::sync::mpsc::{OwnedPermit, Receiver, Sender};

pub struct PushbackReceiver<T> {
    sender: Sender<T>,
    receiver: Receiver<T>,
    permit: Option<OwnedPermit<T>>,
}

impl<T> PushbackReceiver<T> {
    pub fn new(receiver: Receiver<T>, sender: &Sender<T>) -> Self {
        let sender = sender.clone();
        let permit = sender.clone().try_reserve_owned().ok();
        PushbackReceiver {
            receiver,
            sender,
            permit,
        }
    }

    pub async fn recv(&mut self) -> T {
        let result = self.receiver.recv().await.unwrap();
        if self.permit.is_none() {
            self.permit = self.sender.clone().try_reserve_owned().ok();
        }
        result
    }

    pub fn try_recv(&mut self) -> Option<T> {
        let result = self.receiver.try_recv().ok()?;
        if self.permit.is_none() {
            self.permit = self.sender.clone().try_reserve_owned().ok();
        }
        Some(result)
    }

    pub fn try_send(&mut self, value: T) -> bool {
        if let Some(permit) = self.permit.take() {
            permit.send(value);
            self.permit = self.sender.clone().try_reserve_owned().ok();
            true
        } else {
            let result = self.sender.try_send(value).is_ok();
            if result {
                self.permit = self.sender.clone().try_reserve_owned().ok();
            }
            result
        }
    }
}
I expected to see this happen: While the receiving thread holds this permit, send() called from another thread does not block unless the number of outstanding permits equals the number of free slots in the channel; and reserve_many() does not block unless it would lead the total number of outstanding permits to exceed the number of free slots. Messages arrive in the order sent, even when it's not the order in which the permits are reserved. For example, the following sequence:
[thread 1] let a = sender.reserve();
[thread 2] let b = sender.reserve_many(2);
[thread 3] let c = sender.reserve();
[thread 4] sender.send("E");
[thread 3] c.send("C");
[thread 1] a.send("A");
[thread 2] b.next().unwrap().send("B");
[thread 2] b.next().unwrap().send("D");
leads to the messages being received in the order "E", "C", "A", "B", "D" with none of the above calls blocking if the channel capacity is 5 or more, and "C", "A", "B", "D", "E" with only thread 4's send blocking if it is exactly 4.
When a slot previously assigned to a permit is filled this way without consuming the permit, the nth outstanding permit is reassigned to the nth free slot for all n.
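The expected ordering above can be replayed with a small standard-library model. This is a toy sketch of the semantics the report asks for, not tokio's implementation: `ModelChannel`, `send_with_permit`, and `replay` are hypothetical names, permits are tracked as a plain counter, and a call that "would block" is modelled as `try_send` returning `false` and being retried after the receiver frees a slot.

```rust
use std::collections::VecDeque;

/// Toy model: messages queue in the order they are sent, and permits
/// only count against capacity without being tied to a particular slot.
struct ModelChannel {
    capacity: usize,
    queue: VecDeque<&'static str>,
    outstanding: usize, // permits reserved but not yet used
}

impl ModelChannel {
    fn new(capacity: usize) -> Self {
        ModelChannel { capacity, queue: VecDeque::new(), outstanding: 0 }
    }

    /// Reserve `n` permits if queued messages plus permits still fit.
    fn try_reserve_many(&mut self, n: usize) -> bool {
        if self.queue.len() + self.outstanding + n <= self.capacity {
            self.outstanding += n;
            true
        } else {
            false
        }
    }

    /// Send through a previously reserved permit; never needs to wait.
    fn send_with_permit(&mut self, msg: &'static str) {
        assert!(self.outstanding > 0, "no permit outstanding");
        self.outstanding -= 1;
        self.queue.push_back(msg);
    }

    /// Plain send: succeeds only if a slot not covered by a permit is free.
    fn try_send(&mut self, msg: &'static str) -> bool {
        if self.try_reserve_many(1) {
            self.send_with_permit(msg);
            true
        } else {
            false
        }
    }

    fn recv(&mut self) -> Option<&'static str> {
        self.queue.pop_front()
    }
}

/// Replays the sequence from the report and returns the receive order.
fn replay(capacity: usize) -> Vec<&'static str> {
    let mut ch = ModelChannel::new(capacity);
    let mut received = Vec::new();
    assert!(ch.try_reserve_many(1)); // thread 1: a
    assert!(ch.try_reserve_many(2)); // thread 2: b
    assert!(ch.try_reserve_many(1)); // thread 3: c
    let e_sent = ch.try_send("E"); // thread 4: false means "would block"
    ch.send_with_permit("C");
    ch.send_with_permit("A");
    ch.send_with_permit("B");
    ch.send_with_permit("D");
    if !e_sent {
        // The blocked sender completes once the receiver frees a slot.
        if let Some(m) = ch.recv() {
            received.push(m);
        }
        assert!(ch.try_send("E"));
    }
    while let Some(m) = ch.recv() {
        received.push(m);
    }
    received
}

fn main() {
    println!("{:?}", replay(5)); // ["E", "C", "A", "B", "D"]
    println!("{:?}", replay(4)); // ["C", "A", "B", "D", "E"]
}
```

In this model a permit is just a count against capacity, so the reassignment of the nth permit to the nth free slot never has to be represented explicitly.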
Instead, this happened: The sending thread became blocked, leading to deadlock. (NB: The sending thread doesn't hold a copy of the PushbackReceiver; instead, it holds the Sender from which its self.sender was cloned.)