-
Notifications
You must be signed in to change notification settings - Fork 717
feat: async broadcast, stream map and async changes #9935
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: async broadcast, stream map and async changes #9935
Conversation
…ia/async-changes
…ia/async-changes
…ia/async-changes
|
Mathlib CI status (docs):
|
|
Reference manual CI status:
|
TwoFX
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't looked at Broadcast, CancellationToken and StreamMap yet. All of them are missing tests, please add them.
|
|
||
| Async.ofPromise (pure promise) | ||
|
|
||
| def Selectable.combine (selectables : Array (Selectable α)) : Selector α := { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a docstring.
|
|
||
| Async.ofPromise (pure promise) | ||
|
|
||
| def Selectable.combine (selectables : Array (Selectable α)) : Selector α := { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also (perhaps a docstring would clear this up), It's not really clear to me when you would use this over Selectable.one, and also it seems like this implementation doesn't have the fairness properties of Selectable.one because it does not shuffle the selectables. Is this by design?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to add the fairness property (do we need the overhead of the property, I think that tokio::select! does not do that)
Regarding using it instead of one, when I was coding the StreamMap, I needed some way to recv from multiple channels at the same time so I used to use Selectable.one for the StreamMap.recv function. However, I also need to expose a function that returns a Selector itself in a function like StreamMap.recvSelector itself, so that while I can receive from multiple maps, I can also use this operation in another Selectable for example with a promise to cancel the entire operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::select! is fair: https://docs.rs/tokio/latest/tokio/macro.select.html#fairness, this is one of the key requirements when building such a primitive right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh wow, I misremembered. I thought it wasn’t like that.
| public section | ||
|
|
||
| /-! | ||
| This module contains the implementation of `Std.Notify`. `Std.Notify` provides a lightweight |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this differ from Std.Condvar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Std.Condvar depends on a Std.Mutex and blocks the entire thread that the Task is using while waiting. If I try to use it with async and a lot of Tasks like this:
def condvar : Async Unit := do
let condvar ← Std.Condvar.new
let mutex ← Std.Mutex.new false
for i in [0:threads] do
background do
IO.println s!"start {i + 1}"
await =<< (show IO (ETask _ _) from IO.asTask (mutex.atomically (condvar.wait mutex)))
IO.println s!"end {i + 1}"
IO.sleep 2000
condvar.notifyAllIt causes some weird behavior because some tasks start running and get notified, while others don’t, because condvar.wait blocks the Task entire task and right now afaik it blocks an entire thread and cannot be paused while doing blocking operations like that.
Notify uses Promises so it’s better suited for concurrency. The Task is not blocked while waiting for a notification which makes it simpler for use cases that just involve notifying:
def notify : Async Unit := do
let notify ← Std.Notify.new
for i in [0:threads] do
background do
IO.println s!"start {i}"
notify.wait
IO.println s!"end {i}"
IO.sleep 2000
notify.notify
This PR makes a lot of changes to the Async library:
Std.Sync.Broadcaststructure that is a multi-producer and multi-consumer broadcast queue. Right now, only theBoundedversion is defined because of the ring buffer used to store messages. Broadcast channels can grow very quickly in size since all subscribers need to read each message before it can be dequeued.Asyncmodule.Std.Sync.Notifystructure that can send to one or multiple consumers a signal.Std.Sync.CancellationTokenstructure that can send signals to tokens in a tree, so they can be used to cancel. It's a nice way to create structured concurrency.send.