From b2ba192522a4e0b7310c55a4be1abe277a224fbb Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Thu, 20 Jul 2023 23:42:29 +0200 Subject: [PATCH 1/9] Implement first idea for sync stream select --- lib_eio/sync.ml | 58 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index ee78c62a7..fc8f49faf 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -383,6 +383,35 @@ let rec consumer_resume_cell t ~success ~in_transition cell = if Atomic.compare_and_set cell old Finished then success req else consumer_resume_cell t ~success ~in_transition cell +let take_suspend_select ~enqueue ~ctx ~cancel_all t loc finished = + let Short cell | Long (_, cell) = loc in + let kc v = begin + if (Atomic.compare_and_set finished false true) then ( + cancel_all (); + (* deliver value *) + enqueue (Ok v); + true + ) else ( + (* reject value, let producer try again *) + false + ) + end in + add_to_cell t.producers (Slot kc) cell; + (* This will be called if another stream has yielded a value. *) + let cancel_this () = begin + match loc with + | Short _ -> () + | Long loc -> ignore (cancel t loc) + end in + (match loc with + | Short _ -> () + | Long loc -> ( + match Fiber_context.get_error ctx with + | Some ex -> if cancel t loc then enqueue (Error ex); + | None -> () + )); + cancel_this + let take_suspend t loc = Suspend.enter_unchecked @@ fun ctx enqueue -> let Short cell | Long (_, cell) = loc in @@ -412,6 +441,35 @@ let take (t : _ t) = take_suspend t (Long (Q.next_suspend t.consumers)) ) +let select_of_many (type a) (ts: a t list) = + let finished = Atomic.make false in + let cancel_fns = ref [] in + let add_cancel_fn fn = cancel_fns := (fn :: !cancel_fns) in + let cancel_all () = List.iter (fun fn -> fn ()) !cancel_fns in + let wait ctx enqueue t = begin + if (Atomic.fetch_and_add t.balance (-1)) > 0 then ( + (* have item, can cancel remaining stream waiters*) + if Atomic.compare_and_set finished false true then ( + cancel_all (); + (* Item is available, take it over from producer. *) + let cell = Q.next_resume t.producers in + let v = consumer_resume_cell t cell + ~success:(fun it -> it.kp (Ok true); it.v) + ~in_transition:(fun cell -> assert false) in (* todo: find correct way to implement *) + enqueue (Ok v) + ) else ( + (* restore old balance, because another stream was ready first. *) + ignore (Atomic.fetch_and_add t.balance (+1)) + ) + ) else ( + let cell = Long (Q.next_suspend t.consumers) in + let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t cell finished in + add_cancel_fn cancel_fn + ) +end in + Suspend.enter_unchecked (fun ctx enqueue -> + List.iter (wait ctx enqueue) ts) + let reject = Slot (fun _ -> false) let take_nonblocking (t : _ t) = From 994416679599d2e48a80873b03b47a5c078403cd Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Jul 2023 20:40:25 +0200 Subject: [PATCH 2/9] Make first implementation of sync stream select work --- lib_eio/sync.ml | 15 ++++++++++----- lib_eio/sync.mli | 3 +++ tests/stream.md | 21 +++++++++++++++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index fc8f49faf..946459b23 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -446,7 +446,7 @@ let select_of_many (type a) (ts: a t list) = let cancel_fns = ref [] in let add_cancel_fn fn = cancel_fns := (fn :: !cancel_fns) in let cancel_all () = List.iter (fun fn -> fn ()) !cancel_fns in - let wait ctx enqueue t = begin + let wait ctx enqueue (t: a t) = begin if (Atomic.fetch_and_add t.balance (-1)) > 0 then ( (* have item, can cancel remaining stream waiters*) if Atomic.compare_and_set finished false true then ( @@ -454,8 +454,13 @@ let select_of_many (type a) (ts: a t list) = (* Item is available, take it over from producer. *) let cell = Q.next_resume t.producers in let v = consumer_resume_cell t cell - ~success:(fun it -> it.kp (Ok true); it.v) - ~in_transition:(fun cell -> assert false) in (* todo: find correct way to implement *) + ~success:(fun it -> it.kp (Ok true); it.v) + ~in_transition:(fun cell -> + (* TODO: Nested suspend - check if this works as planned.*) + Suspend.enter_unchecked (fun _ctx enqueue' -> + let kc v = enqueue' (Ok v); true in + add_to_cell t.producers (Slot kc) cell + )) in enqueue (Ok v) ) else ( (* restore old balance, because another stream was ready first. *) @@ -466,9 +471,9 @@ let select_of_many (type a) (ts: a t list) = let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t cell finished in add_cancel_fn cancel_fn ) -end in + end in Suspend.enter_unchecked (fun ctx enqueue -> - List.iter (wait ctx enqueue) ts) + List.iter (wait ctx enqueue) ts) let reject = Slot (fun _ -> false) diff --git a/lib_eio/sync.mli b/lib_eio/sync.mli index bb304b508..2930230b2 100644 --- a/lib_eio/sync.mli +++ b/lib_eio/sync.mli @@ -50,3 +50,6 @@ val balance : 'a t -> int val dump : 'a t Fmt.t (** [dump] formats the internal state of a channel, for testing and debugging. *) + +val select_of_many : 'a t list -> 'a +(** alpha: [select_of_many] returns an element from the first sync stream to yield an item. *) \ No newline at end of file diff --git a/tests/stream.md b/tests/stream.md index c5a035e3b..e9b6b0567 100644 --- a/tests/stream.md +++ b/tests/stream.md @@ -357,3 +357,24 @@ Non-blocking take with zero-capacity stream: +Got None from stream - : unit = () ``` + +Selecting from multiple sync (0-capacity) channels. Note that this works on the internal +API, which is usually not exposed. + +```ocaml +# module Sy = Eio__Sync +module Sy = Eio__Sync +# run @@ fun () -> Switch.run (fun sw -> + let t1, t2 = (Sy.create ()), (Sy.create ()) in + Fiber.fork ~sw (fun () -> Sy.put t2 "foo"); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many [t1; t2])); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many [t1; t2])); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many [t1; t2])); + Fiber.fork ~sw (fun () -> Sy.put t2 "bar"); + Fiber.fork ~sw (fun () -> Sy.put t1 "baz"); + ) ++foo ++bar ++baz +- : unit = () +``` From b322ab45174bac4849d461a3ce974531c31881b1 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Jul 2023 21:17:36 +0200 Subject: [PATCH 3/9] Add very bare-bones benchmark script for sync stream select --- bench/bench_select.ml | 46 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 bench/bench_select.ml diff --git a/bench/bench_select.ml b/bench/bench_select.ml new file mode 100644 index 000000000..251935540 --- /dev/null +++ b/bench/bench_select.ml @@ -0,0 +1,46 @@ + +open Eio.Stdenv +open Eio +module Sync = Eio__Sync + +let sender_fibers = 10 + +let message = 1234 + +let sender ~id ~n_msgs stream = + for i = 1 to n_msgs do + traceln "Sent message #%d from %d" i id; + Sync.put stream message + done + +(* Start one sender fiber for each stream, and let it send n_msgs messages. *) +let run_senders ~dom_mgr ?(n_msgs = 100) streams = + let count = ref 0 in + Switch.run @@ fun sw -> + ignore @@ List.map (fun stream -> + Fiber.fork ~sw (fun () -> + let id = !count in + count := !count + 1; + Domain_manager.run dom_mgr (fun () -> sender ~id ~n_msgs stream))) streams + +let receiver ~n_msgs streams = + for i = 1 to n_msgs do + assert (Int.equal message (Sync.select_of_many streams)); + traceln "Received message #%d" i + done + +let make_streams n = + let unfolder i = if i == 0 then None else Some (Sync.create (), i-1) in + let seq = Seq.unfold unfolder n in + List.of_seq seq + +let run env = + let dom_mgr = domain_mgr env in + let streams = make_streams sender_fibers in + let n_msgs = 50 in + Switch.run @@ fun sw -> + Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams); + receiver ~n_msgs:(sender_fibers * n_msgs) streams; + [] + + From a4f5fa2a76a89db00919078121442896bc05a0f2 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Jul 2023 21:29:11 +0200 Subject: [PATCH 4/9] Add note about origin of "Unhandled effect" exception --- bench/bench_select.ml | 6 +++++- lib_eio/sync.ml | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bench/bench_select.ml b/bench/bench_select.ml index 251935540..d54f9967c 100644 --- a/bench/bench_select.ml +++ b/bench/bench_select.ml @@ -21,7 +21,8 @@ let run_senders ~dom_mgr ?(n_msgs = 100) streams = Fiber.fork ~sw (fun () -> let id = !count in count := !count + 1; - Domain_manager.run dom_mgr (fun () -> sender ~id ~n_msgs stream))) streams + Domain_manager.run dom_mgr (fun () -> + sender ~id ~n_msgs stream))) streams let receiver ~n_msgs streams = for i = 1 to n_msgs do @@ -34,6 +35,9 @@ let make_streams n = let seq = Seq.unfold unfolder n in List.of_seq seq +(* Currently fails with exception from ocaml-uring/lib/uring/uring.ml:326 + https://github.com/ocaml-multicore/ocaml-uring/blob/07482dae72c8e977e4e4e2b2c8bd137e770ee1dd/lib/uring/uring.ml#L327 +*) let run env = let dom_mgr = domain_mgr env in let streams = make_streams sender_fibers in diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index 946459b23..e0383554e 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -456,7 +456,7 @@ let select_of_many (type a) (ts: a t list) = let v = consumer_resume_cell t cell ~success:(fun it -> it.kp (Ok true); it.v) ~in_transition:(fun cell -> - (* TODO: Nested suspend - check if this works as planned.*) + (* TODO: this fails with an exception as the Suspend effect is unhandled! *) Suspend.enter_unchecked (fun _ctx enqueue' -> let kc v = enqueue' (Ok v); true in add_to_cell t.producers (Slot kc) cell From 0271de47e9b2fd2048e1224939f88119a0ac9ea3 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Jul 2023 22:18:21 +0200 Subject: [PATCH 5/9] Fix unhandled effect exception in select_of_many --- lib_eio/sync.ml | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index e0383554e..193ea5a74 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -374,14 +374,20 @@ let rec put (t : _ t) v = (* Taking. *) (* Mirror of [producer_resume_cell]. *) -let rec consumer_resume_cell t ~success ~in_transition cell = +let rec consumer_resume_cell t ~success ?in_transition cell = match Atomic.get (cell : _ Cell.t Atomic.t) with | Slot _ -> assert false - | In_transition -> in_transition cell - | Finished -> consumer_resume_cell t ~success ~in_transition (Q.next_resume t.producers) + | In_transition -> ( + match in_transition with + | Some in_transition' -> in_transition' cell + (* If no [in_transition] function is provided, spin on cell. + This only occurs in multi-domain contexts, otherwise [In_transition] + is not observed. *) + | None -> consumer_resume_cell t ~success ?in_transition cell) + | Finished -> consumer_resume_cell t ~success ?in_transition (Q.next_resume t.producers) | Item req as old -> if Atomic.compare_and_set cell old Finished then success req - else consumer_resume_cell t ~success ~in_transition cell + else consumer_resume_cell t ~success ?in_transition cell let take_suspend_select ~enqueue ~ctx ~cancel_all t loc finished = let Short cell | Long (_, cell) = loc in @@ -453,14 +459,11 @@ let select_of_many (type a) (ts: a t list) = cancel_all (); (* Item is available, take it over from producer. *) let cell = Q.next_resume t.producers in + (* [in_transition] is [None] so we spin-wait on the item being available. + This is a rare scenario. *) let v = consumer_resume_cell t cell ~success:(fun it -> it.kp (Ok true); it.v) - ~in_transition:(fun cell -> - (* TODO: this fails with an exception as the Suspend effect is unhandled! *) - Suspend.enter_unchecked (fun _ctx enqueue' -> - let kc v = enqueue' (Ok v); true in - add_to_cell t.producers (Slot kc) cell - )) in + ?in_transition:None in enqueue (Ok v) ) else ( (* restore old balance, because another stream was ready first. *) From b0aaa6aa3a9636beb82662091cf194ed114d563c Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Jul 2023 22:57:21 +0200 Subject: [PATCH 6/9] Improve sync select benchmark --- bench/bench_select.ml | 50 ++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/bench/bench_select.ml b/bench/bench_select.ml index d54f9967c..cd97c806f 100644 --- a/bench/bench_select.ml +++ b/bench/bench_select.ml @@ -3,48 +3,54 @@ open Eio.Stdenv open Eio module Sync = Eio__Sync -let sender_fibers = 10 +let sender_fibers = 2 let message = 1234 -let sender ~id ~n_msgs stream = - for i = 1 to n_msgs do - traceln "Sent message #%d from %d" i id; - Sync.put stream message - done +(* Send [n_msgs] items to streams in a round-robin way. *) +let sender ~n_msgs streams = + let msgs = Seq.take n_msgs (Seq.ints 0) in + let streams = Seq.cycle (List.to_seq streams) in + let zipped = Seq.zip msgs streams in + ignore (Seq.iter (fun (_i, stream) -> + Sync.put stream message) zipped) -(* Start one sender fiber for each stream, and let it send n_msgs messages. *) +(* Start one sender fiber for each stream, and let it send n_msgs messages. + Each fiber sends to all streams in a round-robin way. *) let run_senders ~dom_mgr ?(n_msgs = 100) streams = - let count = ref 0 in Switch.run @@ fun sw -> - ignore @@ List.map (fun stream -> + ignore @@ List.iter (fun _stream -> Fiber.fork ~sw (fun () -> - let id = !count in - count := !count + 1; - Domain_manager.run dom_mgr (fun () -> - sender ~id ~n_msgs stream))) streams + Domain_manager.run dom_mgr (fun () -> + sender ~n_msgs streams))) streams +(* Receive messages from all streams. *) let receiver ~n_msgs streams = - for i = 1 to n_msgs do + for _i = 1 to n_msgs do assert (Int.equal message (Sync.select_of_many streams)); - traceln "Received message #%d" i done +(* Create [n] streams. *) let make_streams n = let unfolder i = if i == 0 then None else Some (Sync.create (), i-1) in let seq = Seq.unfold unfolder n in List.of_seq seq -(* Currently fails with exception from ocaml-uring/lib/uring/uring.ml:326 - https://github.com/ocaml-multicore/ocaml-uring/blob/07482dae72c8e977e4e4e2b2c8bd137e770ee1dd/lib/uring/uring.ml#L327 -*) let run env = let dom_mgr = domain_mgr env in + let clock = clock env in let streams = make_streams sender_fibers in - let n_msgs = 50 in + let n_msgs = 10000 in Switch.run @@ fun sw -> - Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams); - receiver ~n_msgs:(sender_fibers * n_msgs) streams; - [] + Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams); + let before = Time.now clock in + receiver ~n_msgs:(sender_fibers * n_msgs) streams; + let after = Time.now clock in + let elapsed = after -. before in + let time_per_iter = elapsed /. (Float.of_int @@ sender_fibers * n_msgs) in + [Metric.create + (Printf.sprintf "sync:true senders:%d msgs_per_sender:%d" sender_fibers n_msgs) + (`Float (1e9 *. time_per_iter)) "ns" + "Time per transmitted int"] From 8db0997ab70ef7295d98c10cf3f7eb5cdf87e2a6 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Fri, 21 Jul 2023 23:05:44 +0200 Subject: [PATCH 7/9] Unify type of select_of_many --- bench/bench_select.ml | 3 ++- lib_eio/sync.ml | 12 ++++++------ lib_eio/sync.mli | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/bench/bench_select.ml b/bench/bench_select.ml index cd97c806f..7e95d401c 100644 --- a/bench/bench_select.ml +++ b/bench/bench_select.ml @@ -40,11 +40,12 @@ let run env = let dom_mgr = domain_mgr env in let clock = clock env in let streams = make_streams sender_fibers in + let selector = List.map (fun s -> (s, fun i -> i)) streams in let n_msgs = 10000 in Switch.run @@ fun sw -> Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams); let before = Time.now clock in - receiver ~n_msgs:(sender_fibers * n_msgs) streams; + receiver ~n_msgs:(sender_fibers * n_msgs) selector; let after = Time.now clock in let elapsed = after -. before in let time_per_iter = elapsed /. (Float.of_int @@ sender_fibers * n_msgs) in diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index 193ea5a74..b0734c2e8 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -389,13 +389,13 @@ let rec consumer_resume_cell t ~success ?in_transition cell = if Atomic.compare_and_set cell old Finished then success req else consumer_resume_cell t ~success ?in_transition cell -let take_suspend_select ~enqueue ~ctx ~cancel_all t loc finished = +let take_suspend_select ~enqueue ~ctx ~cancel_all t f loc finished = let Short cell | Long (_, cell) = loc in let kc v = begin if (Atomic.compare_and_set finished false true) then ( cancel_all (); (* deliver value *) - enqueue (Ok v); + enqueue (Ok (f v)); true ) else ( (* reject value, let producer try again *) @@ -447,12 +447,12 @@ let take (t : _ t) = take_suspend t (Long (Q.next_suspend t.consumers)) ) -let select_of_many (type a) (ts: a t list) = +let select_of_many (type a b) (ts: (a t * (a -> b)) list) = let finished = Atomic.make false in let cancel_fns = ref [] in let add_cancel_fn fn = cancel_fns := (fn :: !cancel_fns) in let cancel_all () = List.iter (fun fn -> fn ()) !cancel_fns in - let wait ctx enqueue (t: a t) = begin + let wait ctx enqueue ((t, f): (a t * (a -> b))) = begin if (Atomic.fetch_and_add t.balance (-1)) > 0 then ( (* have item, can cancel remaining stream waiters*) if Atomic.compare_and_set finished false true then ( @@ -464,14 +464,14 @@ let select_of_many (type a) (ts: a t list) = let v = consumer_resume_cell t cell ~success:(fun it -> it.kp (Ok true); it.v) ?in_transition:None in - enqueue (Ok v) + enqueue (Ok (f v)) ) else ( (* restore old balance, because another stream was ready first. *) ignore (Atomic.fetch_and_add t.balance (+1)) ) ) else ( let cell = Long (Q.next_suspend t.consumers) in - let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t cell finished in + let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t f cell finished in add_cancel_fn cancel_fn ) end in diff --git a/lib_eio/sync.mli b/lib_eio/sync.mli index 2930230b2..c998f9246 100644 --- a/lib_eio/sync.mli +++ b/lib_eio/sync.mli @@ -51,5 +51,5 @@ val balance : 'a t -> int val dump : 'a t Fmt.t (** [dump] formats the internal state of a channel, for testing and debugging. *) -val select_of_many : 'a t list -> 'a +val select_of_many : ('a t * ('a -> 'b)) list -> 'b (** alpha: [select_of_many] returns an element from the first sync stream to yield an item. *) \ No newline at end of file From ff552519f7c70a26c8aeb2089d60a182e107eb36 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Sat, 22 Jul 2023 12:04:52 +0200 Subject: [PATCH 8/9] Rename bench_select -> bench_select_sync --- bench/{bench_select.ml => bench_select_sync.ml} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename bench/{bench_select.ml => bench_select_sync.ml} (98%) diff --git a/bench/bench_select.ml b/bench/bench_select_sync.ml similarity index 98% rename from bench/bench_select.ml rename to bench/bench_select_sync.ml index 7e95d401c..9647a35bc 100644 --- a/bench/bench_select.ml +++ b/bench/bench_select_sync.ml @@ -3,7 +3,7 @@ open Eio.Stdenv open Eio module Sync = Eio__Sync -let sender_fibers = 2 +let sender_fibers = 4 let message = 1234 From 1db1bee322841a4553df472032e9a32404e23815 Mon Sep 17 00:00:00 2001 From: Lewin Bormann Date: Sat, 22 Jul 2023 16:50:58 +0200 Subject: [PATCH 9/9] Update mdx test with new select_of_many API --- tests/stream.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/stream.md b/tests/stream.md index e9b6b0567..17a0489d2 100644 --- a/tests/stream.md +++ b/tests/stream.md @@ -366,10 +366,12 @@ API, which is usually not exposed. module Sy = Eio__Sync # run @@ fun () -> Switch.run (fun sw -> let t1, t2 = (Sy.create ()), (Sy.create ()) in + let identity x = x in + let selector = [(t1, identity); (t2, identity)] in Fiber.fork ~sw (fun () -> Sy.put t2 "foo"); - Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many [t1; t2])); - Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many [t1; t2])); - Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many [t1; t2])); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector)); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector)); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector)); Fiber.fork ~sw (fun () -> Sy.put t2 "bar"); Fiber.fork ~sw (fun () -> Sy.put t1 "baz"); )