Conversation
439ddf1 to
1fd6d5f
Compare
When an async function does:
```
async f(&mut self) {
tokio::select! {
v = self.subscriber().next => { /* do something with v */ }
_ = std::future::ready() => {},
}
}
```
then the future returned by `self.subscriber().next` is cancelled, but
the observed object stilled referenced the waker, preventing the future
(consequently, the function's closure) from being dropped even though
it won't be scheduled again.
This change is twofold:
1. `ObservableState` is now handed `Weak` references, so it does not
keep futures alive, and a strong reference is kept by whichever
object is held by the future awaiting it (`Subscriber` or `Next`)
2. `ObservableState` garbage-collects weak references from time to time,
so its own vector of wakers does not grow unbounded
1fd6d5f to
c04b922
Compare
|
Hey, thanks for this PR! Is this pattern used in other channel implementations? I'm surprised something like this is necessary. |
|
The only other channel I'm familiar with is |
|
Hm, that does not make sense to me, how is that equivalent. All wakers in eyeball are already consumed when "sending" (setting a new value on the observable), even without this change. |
|
Without this change, they are consumed only when sending. So if we never send (or take a long time between sends) then wakers accumulate. |
|
Yes, but it sounds like tokio's mpsc suffers from the same problem then? |
|
hmm okay actually, and a (sorted) linked list recording the position and waker of each receiver: and when a receiver is dropped, it removes itself from the list, including the waker: (it's not clear to me why the receiver being dropped is always at the tail of the list; i guess what it calls "tail" is actually a pointer to the link that was the tail at the time the receiver was created and the list is actually doubly-linked) |
|
Alternatively, we could probably remove all the manual waker management from |
|
eyeball was originally implemented on top of tokio, but that was limiting in a number of ways. I guess we should probably do something like tokio's waker list, but I'm not sure if there are any crates out that that implement this, and it feels a bit wrong to put it right in |
When an async function does:
then the future returned by
self.subscriber().nextis cancelled, but the observed object stilled referenced the waker, preventing the future (consequently, the function's closure) from being dropped even though it won't be scheduled again.This change is twofold:
ObservableStateis now handedWeakreferences, so it does not keep futures alive, and a strong reference is kept by whichever object is held by the future awaiting it (SubscriberorNext)ObservableStategarbage-collects weak references from time to time, so its own vector of wakers does not grow unboundedMany thanks for having this log line:
eyeball/eyeball/src/state.rs
Line 122 in 7ce1b78
I spent my day trying to figure why my app was leaking 1MB/s and
Waking up XXXX waiting subscribersin my logs made me figure it out :)