Skip to content

Commit 1a2a4be

Browse files
authored
IAsyncEnumerable support (#37)
* Remove altcover It can't cover inline code which this library is all inline code * Add IAsyncEnumerable to TaskBaseBuilders * Add IAsyncEnumerable to CancellableTaskBaseBuilders * AsyncEx IAsyncEnumerable support * Docs
1 parent 27e4e6f commit 1a2a4be

21 files changed

+872
-34
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,24 @@ AsyncEx is similar to Async except in the following ways:
119119
}
120120
```
121121
122+
4. Use [IAsyncEnumerable](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1?view=net-8.0) with `for` keyword. This example uses [TaskSeq](https://github.com/fsprojects/FSharp.Control.TaskSeq) but you can use any `IAsyncEnumerable<T>`.
123+
124+
125+
```fsharp
126+
open IcedTasks
127+
open FSharp.Control
128+
let myAsyncEx = asyncEx {
129+
let items = taskSeq { // IAsyncEnumerable<T>
130+
yield 42
131+
do! Task.Delay(100)
132+
yield 1701
133+
}
134+
let mutable sum = 0
135+
for i in items do
136+
sum <- sum + i
137+
return sum
138+
}
139+
```
122140
123141
### For [ValueTasks](https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/)
124142

docsSrc/index.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,24 @@ AsyncEx is similar to Async except in the following ways:
9090
}
9191
```
9292
93+
4. Use [IAsyncEnumerable](https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1?view=net-8.0) with `for` keyword. This example uses [TaskSeq](https://github.com/fsprojects/FSharp.Control.TaskSeq) but you can use any `IAsyncEnumerable<T>`.
94+
95+
96+
```fsharp
97+
open IcedTasks
98+
open FSharp.Control
99+
let myAsyncEx = asyncEx {
100+
let items = taskSeq { // IAsyncEnumerable<T>
101+
yield 42
102+
do! Task.Delay(100)
103+
yield 1701
104+
}
105+
let mutable sum = 0
106+
for i in items do
107+
sum <- sum + i
108+
return sum
109+
}
110+
```
93111
94112
### For [ValueTasks](https://devblogs.microsoft.com/dotnet/understanding-the-whys-whats-and-whens-of-valuetask/)
95113
@@ -110,6 +128,7 @@ let myValueTask = task {
110128
- You want to be able to re-run these executable tasks
111129
- You don't want to pollute your methods/functions with extra CancellationToken parameters
112130
- You want the computation to handle checking cancellation before every bind.
131+
- You want to use the `for` keyword with `IAsyncEnumerable<T>`.
113132

114133

115134
### ColdTask

paket.dependencies

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ group Test
1818
nuget YoloDev.Expecto.TestSdk >= 0.14.2
1919
nuget Microsoft.NET.Test.Sdk >= 17.7.2
2020
nuget altcover >= 8.6.68
21+
nuget FSharp.Control.TaskSeq
2122

2223
// [ FAKE GROUP ]
2324
group Build

paket.lock

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,10 +365,12 @@ NUGET
365365
Expecto (10.1)
366366
FSharp.Core (>= 7.0.200) - restriction: >= net6.0
367367
Mono.Cecil (>= 0.11.4 < 1.0) - restriction: >= net6.0
368-
FSharp.Core (7.0.401) - restriction: >= net6.0
368+
FSharp.Control.TaskSeq (0.3)
369+
FSharp.Core (>= 6.0.2) - restriction: >= netstandard2.1
370+
FSharp.Core (7.0.401) - restriction: >= netstandard2.1
369371
Microsoft.Bcl.AsyncInterfaces (7.0) - restriction: || (&& (>= net462) (>= net6.0)) (&& (>= net462) (>= netstandard2.0)) (&& (>= net6.0) (< net8.0)) (&& (< net6.0) (>= netstandard2.0))
370372
System.Threading.Tasks.Extensions (>= 4.5.4) - restriction: || (>= net462) (&& (>= netstandard2.0) (< netstandard2.1))
371-
Microsoft.Bcl.TimeProvider (8.0.0-rc.2.23479.6) - restriction: || (&& (>= net6.0) (< net8.0)) (&& (< net6.0) (>= netstandard2.0))
373+
Microsoft.Bcl.TimeProvider (8.0) - restriction: || (&& (>= net6.0) (< net8.0)) (&& (< net6.0) (>= netstandard2.0))
372374
Microsoft.Bcl.AsyncInterfaces (>= 6.0) - restriction: || (>= net462) (&& (< net8.0) (>= netstandard2.0))
373375
System.ValueTuple (>= 4.5) - restriction: >= net462
374376
Microsoft.CodeCoverage (17.7.2) - restriction: || (>= net462) (>= netcoreapp3.1)
@@ -397,8 +399,8 @@ NUGET
397399
System.Threading.Tasks.Extensions (4.5.4) - restriction: || (&& (>= net462) (>= net6.0)) (&& (>= net462) (>= netstandard2.0)) (&& (>= net6.0) (< netstandard2.1)) (&& (>= netstandard2.0) (< netstandard2.1))
398400
System.Runtime.CompilerServices.Unsafe (>= 4.5.3) - restriction: || (&& (< monoandroid) (< monotouch) (< net45) (>= netstandard1.0) (< netstandard2.0) (< win8) (< wpa81) (< xamarintvos) (< xamarinwatchos)) (&& (< monoandroid) (< netstandard1.0) (>= portable-net45+win8+wp8+wpa81) (< win8)) (&& (>= net45) (< netstandard2.0)) (&& (< net45) (< netcoreapp2.1) (>= netstandard2.0) (< xamarinios) (< xamarinmac) (< xamarintvos) (< xamarinwatchos)) (>= net461) (&& (< netstandard1.0) (>= win8)) (&& (< netstandard2.0) (>= wpa81)) (>= wp8)
399401
System.ValueTuple (4.5) - restriction: || (&& (>= net462) (>= net6.0)) (&& (>= net462) (>= netstandard2.0))
400-
TimeProviderExtensions (1.0.0-rc.2)
401-
Microsoft.Bcl.TimeProvider (>= 8.0.0-rc.1.23419.4) - restriction: || (&& (>= net6.0) (< net8.0)) (&& (< net6.0) (>= netstandard2.0))
402+
TimeProviderExtensions (1.0)
403+
Microsoft.Bcl.TimeProvider (>= 8.0) - restriction: || (&& (>= net6.0) (< net8.0)) (&& (< net6.0) (>= netstandard2.0))
402404
YoloDev.Expecto.TestSdk (0.14.2)
403405
Expecto (>= 10.0 < 11.0) - restriction: >= net6.0
404406
FSharp.Core (>= 7.0.200) - restriction: >= net6.0

src/IcedTasks/AsyncEx.fs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ open System
44
open System.Threading
55
open System.Threading.Tasks
66
open System.Runtime.ExceptionServices
7+
open System.Collections.Generic
78

89
type private Async =
910
static member inline map f x =
@@ -303,13 +304,47 @@ type AsyncExBuilder() =
303304
)
304305
)
305306

307+
308+
member inline internal _.WhileAsync(guard: Async<bool>, computation: Async<unit>) =
309+
async {
310+
let mutable keepGoing = true
311+
312+
while keepGoing do
313+
let! guardResult = guard
314+
if guardResult then do! computation else keepGoing <- false
315+
}
316+
317+
member inline this.For
318+
(
319+
sequence: IAsyncEnumerable<'e>,
320+
[<InlineIfLambda>] body: 'e -> Async<unit>
321+
) =
322+
this.Bind(
323+
Async.CancellationToken,
324+
fun (ct: CancellationToken) ->
325+
this.Using(
326+
sequence.GetAsyncEnumerator ct,
327+
(fun enumerator ->
328+
this.WhileAsync(
329+
(this.Delay(fun () ->
330+
enumerator.MoveNextAsync()
331+
|> AsyncEx.AwaitValueTask
332+
)),
333+
(body enumerator.Current)
334+
)
335+
336+
)
337+
)
338+
)
339+
306340
#endif
307341
member inline _.While([<InlineIfLambda>] guard: unit -> bool, computation: Async<unit>) =
308342
async.While(guard, computation)
309343

310344
member inline _.For(sequence: seq<'e>, [<InlineIfLambda>] body: 'e -> Async<unit>) =
311345
async.For(sequence, body)
312346

347+
313348
member inline _.Combine(computation1, computation2) =
314349
async.Combine(computation1, computation2)
315350

@@ -335,6 +370,8 @@ module AsyncExExtensionsLowPriority =
335370

336371
type AsyncExBuilder with
337372

373+
member inline _.Source(seq: #seq<_>) = seq
374+
338375
member inline _.Using(resource: #IDisposable, [<InlineIfLambda>] binder) =
339376
async.Using(resource, binder)
340377

@@ -361,6 +398,7 @@ module AsyncExExtensionsLowPriority =
361398
/// <item><description>Allows <c>use</c> on <see cref="T:System.IAsyncDisposable">System.IAsyncDisposable</see></description></item>
362399
/// <item><description>Allows <c>let!</c> for Tasks, ValueTasks, and any Awaitable Type</description></item>
363400
/// <item><description>When Tasks throw exceptions they will use the behavior described in <see href="https://github.com/fsharp/fslang-suggestions/issues/840">Async.Await overload (esp. AwaitTask without throwing AggregateException)</see></description></item>
401+
/// <item><description>Allow <c>for</c> on <see cref="T:System.Collections.Generic.IAsyncEnumerable`1">System.Collections.Generic.IAsyncDisposable</see></description></item>
364402
/// </list>
365403
///
366404
/// </remarks>
@@ -372,8 +410,9 @@ module AsyncExExtensionsHighPriority =
372410

373411
type AsyncExBuilder with
374412

375-
member inline _.Source(seq: #seq<_>) = seq
376-
413+
#if NETSTANDARD2_1 || NET6_0_OR_GREATER
414+
member inline _.Source(seq: #IAsyncEnumerable<_>) = seq
415+
#endif
377416
// Required because SRTP can't determine the type of the awaiter
378417
// Candidates:
379418
// - Task.GetAwaiter() : Runtime.CompilerServices.TaskAwaiter
@@ -403,6 +442,7 @@ module PolyfillBuilders =
403442
/// <item><description>Allows <c>use</c> on <see cref="T:System.IAsyncDisposable">System.IAsyncDisposable</see></description></item>
404443
/// <item><description>Allows <c>let!</c> for Tasks, ValueTasks, and any Awaitable Type</description></item>
405444
/// <item><description>When Tasks throw exceptions they will use the behavior described in <see href="https://github.com/fsharp/fslang-suggestions/issues/840">Async.Await overload (esp. AwaitTask without throwing AggregateException)</see></description></item>
445+
/// <item><description>Allow <c>for</c> on <see cref="T:System.Collections.Generic.IAsyncEnumerable`1">System.Collections.Generic.IAsyncDisposable</see></description></item>
406446
/// </list>
407447
///
408448
/// </remarks>

src/IcedTasks/CancellableTaskBuilderBase.fs

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module CancellableTaskBase =
1212
open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers
1313
open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
1414
open Microsoft.FSharp.Collections
15+
open System.Collections.Generic
1516

1617
/// The extra data stored in ResumableStateMachine for tasks
1718
[<Struct; NoComparison; NoEquality>]
@@ -291,6 +292,93 @@ module CancellableTaskBase =
291292
|> Awaitable.GetAwaiter
292293
)
293294
)
295+
296+
297+
member inline internal _.WhileAsync
298+
(
299+
[<InlineIfLambda>] condition,
300+
body: CancellableTaskBaseCode<_, unit, 'Builder>
301+
) : CancellableTaskBaseCode<_, unit, 'Builder> =
302+
let mutable condition_res = true
303+
304+
ResumableCode.While(
305+
(fun () -> condition_res),
306+
CancellableTaskBaseCode<_, unit, 'Builder>(fun sm ->
307+
if __useResumableCode then
308+
309+
let mutable __stack_condition_fin = true
310+
let mutable awaiter = condition ()
311+
312+
if Awaiter.IsCompleted awaiter then
313+
314+
__stack_condition_fin <- true
315+
316+
condition_res <- Awaiter.GetResult awaiter
317+
else
318+
319+
// This will yield with __stack_fin = false
320+
// This will resume with __stack_fin = true
321+
let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
322+
__stack_condition_fin <- __stack_yield_fin
323+
324+
if __stack_condition_fin then
325+
condition_res <- Awaiter.GetResult awaiter
326+
327+
328+
if __stack_condition_fin then
329+
330+
if condition_res then body.Invoke(&sm) else true
331+
else
332+
let mutable awaiter = awaiter :> ICriticalNotifyCompletion
333+
334+
MethodBuilder.AwaitUnsafeOnCompleted(
335+
&sm.Data.MethodBuilder,
336+
&awaiter,
337+
&sm
338+
)
339+
340+
false
341+
else
342+
343+
let mutable awaiter = condition ()
344+
345+
let cont =
346+
CancellableTaskBaseResumptionFunc<'TOverall, 'Builder>(fun sm ->
347+
condition_res <- Awaiter.GetResult awaiter
348+
if condition_res then body.Invoke(&sm) else true
349+
)
350+
351+
if Awaiter.IsCompleted awaiter then
352+
cont.Invoke(&sm)
353+
else
354+
sm.ResumptionDynamicInfo.ResumptionData <-
355+
(awaiter :> ICriticalNotifyCompletion)
356+
357+
sm.ResumptionDynamicInfo.ResumptionFunc <- cont
358+
false
359+
)
360+
)
361+
362+
member inline this.For
363+
(
364+
source: #IAsyncEnumerable<'T>,
365+
body: 'T -> CancellableTaskBaseCode<_, unit, 'Builder>
366+
) : CancellableTaskBaseCode<_, _, 'Builder> =
367+
368+
CancellableTaskBaseCode<_, _, 'Builder>(fun sm ->
369+
this
370+
.Using(
371+
source.GetAsyncEnumerator sm.Data.CancellationToken,
372+
(fun (e: IAsyncEnumerator<'T>) ->
373+
this.WhileAsync(
374+
(fun () -> Awaitable.GetAwaiter(e.MoveNextAsync())),
375+
(fun sm -> (body e.Current).Invoke(&sm))
376+
)
377+
)
378+
379+
)
380+
.Invoke(&sm)
381+
)
294382
#endif
295383

296384

@@ -595,6 +683,15 @@ module CancellableTaskBase =
595683
) =
596684
ResumableCode.Using(resource, binder)
597685

686+
687+
/// <summary>Allows the computation expression to turn other types into other types</summary>
688+
///
689+
/// <remarks>This is the identify function for For binds.</remarks>
690+
///
691+
/// <returns>IEnumerable</returns>
692+
member inline _.Source(s: #seq<_>) : #seq<_> = s
693+
694+
598695
/// <exclude/>
599696
[<AutoOpen>]
600697
module HighPriority =
@@ -667,13 +764,14 @@ module CancellableTaskBase =
667764
// High priority extensions
668765
type CancellableTaskBuilderBase with
669766

670-
767+
#if NETSTANDARD2_1 || NET6_0_OR_GREATER
671768
/// <summary>Allows the computation expression to turn other types into other types</summary>
672769
///
673770
/// <remarks>This is the identify function for For binds.</remarks>
674771
///
675772
/// <returns>IEnumerable</returns>
676-
member inline _.Source(s: #seq<_>) : #seq<_> = s
773+
member inline _.Source(s: #IAsyncEnumerable<_>) = s
774+
#endif
677775

678776
/// <summary>Allows the computation expression to turn other types into CancellationToken -> 'Awaiter</summary>
679777
///

0 commit comments

Comments
 (0)