25 changes: 11 additions & 14 deletions src/protocol/notification/connection.rs
@@ -203,24 +203,21 @@ impl Stream for Connection {
         loop {
             let notification = match this.next_notification.take() {
                 Some(notification) => Some(notification),
-                None => {
-                    let future = async {
-                        tokio::select! {
-                            notification = this.async_rx.recv() => notification,
Member
https://docs.rs/tokio/1.49.0/tokio/sync/mpsc/struct.Receiver.html#cancel-safety

But this is cancel safe, so I don't get your argument.

The wake-up should still be registered and the entire future polled again when there is an event?

Collaborator Author

Yep, the receivers should be cancel safe. The issue here is with the `let future = async { }` block: the context waker is registered by the inner `recv` calls inside that temporary future, and the future is later dropped when `future.poll_unpin` returns `Poll::Pending`. Then, when `sync_rx` got a new notification, it would wake the waker belonging to the dropped future, causing `poll_next` to stall.
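For reference, a minimal, self-contained schematic of the pattern being described (hypothetical struct and field names, simplified types; not the PR's exact code, and it assumes the futures crate plus tokio with the sync and macros features):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{FutureExt, Stream};
use tokio::sync::mpsc::Receiver;

struct Connection {
    async_rx: Receiver<u32>,
    sync_rx: Receiver<u32>,
}

impl Stream for Connection {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let this = self.get_mut();

        // A brand-new future is built on every call to `poll_next`...
        let future = async {
            tokio::select! {
                notification = this.async_rx.recv() => notification,
                notification = this.sync_rx.recv() => notification,
            }
        };
        futures::pin_mut!(future);

        // ...and dropped as soon as this function returns. The concern raised
        // here is whether the wakers registered by the inner `recv` calls die
        // with it (resolved further down the thread: they do not).
        match future.poll_unpin(cx) {
            Poll::Ready(notification) => Poll::Ready(notification),
            Poll::Pending => Poll::Pending,
        }
    }
}
```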

Member

> Then, when the sync_rx got a new notification, it would wake the waker corresponding to the dropped future, causing the poll_next to stall.

But the waker is for the entire task, not just the future. So the waker just wakes up the entire task, not some particular future.

Collaborator Author

I've dug a bit into tokio to figure this out. Indeed, I was mistaken about the "stalled connection", because I assumed recv worked similarly to reserve (the initial issue we noticed in webrtc):

  • Tokio's bounded Receiver keeps a waiting list of context wakers via a wrapper over the Semaphore implementation

  • I assumed that sync_rx.recv() would call into the semaphore's acquire (or similar) to place the context waker into that linked list (obtaining an Acquire)

    • Because the sync_rx.recv() future would get dropped immediately, the waker would be removed from the linked list on Drop
    • When the notification arrived, there would be no registered waker left in the list

However, the semaphore is only used for capacity. When we call into recv, the Receiver stores the waker in a separate field:

    /// Receiver waker. Notified when a value is pushed into the channel.
    rx_waker: CachePadded<AtomicWaker>,

and, in recv:

    self.inner.rx_waker.register_by_ref(cx.waker());

So regardless of whether the temporary future gets dropped, the proper waker still gets woken under the hood. This PR just turns into a tiny optimization that avoids creating and dropping a dedicated async block :D
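To make that concrete, here is a minimal, self-contained sketch (not part of the PR; assumes tokio with the `full` feature) showing that a `recv()` future can be created and dropped on every poll and the task still gets woken, because the waker lives in the channel's `AtomicWaker` rather than in the dropped future:

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u32>(4);

    // Send a value only after the first poll has already returned `Pending`.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        tx.send(7).await.unwrap();
    });

    let value = poll_fn(|cx| {
        // A fresh `recv()` future is created on every poll and dropped as soon
        // as this closure returns, mirroring the temporary `async {}` block in
        // the old code.
        let fut = pin!(rx.recv());
        fut.poll(cx)
    })
    .await;

    // The send wakes the task via `rx_waker`, the closure runs again with a
    // brand-new `recv()` future, and the value is delivered.
    assert_eq!(value, Some(7));
}
```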

-                            notification = this.sync_rx.recv() => notification,
-                        }
-                    };
-                    futures::pin_mut!(future);
-
-                    match future.poll_unpin(cx) {
-                        Poll::Pending => None,
+                None => match this.sync_rx.poll_recv(cx) {
+                    Poll::Ready(Some(notification)) => Some(notification),
+                    Poll::Ready(None) =>
+                        return Poll::Ready(Some(ConnectionEvent::CloseConnection {
+                            notify: NotifyProtocol::Yes,
+                        })),
+                    Poll::Pending => match this.async_rx.poll_recv(cx) {
+                        Poll::Ready(Some(notification)) => Some(notification),
+                        Poll::Ready(None) =>
+                            return Poll::Ready(Some(ConnectionEvent::CloseConnection {
+                                notify: NotifyProtocol::Yes,
+                            })),
-                        Poll::Ready(Some(notification)) => Some(notification),
-                    }
-                }
+                        Poll::Pending => None,
+                    },
+                },
             };

             let Some(notification) = notification else {