Skip to content

Commit 0833169

Browse files
committed
Add bounded blocking Stack
1 parent d407785 commit 0833169

File tree

6 files changed

+171
-0
lines changed

6 files changed

+171
-0
lines changed

bench/bench_stack.ml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
open Multicore_bench
2+
open Picos_std_sync
3+
4+
let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
5+
let t = Stack.create ~padded:true () in
6+
7+
let op push =
8+
if push then Stack.push t 101
9+
else match Stack.pop_exn t with _ -> () | exception Stack.Empty -> ()
10+
in
11+
12+
let init _ =
13+
assert (
14+
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
15+
Util.generate_push_and_pop_sequence n_msgs
16+
in
17+
let work _ bits = Util.Bits.iter op bits in
18+
19+
Times.record ~budgetf ~n_domains:1 ~init ~work ()
20+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"
21+
22+
let run_one ~budgetf ~n_adders ~n_takers () =
23+
let n_domains = n_adders + n_takers in
24+
25+
let n_msgs = 50 * Util.iter_factor in
26+
27+
let t = Stack.create ~padded:true () in
28+
29+
let n_msgs_to_add = Countdown.create ~n_domains:n_adders () in
30+
let n_msgs_to_take = Countdown.create ~n_domains:n_takers () in
31+
32+
let init _ =
33+
assert (
34+
match Stack.pop_exn t with _ -> false | exception Stack.Empty -> true);
35+
Countdown.non_atomic_set n_msgs_to_add n_msgs;
36+
Countdown.non_atomic_set n_msgs_to_take n_msgs
37+
in
38+
let work i () =
39+
if i < n_adders then
40+
let rec work () =
41+
let n = Countdown.alloc n_msgs_to_add ~domain_index:i ~batch:1000 in
42+
if 0 < n then begin
43+
for i = 1 to n do
44+
Stack.push t i
45+
done;
46+
work ()
47+
end
48+
in
49+
work ()
50+
else
51+
let i = i - n_adders in
52+
let rec work () =
53+
let n = Countdown.alloc n_msgs_to_take ~domain_index:i ~batch:1000 in
54+
if 0 < n then
55+
let rec loop n =
56+
if 0 < n then begin
57+
match Stack.pop_exn t with
58+
| _ -> loop (n - 1)
59+
| exception Stack.Empty ->
60+
Backoff.once Backoff.default |> ignore;
61+
loop n
62+
end
63+
else work ()
64+
in
65+
loop n
66+
in
67+
work ()
68+
in
69+
70+
let config =
71+
let format role n =
72+
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
73+
in
74+
Printf.sprintf "%s, %s"
75+
(format "nb adder" n_adders)
76+
(format "nb taker" n_takers)
77+
in
78+
Times.record ~budgetf ~n_domains ~init ~work ()
79+
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config
80+
81+
let run_suite ~budgetf =
82+
run_one_domain ~budgetf ()
83+
@ (Util.cross [ 1; 2; 4 ] [ 1; 2; 4 ]
84+
|> List.concat_map @@ fun (n_adders, n_takers) ->
85+
if Picos_domain.recommended_domain_count () < n_adders + n_takers then []
86+
else run_one ~budgetf ~n_adders ~n_takers ())

bench/dune

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
(run %{test} -brief "Picos binaries")
2424
(run %{test} -brief "Bounded_q with Picos_std_sync")
2525
(run %{test} -brief "Memory usage")
26+
(run %{test} -brief "Stack")
2627
;;
2728
))
2829
(foreign_stubs

bench/main.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ let benchmarks =
2222
("Picos binaries", Bench_binaries.run_suite);
2323
("Bounded_q with Picos_std_sync", Bench_bounded_q.run_suite);
2424
("Memory usage", Bench_memory.run_suite);
25+
("Stack", Bench_stack.run_suite);
2526
]
2627

2728
let () = Multicore_bench.Cmd.run ~benchmarks ()

lib/picos_std.sync/picos_std_sync.ml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ module Latch = Latch
99
module Barrier = Barrier
1010
module Ivar = Ivar
1111
module Stream = Stream
12+
module Stack = Stack

lib/picos_std.sync/picos_std_sync.mli

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,25 @@ module Stream : sig
753753
the [cursor] position. *)
754754
end
755755

756+
module Stack : sig
757+
(** *)
758+
759+
type !'a t
760+
(** *)
761+
762+
val create : ?padded:bool -> ?capacity:int -> unit -> 'a t
763+
(** *)
764+
765+
val push : 'a t -> 'a -> unit
766+
(** *)
767+
768+
exception Empty
769+
(** *)
770+
771+
val pop_exn : 'a t -> 'a
772+
(** *)
773+
end
774+
756775
(** {1 Examples}
757776
758777
{2 A simple bounded queue}

lib/picos_std.sync/stack.ml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
open Picos_std_awaitable
2+
3+
type 'a state =
4+
| Nil of { mutable capacity : int }
5+
| Cons of { mutable capacity : int; value : 'a; rest : 'a state }
6+
7+
type 'a t = 'a state Awaitable.t
8+
9+
exception Empty
10+
11+
let busy_bit = 0b01
12+
let one = 0b10
13+
let max_capacity = Int.max_int / one
14+
15+
let create ?padded ?capacity () =
16+
let capacity =
17+
match capacity with
18+
| None -> max_capacity * one
19+
| Some capacity ->
20+
if capacity < 1 || max_capacity < capacity then invalid_arg "capacity"
21+
else capacity * one
22+
in
23+
Awaitable.make ?padded (Nil { capacity })
24+
25+
let rec push t value backoff =
26+
match Awaitable.get t with
27+
| Nil r as before ->
28+
let capacity = r.capacity land lnot busy_bit in
29+
if
30+
Awaitable.compare_and_set t before
31+
(Cons { capacity = capacity - one; value; rest = Nil { capacity } })
32+
then begin
33+
if r.capacity land busy_bit <> 0 then Awaitable.broadcast t
34+
end
35+
else push t value (Backoff.once backoff)
36+
| Cons r as before ->
37+
let capacity = r.capacity in
38+
if one <= capacity then begin
39+
if
40+
not
41+
(Awaitable.compare_and_set t before
42+
(Cons { capacity = capacity - one; value; rest = before }))
43+
then push t value (Backoff.once backoff)
44+
end
45+
else begin
46+
if capacity <> capacity lor busy_bit then
47+
r.capacity <- capacity lor busy_bit;
48+
Awaitable.await t before;
49+
push t value Backoff.default
50+
end
51+
52+
let rec pop_exn t backoff =
53+
match Awaitable.get t with
54+
| Nil _ -> raise_notrace Empty
55+
| Cons r as before ->
56+
if Awaitable.compare_and_set t before r.rest then begin
57+
if r.capacity land busy_bit <> 0 then Awaitable.broadcast t;
58+
r.value
59+
end
60+
else pop_exn t (Backoff.once backoff)
61+
62+
let[@inline] push t value = push t value Backoff.default
63+
let[@inline] pop_exn t = pop_exn t Backoff.default

0 commit comments

Comments
 (0)