Skip to content

Commit 165b45e

Browse files
committed
2 parents 9d2da47 + 792f619 commit 165b45e

File tree

10 files changed

+190
-205
lines changed

10 files changed

+190
-205
lines changed

src/core/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ let () = Jbuild_plugin.V1.send @@ {|
2626
(synopsis "Monadic promises and concurrent I/O")
2727
(wrapped false)
2828
|} ^ preprocess ^ {|
29-
(libraries bytes result seq)
29+
(libraries bytes result seq domainslib)
3030
(flags (:standard -w +A-29)))
3131

3232
(documentation

src/core/lwt_lock.ml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
(** Locks for multicore*)
2+
module C = Domainslib.Chan
3+
4+
type t = int C.t
5+
6+
let create_lock () =
7+
let c = C.make_bounded 1 in
8+
C.send c 0;
9+
c
10+
11+
let acquire_lock lock =
12+
C.recv lock |> ignore
13+
14+
let release_lock lock =
15+
C.send lock 0
16+
17+
let is_locked lock =
18+
let c = C.recv_poll lock in
19+
match c with
20+
| Some v ->
21+
C.send lock v;
22+
false
23+
| None ->
24+
true
25+
26+
let try_acquire lock =
27+
match C.recv_poll lock with
28+
| Some _ -> true
29+
| None -> false
30+
31+
let try_release lock =
32+
C.send_poll lock 0

src/core/lwt_mutex.ml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,25 @@ type t = { mutable locked : bool; mutable waiters : unit Lwt.u Lwt_sequence.t }
1818

1919
let create () = { locked = false; waiters = Lwt_sequence.create () }
2020

21+
let l = Lwt_lock.create_lock ()
22+
2123
let lock m =
2224
if m.locked then
2325
(Lwt.add_task_r [@ocaml.warning "-3"]) m.waiters
2426
else begin
27+
Lwt_lock.acquire_lock l;
2528
m.locked <- true;
29+
Lwt_lock.release_lock l;
2630
Lwt.return_unit
2731
end
2832

2933
let unlock m =
3034
if m.locked then begin
31-
if Lwt_sequence.is_empty m.waiters then
32-
m.locked <- false
35+
if Lwt_sequence.is_empty m.waiters then begin
36+
Lwt_lock.acquire_lock l;
37+
m.locked <- false;
38+
Lwt_lock.release_lock l
39+
end
3340
else
3441
(* We do not use [Lwt.wakeup] here to avoid a stack overflow
3542
when unlocking a lot of threads. *)

src/core/lwt_mvar.ml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ type 'a t = {
4949
(* Threads waiting for a value *)
5050
}
5151

52+
let lock = Lwt_lock.create_lock ()
53+
5254
let create_empty () =
5355
{ mvar_contents = None;
5456
writers = Lwt_sequence.create ();
@@ -60,6 +62,7 @@ let create v =
6062
readers = Lwt_sequence.create () }
6163

6264
let put mvar v =
65+
Lwt_lock.acquire_lock lock;
6366
match mvar.mvar_contents with
6467
| None ->
6568
begin match Lwt_sequence.take_opt_l mvar.readers with
@@ -68,11 +71,13 @@ let put mvar v =
6871
| Some w ->
6972
Lwt.wakeup_later w v
7073
end;
74+
Lwt_lock.release_lock lock;
7175
Lwt.return_unit
7276
| Some _ ->
7377
let (res, w) = Lwt.task () in
7478
let node = Lwt_sequence.add_r (v, w) mvar.writers in
7579
Lwt.on_cancel res (fun _ -> Lwt_sequence.remove node);
80+
Lwt_lock.release_lock lock;
7681
res
7782

7883
let next_writer mvar =
@@ -86,7 +91,9 @@ let next_writer mvar =
8691
let take_available mvar =
8792
match mvar.mvar_contents with
8893
| Some v ->
94+
Lwt_lock.acquire_lock lock;
8995
next_writer mvar;
96+
Lwt_lock.release_lock lock;
9097
Some v
9198
| None ->
9299
None

src/core/lwt_sequence.ml

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type 'a node = {
2020
external seq_of_node : 'a node -> 'a t = "%identity"
2121
external node_of_seq : 'a t -> 'a node = "%identity"
2222

23+
let lock = Lwt_lock.create_lock ()
2324
(* +-----------------------------------------------------------------+
2425
| Operations on nodes |
2526
+-----------------------------------------------------------------+ *)
@@ -28,14 +29,20 @@ let get node =
2829
node.node_data
2930

3031
let set node data =
31-
node.node_data <- data
32+
Lwt_lock.acquire_lock lock;
33+
node.node_data <- data;
34+
Lwt_lock.release_lock lock
3235

3336
let remove node =
3437
if node.node_active then begin
38+
let is_locked = Lwt_lock.try_acquire lock in
3539
node.node_active <- false;
3640
let seq = seq_of_node node in
3741
seq.prev.next <- seq.next;
38-
seq.next.prev <- seq.prev
42+
seq.next.prev <- seq.prev;
43+
match is_locked with
44+
| true -> Lwt_lock.release_lock lock
45+
| false -> ()
3946
end
4047

4148
(* +-----------------------------------------------------------------+
@@ -62,32 +69,40 @@ let length seq =
6269
loop seq.next 0
6370

6471
let add_l data seq =
72+
Lwt_lock.acquire_lock lock;
6573
let node = { node_prev = seq; node_next = seq.next; node_data = data; node_active = true } in
6674
seq.next.prev <- seq_of_node node;
6775
seq.next <- seq_of_node node;
76+
Lwt_lock.release_lock lock;
6877
node
6978

7079
let add_r data seq =
80+
Lwt_lock.acquire_lock lock;
7181
let node = { node_prev = seq.prev; node_next = seq; node_data = data; node_active = true } in
7282
seq.prev.next <- seq_of_node node;
7383
seq.prev <- seq_of_node node;
84+
Lwt_lock.release_lock lock;
7485
node
7586

7687
let take_l seq =
7788
if is_empty seq then
7889
raise Empty
7990
else begin
91+
Lwt_lock.acquire_lock lock;
8092
let node = node_of_seq seq.next in
8193
remove node;
94+
Lwt_lock.release_lock lock;
8295
node.node_data
8396
end
8497

8598
let take_r seq =
8699
if is_empty seq then
87100
raise Empty
88101
else begin
102+
Lwt_lock.acquire_lock lock;
89103
let node = node_of_seq seq.prev in
90104
remove node;
105+
Lwt_lock.release_lock lock;
91106
node.node_data
92107
end
93108

@@ -110,20 +125,24 @@ let take_opt_r seq =
110125
end
111126

112127
let transfer_l s1 s2 =
128+
Lwt_lock.acquire_lock lock;
113129
s2.next.prev <- s1.prev;
114130
s1.prev.next <- s2.next;
115131
s2.next <- s1.next;
116132
s1.next.prev <- s2;
117133
s1.prev <- s1;
118-
s1.next <- s1
134+
s1.next <- s1;
135+
Lwt_lock.release_lock lock
119136

120137
let transfer_r s1 s2 =
138+
Lwt_lock.acquire_lock lock;
121139
s2.prev.next <- s1.next;
122140
s1.next.prev <- s2.prev;
123141
s2.prev <- s1.prev;
124142
s1.prev.next <- s2;
125143
s1.prev <- s1;
126-
s1.next <- s1
144+
s1.next <- s1;
145+
Lwt_lock.release_lock lock
127146

128147
let iter_l f seq =
129148
let rec loop curr =

src/unix/dune

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ let () = Jbuild_plugin.V1.send @@ {|
4242
(synopsis "Unix support for Lwt")
4343
(optional)
4444
(wrapped false)
45-
(libraries bigarray lwt mmap ocplib-endian.bigstring threads unix)
45+
(libraries bigarray lwt mmap ocplib-endian.bigstring threads unix domainslib)
4646
|} ^ preprocess ^ {|
4747
(c_names
4848
lwt_unix_stubs

0 commit comments

Comments
 (0)