Request new function #31
Comments
How about AsyncSeq.mergeAll?
The following should work, but isn't ideal:

    let parallelize (parallelism:int) (s:seq<Async<'a>>) : Async<seq<'a>> = async {
      let buffer = new Collections.Concurrent.BlockingCollection<_>(parallelism)
      let cts = new CancellationTokenSource()
      try
        do! Async.ParallelIgnore cts.Token parallelism (s |> Seq.map (Async.map buffer.Add))
      finally
        buffer.CompleteAdding()
      return seq {
        use buffer = buffer
        use cts = cts
        use _cancel = { new IDisposable with member __.Dispose() = cts.Cancel() }
        yield! buffer.GetConsumingEnumerable(cts.Token) } }

This will run up to

Implementation of

    static member ParallelIgnore (ct:CancellationToken) (parallelism:int) (xs:seq<Async<_>>) = async {
      let sm = new SemaphoreSlim(parallelism)
      let cde = new CountdownEvent(1)
      let tcs = new TaskCompletionSource<unit>()
      ct.Register(Action(fun () -> tcs.TrySetCanceled() |> ignore)) |> ignore
      let inline tryComplete () =
        if cde.Signal() then
          tcs.SetResult(())
      let inline ok _ =
        sm.Release() |> ignore
        tryComplete ()
      let inline err (ex:exn) =
        tcs.TrySetException ex |> ignore
        sm.Release() |> ignore
      let inline cnc (ex:OperationCanceledException) =
        tcs.TrySetCanceled() |> ignore
        sm.Release() |> ignore
      try
        use en = xs.GetEnumerator()
        while not (tcs.Task.IsCompleted) && en.MoveNext() do
          sm.Wait()
          cde.AddCount(1)
          Async.StartWithContinuations(en.Current, ok, err, cnc, ct)
        tryComplete ()
        do! tcs.Task |> Async.AwaitTask
      finally
        cde.Dispose()
        sm.Dispose() }

Use of

    type private EagerSeq<'a> = TaskCompletionSource<EagerCons<'a>>
    and private EagerCons<'a> =
      | EagerCons of 'a * EagerSeq<'a>
      | EagerNil

    let rec private ofEager (e:EagerSeq<'a>) : AsyncSeq<'a> =
      async.Delay <| fun () ->
        e.Task
        |> Async.AwaitTask
        |> Async.map (function
          | EagerNil -> Nil
          | EagerCons(a,tl) -> Cons(a, ofEager tl))
Hey @xkrt, following up on this and seeing if your request can be addressed. There is a new function in master (not yet released):

Alternatively, we can get functions of this type:

    val throttle : parallelism:int -> ('a -> Async<'b>) -> ('a -> Async<'b>)

    val mapAsyncParallelUnordered:
      f:('a -> Async<'b>) ->
      AsyncSeq<'a> ->
      AsyncSeq<'b>

This function would be similar to
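For illustration, a function with the `throttle` signature above could be sketched with a `SemaphoreSlim`. This is only a minimal sketch under that assumption, not the implementation in the library; the names `semaphore` and `work` are made up for the example.

```fsharp
open System.Threading

/// Wraps f so that at most `parallelism` invocations run at the same time.
/// Hypothetical sketch; the semaphore is never disposed because the wrapped
/// function may be called at any later point.
let throttle (parallelism: int) (f: 'a -> Async<'b>) : 'a -> Async<'b> =
    let semaphore = new SemaphoreSlim(parallelism)
    fun a -> async {
        // Wait asynchronously for a free slot, so no thread is blocked.
        do! semaphore.WaitAsync() |> Async.AwaitTask
        try
            return! f a
        finally
            // Free the slot whether f succeeded, failed, or was cancelled.
            semaphore.Release() |> ignore }

// Usage: five jobs are started, but at most two run concurrently.
let work = throttle 2 (fun i -> async {
    do! Async.Sleep 100
    return i * i })

let results =
    [1 .. 5]
    |> List.map work
    |> Async.Parallel
    |> Async.RunSynchronously
// Async.Parallel returns results in input order: [|1; 4; 9; 16; 25|]
```

Because `Async.Parallel` waits for every job before returning, this gives throttling but not "results as soon as they appear"; that part still needs something like `mapAsyncParallelUnordered`.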
I needed a
Hi!
In my work I sometimes need a function that takes a sequence of asyncs and runs them in parallel, but with the number of asyncs running simultaneously smaller than the total number passed to the function, and that returns the results as an AsyncSeq so that the consumer can process each result as soon as it appears.
Maybe my explanation is confusing, so I will try to express it in code:
Here is my attempt to do so:
There I am forced to wait for the results of all asyncs in a batch, but I want to receive results as they appear.
In the past I used MailboxProcessors for this: one agent distributes tasks to a few worker agents.
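The behaviour described above (bounded parallelism, results delivered as they complete) can be approximated with FSharp.Core alone. The following is a rough sketch, not an AsyncSeq implementation: it returns a blocking `seq<'a>` rather than an `AsyncSeq<'a>`, the name `runUnordered` is hypothetical, and errors in individual jobs are silently swallowed to keep the example short.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Starts the jobs immediately, with at most `parallelism` running at once,
/// and returns a blocking sequence that yields each result when it is ready.
/// Assumptions: the sequence is consumed once, and failures are ignored.
let runUnordered (parallelism: int) (jobs: seq<Async<'a>>) : seq<'a> =
    let results = new BlockingCollection<'a>()   // unbounded, so Add never blocks
    let semaphore = new SemaphoreSlim(parallelism)
    let throttled (job: Async<'a>) = async {
        do! semaphore.WaitAsync() |> Async.AwaitTask
        try
            let! r = job
            results.Add r
        finally
            semaphore.Release() |> ignore }
    async {
        // Async.Catch keeps a failing job from crashing the background workflow.
        let! _ = jobs |> Seq.map throttled |> Async.Parallel |> Async.Catch
        // Signal the consumer that no more results will arrive.
        results.CompleteAdding() }
    |> Async.Start
    results.GetConsumingEnumerable()

// Usage: later jobs finish sooner, so the arrival order depends on timing.
let collected =
    [ for i in 1 .. 5 -> async {
        do! Async.Sleep (50 * (6 - i))
        return i } ]
    |> runUnordered 2
    |> Seq.toList
```

The consumer sees each value as soon as its job finishes instead of waiting for a whole batch; a real solution would surface errors and support cancellation.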