Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bounded blocking Stack #321

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions bench/bench_stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
open Multicore_bench
open Picos_std_sync

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Stack.create ~padded:true () in

let op push =
if push then Stack.push t 101
else match Stack.pop_exn t with _ -> () | exception Stack.Empty -> ()
in

let init _ =
assert (
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ~n_adders ~n_takers () =
let n_domains = n_adders + n_takers in

let n_msgs = 50 * Util.iter_factor in

let t = Stack.create ~padded:true () in

let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in

let init _ =
assert (
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
Countdown.non_atomic_set n_msgs_to_add n_msgs;
Countdown.non_atomic_set n_msgs_to_take n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
if 0 < n then begin
for i = 1 to n do
Stack.push t i
done;
work ()
end
in
work ()
else
let i = i - n_adders in
let rec work () =
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
if 0 < n then
let rec loop n =
if 0 < n then begin
match Stack.pop_exn t with
| _ -> loop (n - 1)
| exception Stack.Empty ->
Backoff.once Backoff.default |> ignore;
loop n
end
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in
Times.record ~budgetf ~n_domains ~init ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
else run_one ~budgetf ~n_adders ~n_takers ())
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
(run %{test} -brief "Picos binaries")
(run %{test} -brief "Bounded_q with Picos_std_sync")
(run %{test} -brief "Memory usage")
(run %{test} -brief "Stack")
;;
))
(foreign_stubs
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let benchmarks =
("Picos binaries", Bench_binaries.run_suite);
("Bounded_q with Picos_std_sync", Bench_bounded_q.run_suite);
("Memory usage", Bench_memory.run_suite);
("Stack", Bench_stack.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
1 change: 1 addition & 0 deletions lib/picos_std.sync/picos_std_sync.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ module Latch = Latch
module Barrier = Barrier
module Ivar = Ivar
module Stream = Stream
module Stack = Stack
19 changes: 19 additions & 0 deletions lib/picos_std.sync/picos_std_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,25 @@ module Stream : sig
the [cursor] position. *)
end

module Stack : sig
(** *)

type !'a t
(** *)

val create : ?padded:bool -> ?capacity:int -> unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop_exn : 'a t -> 'a
(** *)
end

(** {1 Examples}

{2 A simple bounded queue}
Expand Down
63 changes: 63 additions & 0 deletions lib/picos_std.sync/stack.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
open Picos_std_awaitable

type 'a state =
| Nil of { mutable capacity : int }
| Cons of { mutable capacity : int; value : 'a; rest : 'a state }

type 'a t = 'a state Awaitable.t

exception Empty

let busy_bit = 0b01
let one = 0b10
let max_capacity = Int.max_int / one

let create ?padded ?capacity () =
let capacity =
match capacity with
| None -> max_capacity * one
| Some capacity ->
if capacity < 1 || max_capacity < capacity then invalid_arg "capacity"
else capacity * one
in
Awaitable.make ?padded (Nil { capacity })

let rec push t value backoff =
match Awaitable.get t with
| Nil r as before ->
let capacity = r.capacity land lnot busy_bit in
if
Awaitable.compare_and_set t before
(Cons { capacity = capacity - one; value; rest = Nil { capacity } })
then begin
if r.capacity land busy_bit <> 0 then Awaitable.broadcast t
end
else push t value (Backoff.once backoff)
| Cons r as before ->
let capacity = r.capacity in
if one <= capacity then begin
if
not
(Awaitable.compare_and_set t before
(Cons { capacity = capacity - one; value; rest = before }))
then push t value (Backoff.once backoff)
end
else begin
if capacity <> capacity lor busy_bit then
r.capacity <- capacity lor busy_bit;
Awaitable.await t before;
push t value Backoff.default
end

let rec pop_exn t backoff =
match Awaitable.get t with
| Nil _ -> raise_notrace Empty
| Cons r as before ->
if Awaitable.compare_and_set t before r.rest then begin
if r.capacity land busy_bit <> 0 then Awaitable.broadcast t;
r.value
end
else pop_exn t (Backoff.once backoff)

let[@inline] push t value = push t value Backoff.default
let[@inline] pop_exn t = pop_exn t Backoff.default