Skip to content

Commit 7d6fd57

Browse files
committed
Proper Task.zip implementation
1 parent 50b1b7b commit 7d6fd57

File tree

2 files changed

+147
-24
lines changed

2 files changed

+147
-24
lines changed

src/FSharpPlus/Extensions/Task.fs

+103-23
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ namespace FSharpPlus
77
module Task =
88

99
open System
10+
open System.Threading
1011
open System.Threading.Tasks
1112

1213
let private (|Canceled|Faulted|Completed|) (t: Task<'a>) =
@@ -135,26 +136,100 @@ module Task =
135136
tcs.Task
136137

137138
/// <summary>Creates a task workflow from two workflows 'x' and 'y', mapping its results with 'f'.</summary>
138-
/// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order.</remarks>
139-
/// <param name="f">The mapping function.</param>
140-
/// <param name="x">First task workflow.</param>
141-
/// <param name="y">Second task workflow.</param>
142-
let map2 f x y = task {
143-
let! x' = x
144-
let! y' = y
145-
return f x' y' }
139+
/// <remarks>Similar to lift2 but although workflows are started in sequence they might end independently in different order
140+
/// and all errors are collected.
141+
/// </remarks>
142+
/// <param name="mapper">The mapping function.</param>
143+
/// <param name="task1">First task workflow.</param>
144+
/// <param name="task2">Second task workflow.</param>
145+
let map2 mapper (task1: Task<'T1>) (task2: Task<'T2>) : Task<'U> =
146+
if task1.Status = TaskStatus.RanToCompletion && task2.Status = TaskStatus.RanToCompletion then
147+
try Task.FromResult (mapper task1.Result task2.Result)
148+
with e ->
149+
let tcs = TaskCompletionSource<_> ()
150+
tcs.SetException e
151+
tcs.Task
152+
else
153+
let tcs = TaskCompletionSource<_> ()
154+
let r1 = ref Unchecked.defaultof<_>
155+
let r2 = ref Unchecked.defaultof<_>
156+
let mutable cancelled = false
157+
let failures = [|IReadOnlyCollection.empty; IReadOnlyCollection.empty|]
158+
let pending = ref 2
159+
160+
let trySet () =
161+
if Interlocked.Decrement pending = 0 then
162+
let noFailures = Array.forall IReadOnlyCollection.isEmpty failures
163+
if noFailures && not cancelled then
164+
try tcs.SetResult (mapper r1.Value r2.Value)
165+
with e -> tcs.SetException e
166+
elif noFailures then tcs.SetCanceled ()
167+
else tcs.SetException (failures |> Seq.map AggregateException |> Seq.reduce Exception.add).InnerExceptions
168+
169+
let k (v: ref<'k>) i t =
170+
match t with
171+
| Canceled -> cancelled <- true
172+
| Faulted e -> failures[i] <- e.InnerExceptions
173+
| Completed r -> v.Value <- r
174+
trySet ()
175+
176+
if task1.IsCompleted && task2.IsCompleted then
177+
task1 |> k r1 0
178+
task2 |> k r2 1
179+
else
180+
task1.ContinueWith (k r1 0) |> ignore
181+
task2.ContinueWith (k r2 1) |> ignore
182+
tcs.Task
146183

147184
/// <summary>Creates a task workflow from three workflows 'x', 'y' and z, mapping its results with 'f'.</summary>
148-
/// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order.</remarks>
149-
/// <param name="f">The mapping function.</param>
150-
/// <param name="x">First task workflow.</param>
151-
/// <param name="y">Second task workflow.</param>
152-
/// <param name="z">Third task workflow.</param>
153-
let map3 f x y z = task {
154-
let! x' = x
155-
let! y' = y
156-
let! z' = z
157-
return f x' y' z' }
185+
/// <remarks>Similar to lift3 but although workflows are started in sequence they might end independently in different order
186+
/// and all errors are collected.
187+
/// </remarks>
188+
/// <param name="mapper">The mapping function.</param>
189+
/// <param name="task1">First task workflow.</param>
190+
/// <param name="task2">Second task workflow.</param>
191+
/// <param name="task3">Third task workflow.</param>
192+
let map3 mapper (task1: Task<'T1>) (task2: Task<'T2>) (task3: Task<'T3>) : Task<'U> =
193+
if task1.Status = TaskStatus.RanToCompletion && task2.Status = TaskStatus.RanToCompletion && task3.Status = TaskStatus.RanToCompletion then
194+
try Task.FromResult (mapper task1.Result task2.Result task3.Result)
195+
with e ->
196+
let tcs = TaskCompletionSource<_> ()
197+
tcs.SetException e
198+
tcs.Task
199+
else
200+
let tcs = TaskCompletionSource<_> ()
201+
let r1 = ref Unchecked.defaultof<_>
202+
let r2 = ref Unchecked.defaultof<_>
203+
let r3 = ref Unchecked.defaultof<_>
204+
let mutable cancelled = false
205+
let failures = [|IReadOnlyCollection.empty<exn>; IReadOnlyCollection.empty; IReadOnlyCollection.empty|]
206+
let pending = ref 3
207+
208+
let trySet () =
209+
if Interlocked.Decrement pending = 0 then
210+
let noFailures = Array.forall isNull failures
211+
if noFailures && not cancelled then
212+
try tcs.SetResult (mapper r1.Value r2.Value r3.Value)
213+
with e -> tcs.SetException e
214+
elif noFailures then tcs.SetCanceled ()
215+
else tcs.SetException (failures |> Seq.concat |> Seq.fold Exception.add (AggregateException ())).InnerExceptions
216+
217+
let k (v: ref<'k>) i t =
218+
match t with
219+
| Canceled -> cancelled <- true
220+
| Faulted e -> failures[i] <- e.InnerExceptions
221+
| Completed r -> v.Value <- r
222+
trySet ()
223+
224+
if task1.IsCompleted && task2.IsCompleted && task3.IsCompleted then
225+
task1 |> k r1 0
226+
task2 |> k r2 1
227+
task3 |> k r3 2
228+
else
229+
task1.ContinueWith (k r1 0) |> ignore
230+
task2.ContinueWith (k r2 1) |> ignore
231+
task3.ContinueWith (k r3 2) |> ignore
232+
tcs.Task
158233

159234
/// <summary>Creates a task workflow that is the result of applying the resulting function of a task workflow
160235
/// to the resulting value of another task workflow</summary>
@@ -242,11 +317,16 @@ module Task =
242317
tcs.Task
243318

244319
/// <summary>Creates a task workflow from two workflows 'x' and 'y', tupling its results.</summary>
245-
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order.</remarks>
246-
let zip x y = task {
247-
let! x' = x
248-
let! y' = y
249-
return x', y' }
320+
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
321+
/// and all errors are collected.
322+
/// </remarks>
323+
let zip (task1: Task<'T1>) (task2: Task<'T2>) = map2 (fun x y -> x, y) task1 task2
324+
325+
/// <summary>Creates a task workflow from two workflows 'x', 'y' and 'z', tupling its results.</summary>
326+
/// <remarks>Similar to zipSequentially but although workflows are started in sequence they might end independently in different order
327+
/// and all errors are collected.
328+
/// </remarks>
329+
let zip3 (task1: Task<'T1>) (task2: Task<'T2>) (task3: Task<'T3>) = map3 (fun x y z -> x, y, z) task1 task2 task3
250330

251331
/// Flattens two nested tasks into one.
252332
let join (source: Task<Task<'T>>) : Task<'T> = source.Unwrap()

tests/FSharpPlus.Tests/Task.fs

+44-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module Task =
1212
exception TestException of string
1313

1414
module TaskTests =
15+
open System.Threading
1516

1617
let createTask isFailed delay value =
1718
if not isFailed && delay = 0 then Task.FromResult value
@@ -39,11 +40,17 @@ module Task =
3940
let b = Task.zipSequentially x1 x2
4041
require b.IsCompleted "Task.zipSequentially didn't short-circuit"
4142

43+
let b1 = Task.zip3 x1 x2 x3
44+
require b1.IsCompleted "Task.zip3 didn't short-circuit"
45+
4246
let c = Task.lift2 (+) x1 x2
4347
require c.IsCompleted "Task.lift2 didn't short-circuit"
4448

4549
let d = Task.lift3 (fun x y z -> x + y + z) x1 x2 x3
46-
require d.IsCompleted "Task.lift3 didn't short-circiut"
50+
require d.IsCompleted "Task.lift3 didn't short-circuit"
51+
52+
let d2 = Task.map3 (fun x y z -> x + y + z) x1 x2 x3
53+
require d2.IsCompleted "Task.map3 didn't short-circuit"
4754

4855
[<Test>]
4956
let erroredTasks () =
@@ -151,6 +158,42 @@ module Task =
151158
r19.Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 2"]
152159
let r20 = Task.lift3 (mapping3 false) (x1 ()) (x2 ()) (e3 ())
153160
r20.Exception.InnerExceptions |> areEquivalent [TestException "Ouch, can't create: 3"]
161+
162+
[<Test>]
163+
let testTaskZip () =
164+
let t1 = createTask true 0 1
165+
let t2 = createTask true 0 2
166+
let t3 = createTask true 0 3
167+
168+
let c = new CancellationToken true
169+
let t4 = Task.FromCanceled<int> c
170+
171+
let t5 = createTask false 0 5
172+
let t6 = createTask false 0 6
173+
174+
let t12 = Task.WhenAll [t1; t2]
175+
let t12t12 = Task.WhenAll [t12; t12]
176+
let t33 = Task.WhenAll [t3; t3]
177+
178+
Task.WaitAny t12 |> ignore
179+
Task.WaitAny t12t12 |> ignore
180+
Task.WaitAny t33 |> ignore
181+
182+
let t12123 = Task.zip3 t12t12 t33 t4
183+
let ac1 =
184+
try
185+
t12123.Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
186+
with e ->
187+
failwithf "Failure in testTaskZip. Task status is %A . Exception is %A" t12123.Status e
188+
189+
CollectionAssert.AreEquivalent ([1; 2; 1; 2; 3], ac1, "Task.zip(3) should add only non already existing exceptions.")
190+
191+
let t13 = Task.zip3 (Task.zip t1 t3) t4 (Task.zip t5 t6)
192+
Assert.AreEqual (true, t13.IsFaulted, "Task.zip(3) between a value, an exception and a cancellation -> exception wins.")
193+
let ac2 = t13.Exception.InnerExceptions |> Seq.map (fun x -> int (Char.GetNumericValue x.Message.[35]))
194+
CollectionAssert.AreEquivalent ([1; 3], ac2, "Task.zip between 2 exceptions => both exceptions returned, even after combining with cancellation and values.")
195+
196+
154197

155198
module TaskBuilderTests =
156199

0 commit comments

Comments
 (0)