diff --git a/bench/bench_select_sync.ml b/bench/bench_select_sync.ml new file mode 100644 index 000000000..9647a35bc --- /dev/null +++ b/bench/bench_select_sync.ml @@ -0,0 +1,57 @@ + +open Eio.Stdenv +open Eio +module Sync = Eio__Sync + +let sender_fibers = 4 + +let message = 1234 + +(* Send [n_msgs] items to streams in a round-robin way. *) +let sender ~n_msgs streams = + let msgs = Seq.take n_msgs (Seq.ints 0) in + let streams = Seq.cycle (List.to_seq streams) in + let zipped = Seq.zip msgs streams in + ignore (Seq.iter (fun (_i, stream) -> + Sync.put stream message) zipped) + +(* Start one sender fiber for each stream, and let it send n_msgs messages. + Each fiber sends to all streams in a round-robin way. *) +let run_senders ~dom_mgr ?(n_msgs = 100) streams = + Switch.run @@ fun sw -> + ignore @@ List.iter (fun _stream -> + Fiber.fork ~sw (fun () -> + Domain_manager.run dom_mgr (fun () -> + sender ~n_msgs streams))) streams + +(* Receive messages from all streams. *) +let receiver ~n_msgs streams = + for _i = 1 to n_msgs do + assert (Int.equal message (Sync.select_of_many streams)); + done + +(* Create [n] streams. *) +let make_streams n = + let unfolder i = if i == 0 then None else Some (Sync.create (), i-1) in + let seq = Seq.unfold unfolder n in + List.of_seq seq + +let run env = + let dom_mgr = domain_mgr env in + let clock = clock env in + let streams = make_streams sender_fibers in + let selector = List.map (fun s -> (s, fun i -> i)) streams in + let n_msgs = 10000 in + Switch.run @@ fun sw -> + Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams); + let before = Time.now clock in + receiver ~n_msgs:(sender_fibers * n_msgs) selector; + let after = Time.now clock in + let elapsed = after -. before in + let time_per_iter = elapsed /. (Float.of_int @@ sender_fibers * n_msgs) in + [Metric.create + (Printf.sprintf "sync:true senders:%d msgs_per_sender:%d" sender_fibers n_msgs) + (`Float (1e9 *. time_per_iter)) "ns" + "Time per transmitted int"] + + diff --git a/lib_eio/sync.ml b/lib_eio/sync.ml index ee78c62a7..b0734c2e8 100644 --- a/lib_eio/sync.ml +++ b/lib_eio/sync.ml @@ -374,14 +374,49 @@ let rec put (t : _ t) v = (* Taking. *) (* Mirror of [producer_resume_cell]. *) -let rec consumer_resume_cell t ~success ~in_transition cell = +let rec consumer_resume_cell t ~success ?in_transition cell = match Atomic.get (cell : _ Cell.t Atomic.t) with | Slot _ -> assert false - | In_transition -> in_transition cell - | Finished -> consumer_resume_cell t ~success ~in_transition (Q.next_resume t.producers) + | In_transition -> ( + match in_transition with + | Some in_transition' -> in_transition' cell + (* If no [in_transition] function is provided, spin on cell. + This only occurs in multi-domain contexts, otherwise [In_transition] + is not observed. *) + | None -> consumer_resume_cell t ~success ?in_transition cell) + | Finished -> consumer_resume_cell t ~success ?in_transition (Q.next_resume t.producers) | Item req as old -> if Atomic.compare_and_set cell old Finished then success req - else consumer_resume_cell t ~success ~in_transition cell + else consumer_resume_cell t ~success ?in_transition cell + +let take_suspend_select ~enqueue ~ctx ~cancel_all t f loc finished = + let Short cell | Long (_, cell) = loc in + let kc v = begin + if (Atomic.compare_and_set finished false true) then ( + cancel_all (); + (* deliver value *) + enqueue (Ok (f v)); + true + ) else ( + (* reject value, let producer try again *) + false + ) + end in + add_to_cell t.producers (Slot kc) cell; + (* This will be called if another stream has yielded a value. *) + let cancel_this () = begin + match loc with + | Short _ -> () + | Long loc -> ignore (cancel t loc) + end in + (match loc with + | Short _ -> () + | Long loc -> ( + match Fiber_context.get_error ctx with + | Some ex -> if cancel t loc then enqueue (Error ex); + | None -> () + )); + cancel_this let take_suspend t loc = Suspend.enter_unchecked @@ fun ctx enqueue -> @@ -412,6 +447,37 @@ let take (t : _ t) = take_suspend t (Long (Q.next_suspend t.consumers)) ) +let select_of_many (type a b) (ts: (a t * (a -> b)) list) = + let finished = Atomic.make false in + let cancel_fns = ref [] in + let add_cancel_fn fn = cancel_fns := (fn :: !cancel_fns) in + let cancel_all () = List.iter (fun fn -> fn ()) !cancel_fns in + let wait ctx enqueue ((t, f): (a t * (a -> b))) = begin + if (Atomic.fetch_and_add t.balance (-1)) > 0 then ( + (* have item, can cancel remaining stream waiters*) + if Atomic.compare_and_set finished false true then ( + cancel_all (); + (* Item is available, take it over from producer. *) + let cell = Q.next_resume t.producers in + (* [in_transition] is [None] so we spin-wait on the item being available. + This is a rare scenario. *) + let v = consumer_resume_cell t cell + ~success:(fun it -> it.kp (Ok true); it.v) + ?in_transition:None in + enqueue (Ok (f v)) + ) else ( + (* restore old balance, because another stream was ready first. *) + ignore (Atomic.fetch_and_add t.balance (+1)) + ) + ) else ( + let cell = Long (Q.next_suspend t.consumers) in + let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t f cell finished in + add_cancel_fn cancel_fn + ) + end in + Suspend.enter_unchecked (fun ctx enqueue -> + List.iter (wait ctx enqueue) ts) + let reject = Slot (fun _ -> false) let take_nonblocking (t : _ t) = diff --git a/lib_eio/sync.mli b/lib_eio/sync.mli index bb304b508..c998f9246 100644 --- a/lib_eio/sync.mli +++ b/lib_eio/sync.mli @@ -50,3 +50,6 @@ val balance : 'a t -> int val dump : 'a t Fmt.t (** [dump] formats the internal state of a channel, for testing and debugging. *) + +val select_of_many : ('a t * ('a -> 'b)) list -> 'b +(** alpha: [select_of_many] returns an element from the first sync stream to yield an item. *) \ No newline at end of file diff --git a/tests/stream.md b/tests/stream.md index c5a035e3b..17a0489d2 100644 --- a/tests/stream.md +++ b/tests/stream.md @@ -357,3 +357,26 @@ Non-blocking take with zero-capacity stream: +Got None from stream - : unit = () ``` + +Selecting from multiple sync (0-capacity) channels. Note that this works on the internal +API, which is usually not exposed. + +```ocaml +# module Sy = Eio__Sync +module Sy = Eio__Sync +# run @@ fun () -> Switch.run (fun sw -> + let t1, t2 = (Sy.create ()), (Sy.create ()) in + let identity x = x in + let selector = [(t1, identity); (t2, identity)] in + Fiber.fork ~sw (fun () -> Sy.put t2 "foo"); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector)); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector)); + Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector)); + Fiber.fork ~sw (fun () -> Sy.put t2 "bar"); + Fiber.fork ~sw (fun () -> Sy.put t1 "baz"); + ) ++foo ++bar ++baz +- : unit = () +```