Skip to content

DRAFT: Sync stream select #590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
57 changes: 57 additions & 0 deletions bench/bench_select_sync.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

open Eio.Stdenv
open Eio
module Sync = Eio__Sync

let sender_fibers = 4

let message = 1234

(* Send [n_msgs] items to streams in a round-robin way. *)
let sender ~n_msgs streams =
let msgs = Seq.take n_msgs (Seq.ints 0) in
let streams = Seq.cycle (List.to_seq streams) in
let zipped = Seq.zip msgs streams in
ignore (Seq.iter (fun (_i, stream) ->
Sync.put stream message) zipped)

(* Start one sender fiber for each stream, and let it send n_msgs messages.
Each fiber sends to all streams in a round-robin way. *)
let run_senders ~dom_mgr ?(n_msgs = 100) streams =
Switch.run @@ fun sw ->
ignore @@ List.iter (fun _stream ->
Fiber.fork ~sw (fun () ->
Domain_manager.run dom_mgr (fun () ->
sender ~n_msgs streams))) streams

(* Receive messages from all streams. *)
let receiver ~n_msgs streams =
for _i = 1 to n_msgs do
assert (Int.equal message (Sync.select_of_many streams));
done

(* Create [n] streams. *)
let make_streams n =
let unfolder i = if i == 0 then None else Some (Sync.create (), i-1) in
let seq = Seq.unfold unfolder n in
List.of_seq seq

let run env =
let dom_mgr = domain_mgr env in
let clock = clock env in
let streams = make_streams sender_fibers in
let selector = List.map (fun s -> (s, fun i -> i)) streams in
let n_msgs = 10000 in
Switch.run @@ fun sw ->
Fiber.fork ~sw (fun () -> run_senders ~dom_mgr ~n_msgs streams);
let before = Time.now clock in
receiver ~n_msgs:(sender_fibers * n_msgs) selector;
let after = Time.now clock in
let elapsed = after -. before in
let time_per_iter = elapsed /. (Float.of_int @@ sender_fibers * n_msgs) in
[Metric.create
(Printf.sprintf "sync:true senders:%d msgs_per_sender:%d" sender_fibers n_msgs)
(`Float (1e9 *. time_per_iter)) "ns"
"Time per transmitted int"]


74 changes: 70 additions & 4 deletions lib_eio/sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,49 @@ let rec put (t : _ t) v =
(* Taking. *)

(* Mirror of [producer_resume_cell]. *)
let rec consumer_resume_cell t ~success ~in_transition cell =
let rec consumer_resume_cell t ~success ?in_transition cell =
match Atomic.get (cell : _ Cell.t Atomic.t) with
| Slot _ -> assert false
| In_transition -> in_transition cell
| Finished -> consumer_resume_cell t ~success ~in_transition (Q.next_resume t.producers)
| In_transition -> (
match in_transition with
| Some in_transition' -> in_transition' cell
(* If no [in_transition] function is provided, spin on cell.
This only occurs in multi-domain contexts, otherwise [In_transition]
is not observed. *)
| None -> consumer_resume_cell t ~success ?in_transition cell)
| Finished -> consumer_resume_cell t ~success ?in_transition (Q.next_resume t.producers)
| Item req as old ->
if Atomic.compare_and_set cell old Finished then success req
else consumer_resume_cell t ~success ~in_transition cell
else consumer_resume_cell t ~success ?in_transition cell

let take_suspend_select ~enqueue ~ctx ~cancel_all t f loc finished =
let Short cell | Long (_, cell) = loc in
let kc v = begin
if (Atomic.compare_and_set finished false true) then (
cancel_all ();
(* deliver value *)
enqueue (Ok (f v));
true
) else (
(* reject value, let producer try again *)
false
)
end in
add_to_cell t.producers (Slot kc) cell;
(* This will be called if another stream has yielded a value. *)
let cancel_this () = begin
match loc with
| Short _ -> ()
| Long loc -> ignore (cancel t loc)
end in
(match loc with
| Short _ -> ()
| Long loc -> (
match Fiber_context.get_error ctx with
| Some ex -> if cancel t loc then enqueue (Error ex);
| None -> ()
));
cancel_this

let take_suspend t loc =
Suspend.enter_unchecked @@ fun ctx enqueue ->
Expand Down Expand Up @@ -412,6 +447,37 @@ let take (t : _ t) =
take_suspend t (Long (Q.next_suspend t.consumers))
)

let select_of_many (type a b) (ts: (a t * (a -> b)) list) =
let finished = Atomic.make false in
let cancel_fns = ref [] in
let add_cancel_fn fn = cancel_fns := (fn :: !cancel_fns) in
let cancel_all () = List.iter (fun fn -> fn ()) !cancel_fns in
let wait ctx enqueue ((t, f): (a t * (a -> b))) = begin
if (Atomic.fetch_and_add t.balance (-1)) > 0 then (
(* have item, can cancel remaining stream waiters*)
if Atomic.compare_and_set finished false true then (
cancel_all ();
(* Item is available, take it over from producer. *)
let cell = Q.next_resume t.producers in
(* [in_transition] is [None] so we spin-wait on the item being available.
This is a rare scenario. *)
let v = consumer_resume_cell t cell
~success:(fun it -> it.kp (Ok true); it.v)
?in_transition:None in
enqueue (Ok (f v))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to run the user function f here because we're in the sender's context, which could be a different domain, sys-thread, etc.

) else (
(* restore old balance, because another stream was ready first. *)
ignore (Atomic.fetch_and_add t.balance (+1))
Comment on lines +469 to +470
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't safe. Some other domain may have seen the updated value and be relying on it.

)
) else (
let cell = Long (Q.next_suspend t.consumers) in
let cancel_fn = take_suspend_select ~enqueue ~ctx ~cancel_all t f cell finished in
add_cancel_fn cancel_fn
)
end in
Suspend.enter_unchecked (fun ctx enqueue ->
List.iter (wait ctx enqueue) ts)

let reject = Slot (fun _ -> false)

let take_nonblocking (t : _ t) =
Expand Down
3 changes: 3 additions & 0 deletions lib_eio/sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ val balance : 'a t -> int

val dump : 'a t Fmt.t
(** [dump] formats the internal state of a channel, for testing and debugging. *)

val select_of_many : ('a t * ('a -> 'b)) list -> 'b
(** alpha: [select_of_many] returns an element from the first sync stream to yield an item. *)
23 changes: 23 additions & 0 deletions tests/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,26 @@ Non-blocking take with zero-capacity stream:
+Got None from stream
- : unit = ()
```

Selecting from multiple sync (0-capacity) channels. Note that this works on the internal
API, which is usually not exposed.

```ocaml
# module Sy = Eio__Sync
module Sy = Eio__Sync
# run @@ fun () -> Switch.run (fun sw ->
let t1, t2 = (Sy.create ()), (Sy.create ()) in
let identity x = x in
let selector = [(t1, identity); (t2, identity)] in
Fiber.fork ~sw (fun () -> Sy.put t2 "foo");
Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector));
Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector));
Fiber.fork ~sw (fun () -> traceln "%s" (Sy.select_of_many selector));
Fiber.fork ~sw (fun () -> Sy.put t2 "bar");
Fiber.fork ~sw (fun () -> Sy.put t1 "baz");
)
+foo
+bar
+baz
- : unit = ()
```