@@ -7,6 +7,7 @@ namespace FSharpPlus
7
7
module Task =
8
8
9
9
open System
10
+ open System.Threading
10
11
open System.Threading .Tasks
11
12
12
13
let private (| Canceled | Faulted | Completed |) ( t : Task < 'a >) =
@@ -135,26 +136,91 @@ module Task =
135
136
tcs.Task
136
137
137
138
/// <summary>Creates a task workflow from two workflows 'x' and 'y', mapping its results with 'f'.</summary>
138
- /// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order.</remarks>
139
- /// <param name="f">The mapping function.</param>
140
- /// <param name="x">First task workflow.</param>
141
- /// <param name="y">Second task workflow.</param>
142
- let map2 f x y = task {
143
- let! x ' = x
144
- let! y ' = y
145
- return f x' y' }
139
+ /// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order
140
+ /// and all errors are collected.
141
+ /// </remarks>
142
+ /// <param name="mapper">The mapping function.</param>
143
+ /// <param name="task1">First task workflow.</param>
144
+ /// <param name="task2">Second task workflow.</param>
145
+ let map2 mapper ( task1 : Task < 'T1 >) ( task2 : Task < 'T2 >) : Task < 'U > =
146
+ if task1.Status = TaskStatus.RanToCompletion && task2.Status = TaskStatus.RanToCompletion then
147
+ try Task.FromResult ( mapper task1.Result task2.Result)
148
+ with e ->
149
+ let tcs = TaskCompletionSource<_> ()
150
+ tcs.SetException e
151
+ tcs.Task
152
+ else
153
+ let tcs = TaskCompletionSource<_> ()
154
+ let r1 = ref Unchecked.defaultof<_>
155
+ let r2 = ref Unchecked.defaultof<_>
156
+ let mutable cancelled = false
157
+ let failures = [| IReadOnlyCollection.empty; IReadOnlyCollection.empty|]
158
+ let pending = ref 2
159
+
160
+ let trySet () =
161
+ if Interlocked.Decrement pending = 0 then
162
+ let noFailures = Array.forall IReadOnlyCollection.isEmpty failures
163
+ if noFailures && not cancelled then
164
+ try tcs.SetResult ( mapper r1.Value r2.Value)
165
+ with e -> tcs.SetException e
166
+ elif noFailures then tcs.SetCanceled ()
167
+ else tcs.SetException ( failures |> Seq.map AggregateException |> Seq.reduce Exception.add) .InnerExceptions
168
+
169
+ let k ( v : ref < 'k >) i t =
170
+ match t with
171
+ | Canceled -> cancelled <- true
172
+ | Faulted e -> failures[ i] <- e.InnerExceptions
173
+ | Completed r -> v.Value <- r
174
+ trySet ()
175
+
176
+ task1.ContinueWith ( k r1 0 ) |> ignore
177
+ task2.ContinueWith ( k r2 1 ) |> ignore
178
+ tcs.Task
146
179
147
180
/// <summary>Creates a task workflow from three workflows 'x', 'y' and z, mapping its results with 'f'.</summary>
148
- /// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order.</remarks>
149
- /// <param name="f">The mapping function.</param>
150
- /// <param name="x">First task workflow.</param>
151
- /// <param name="y">Second task workflow.</param>
152
- /// <param name="z">Third task workflow.</param>
153
- let map3 f x y z = task {
154
- let! x ' = x
155
- let! y ' = y
156
- let! z ' = z
157
- return f x' y' z' }
181
+ /// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order
182
+ /// and all errors are collected.
183
+ /// </remarks>
184
+ /// <param name="mapper">The mapping function.</param>
185
+ /// <param name="task1">First task workflow.</param>
186
+ /// <param name="task2">Second task workflow.</param>
187
+ /// <param name="task3">Third task workflow.</param>
188
+ let map3 mapper ( task1 : Task < 'T1 >) ( task2 : Task < 'T2 >) ( task3 : Task < 'T3 >) : Task < 'U > =
189
+ if task1.Status = TaskStatus.RanToCompletion && task2.Status = TaskStatus.RanToCompletion && task3.Status = TaskStatus.RanToCompletion then
190
+ try Task.FromResult ( mapper task1.Result task2.Result task3.Result)
191
+ with e ->
192
+ let tcs = TaskCompletionSource<_> ()
193
+ tcs.SetException e
194
+ tcs.Task
195
+ else
196
+ let tcs = TaskCompletionSource<_> ()
197
+ let r1 = ref Unchecked.defaultof<_>
198
+ let r2 = ref Unchecked.defaultof<_>
199
+ let r3 = ref Unchecked.defaultof<_>
200
+ let mutable cancelled = false
201
+ let failures = [| IReadOnlyCollection.empty< exn>; IReadOnlyCollection.empty; IReadOnlyCollection.empty|]
202
+ let pending = ref 3
203
+
204
+ let trySet () =
205
+ if Interlocked.Decrement pending = 0 then
206
+ let noFailures = Array.forall isNull failures
207
+ if noFailures && not cancelled then
208
+ try tcs.SetResult ( mapper r1.Value r2.Value r3.Value)
209
+ with e -> tcs.SetException e
210
+ elif noFailures then tcs.SetCanceled ()
211
+ else tcs.SetException ( failures |> Seq.concat |> Seq.fold Exception.add ( AggregateException ())) .InnerExceptions
212
+
213
+ let k ( v : ref < 'k >) i t =
214
+ match t with
215
+ | Canceled -> cancelled <- true
216
+ | Faulted e -> failures[ i] <- e.InnerExceptions
217
+ | Completed r -> v.Value <- r
218
+ trySet ()
219
+
220
+ task1.ContinueWith ( k r1 0 ) |> ignore
221
+ task2.ContinueWith ( k r2 1 ) |> ignore
222
+ task3.ContinueWith ( k r3 2 ) |> ignore
223
+ tcs.Task
158
224
159
225
/// <summary>Creates a task workflow that is the result of applying the resulting function of a task workflow
160
226
/// to the resulting value of another task workflow</summary>
@@ -242,11 +308,16 @@ module Task =
242
308
tcs.Task
243
309
244
310
/// <summary>Creates a task workflow from two workflows 'x' and 'y', tupling its results.</summary>
245
- /// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order.</remarks>
246
- let zip x y = task {
247
- let! x ' = x
248
- let! y ' = y
249
- return x', y' }
311
+ /// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
312
+ /// and all errors are collected.
313
+ /// </remarks>
314
+ let zip ( task1 : Task < 'T1 >) ( task2 : Task < 'T2 >) = map2 ( fun x y -> x, y) task1 task2
315
+
316
+ /// <summary>Creates a task workflow from two workflows 'x', 'y' and 'z', tupling its results.</summary>
317
+ /// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
318
+ /// and all errors are collected.
319
+ /// </remarks>
320
+ let zip3 ( task1 : Task < 'T1 >) ( task2 : Task < 'T2 >) ( task3 : Task < 'T3 >) = map3 ( fun x y z -> x, y, z) task1 task2 task3
250
321
251
322
/// Flattens two nested tasks into one.
252
323
let join ( source : Task < Task < 'T >>) : Task < 'T > = source.Unwrap()
0 commit comments