Skip to content

Commit 62c2e51

Browse files
committed
New MPMC queue using cooperative pointer reversal
1 parent 608deec commit 62c2e51

File tree

3 files changed

+129
-196
lines changed

3 files changed

+129
-196
lines changed
Lines changed: 105 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -1,208 +1,124 @@
11
module Atomic = Multicore_magic.Transparent_atomic
22

3-
type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }
3+
type 'a node = { mutable next : 'a node; index : int; mutable value : 'a }
4+
type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }
45

5-
and ('a, _) tdt =
6-
| Cons : {
7-
counter : int;
8-
value : 'a;
9-
suffix : 'a head;
10-
}
11-
-> ('a, [> `Cons ]) tdt
12-
| Head : { counter : int } -> ('a, [> `Head ]) tdt
13-
| Snoc : {
14-
counter : int;
15-
prefix : 'a tail;
16-
value : 'a;
17-
}
18-
-> ('a, [> `Snoc ]) tdt
19-
| Tail : {
20-
counter : int;
21-
mutable move : ('a, [ `Snoc | `Used ]) tdt;
22-
}
23-
-> ('a, [> `Tail ]) tdt
24-
| Used : ('a, [> `Used ]) tdt
6+
exception Empty
257

26-
and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
27-
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]
8+
let[@inline] create_sentinel index =
9+
let sentinel = { next = Obj.magic index; index; value = Obj.magic index } in
10+
sentinel.next <- sentinel;
11+
sentinel
2812

29-
let create ?padded () =
30-
let head =
31-
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as ?padded
32-
in
33-
let tail =
34-
Atomic.make (T (Tail { counter = 0; move = Used }))
35-
|> Multicore_magic.copy_as ?padded
36-
in
37-
Multicore_magic.copy_as ?padded { head; tail }
13+
let[@inline] maybe_fix tail =
14+
let mystery = tail.next in
15+
if mystery.index = tail.index - 1 then
16+
let prev = mystery in
17+
if prev.next != tail then prev.next <- tail
3818

39-
let rec rev (suffix : (_, [< `Cons ]) tdt) = function
40-
| T (Snoc { counter; prefix; value }) ->
41-
rev (Cons { counter; value; suffix = H suffix }) prefix
42-
| T (Tail _) -> suffix
19+
let create ?padded () =
20+
let sentinel = create_sentinel 0 in
21+
let head = Atomic.make sentinel |> Multicore_magic.copy_as ?padded in
22+
let tail = Atomic.make sentinel |> Multicore_magic.copy_as ?padded in
23+
{ head; tail } |> Multicore_magic.copy_as ?padded
4324

44-
let rev = function
45-
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
46-
rev
47-
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
48-
prefix
25+
let rec push t value backoff =
26+
let tail_atomic = t.tail in
27+
let tail = Atomic.get tail_atomic in
28+
let new_tail = { next = tail; index = tail.index + 1; value } in
29+
maybe_fix tail;
30+
if Atomic.compare_and_set tail_atomic tail new_tail then tail.next <- new_tail
31+
else push t value (Backoff.once backoff)
4932

50-
let rec push t value backoff = function
51-
| T (Snoc snoc_r) as prefix ->
52-
let after = Snoc { counter = snoc_r.counter + 1; prefix; value } in
53-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
54-
let backoff = Backoff.once backoff in
55-
push t value backoff (Atomic.fenceless_get t.tail)
56-
| T (Tail tail_r) as prefix -> begin
57-
match tail_r.move with
58-
| Used ->
59-
let after = Snoc { counter = tail_r.counter + 1; prefix; value } in
60-
if not (Atomic.compare_and_set t.tail prefix (T after)) then
61-
let backoff = Backoff.once backoff in
62-
push t value backoff (Atomic.fenceless_get t.tail)
63-
| Snoc move_r as move ->
64-
begin
65-
match Atomic.get t.head with
66-
| H (Head head_r as head) when head_r.counter < move_r.counter ->
67-
let after = rev move in
68-
if
69-
Atomic.fenceless_get t.head == H head
70-
&& Atomic.compare_and_set t.head (H head) (H after)
71-
then tail_r.move <- Used
72-
| _ -> tail_r.move <- Used
73-
end;
74-
push t value backoff (Atomic.get t.tail)
33+
let rec pop_exn t backoff =
34+
let head_atomic = t.head in
35+
let head = Atomic.get head_atomic in
36+
let next = head.next in
37+
if head.index + 1 = next.index then
38+
if Atomic.compare_and_set head_atomic head next then begin
39+
let value = next.value in
40+
next.value <- Obj.magic ();
41+
value
7542
end
76-
77-
exception Empty
78-
79-
let rec pop t backoff = function
80-
| H (Cons cons_r as cons) ->
81-
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
82-
else
83-
let backoff = Backoff.once backoff in
84-
pop t backoff (Atomic.fenceless_get t.head)
85-
| H (Head head_r as head) -> begin
86-
match Atomic.get t.tail with
87-
| T (Snoc snoc_r as move) ->
88-
if head_r.counter = snoc_r.counter then
89-
if Atomic.compare_and_set t.tail (T move) snoc_r.prefix then
90-
snoc_r.value
91-
else pop t backoff (Atomic.fenceless_get t.head)
92-
else
93-
let (Tail tail_r as tail : (_, [ `Tail ]) tdt) =
94-
Tail { counter = snoc_r.counter; move }
95-
in
96-
let new_head = Atomic.get t.head in
97-
if new_head != H head then pop t backoff new_head
98-
else if Atomic.compare_and_set t.tail (T move) (T tail) then
99-
let (Cons cons_r) = rev move in
100-
let after = cons_r.suffix in
101-
let new_head = Atomic.get t.head in
102-
if new_head != H head then pop t backoff new_head
103-
else if Atomic.compare_and_set t.head (H head) after then begin
104-
tail_r.move <- Used;
105-
cons_r.value
106-
end
107-
else
108-
let backoff = Backoff.once backoff in
109-
pop t backoff (Atomic.fenceless_get t.head)
110-
else pop t backoff (Atomic.fenceless_get t.head)
111-
| T (Tail tail_r) -> begin
112-
match tail_r.move with
113-
| Used ->
114-
let new_head = Atomic.get t.head in
115-
if new_head != H head then pop t backoff new_head
116-
else raise_notrace Empty
117-
| Snoc move_r as move ->
118-
if head_r.counter < move_r.counter then
119-
let (Cons cons_r) = rev move in
120-
let after = cons_r.suffix in
121-
let new_head = Atomic.get t.head in
122-
if new_head != H head then pop t backoff new_head
123-
else if Atomic.compare_and_set t.head (H head) after then begin
124-
tail_r.move <- Used;
125-
cons_r.value
126-
end
127-
else
128-
let backoff = Backoff.once backoff in
129-
pop t backoff (Atomic.fenceless_get t.head)
130-
else
131-
let new_head = Atomic.get t.head in
132-
if new_head != H head then pop t backoff new_head
133-
else raise_notrace Empty
134-
end
43+
else pop_exn t (Backoff.once backoff)
44+
else
45+
let tail = Atomic.get t.tail in
46+
if tail == head then raise_notrace Empty
47+
else begin
48+
maybe_fix tail;
49+
pop_exn t Backoff.default
13550
end
13651

13752
let rec push_head t value backoff =
138-
match Atomic.get t.head with
139-
| H (Cons cons_r) as suffix ->
140-
let after = Cons { counter = cons_r.counter - 1; value; suffix } in
141-
if not (Atomic.compare_and_set t.head suffix (H after)) then
142-
push_head t value (Backoff.once backoff)
143-
| H (Head head_r) as head -> begin
144-
match Atomic.get t.tail with
145-
| T (Snoc snoc_r as move) ->
146-
if Atomic.get t.head != head then push_head t value backoff
147-
else if head_r.counter = snoc_r.counter then begin
148-
let prefix = T (Snoc { snoc_r with value }) in
149-
let after =
150-
Snoc { snoc_r with counter = snoc_r.counter + 1; prefix }
151-
in
152-
if not (Atomic.compare_and_set t.tail (T move) (T after)) then
153-
push_head t value (Backoff.once backoff)
154-
end
155-
else
156-
let tail = Tail { counter = snoc_r.counter; move } in
157-
let backoff =
158-
if Atomic.compare_and_set t.tail (T move) (T tail) then backoff
159-
else Backoff.once backoff
160-
in
161-
push_head t value backoff
162-
| T (Tail tail_r) as prefix -> begin
163-
match tail_r.move with
164-
| Used ->
165-
if Atomic.get t.head == head then begin
166-
let tail =
167-
Snoc { counter = tail_r.counter + 1; value; prefix }
168-
in
169-
if not (Atomic.compare_and_set t.tail prefix (T tail)) then
170-
push_head t value (Backoff.once backoff)
171-
end
172-
else push_head t value backoff
173-
| Snoc move_r as move ->
174-
begin
175-
match Atomic.get t.head with
176-
| H (Head head_r as head) when head_r.counter < move_r.counter
177-
->
178-
let after = rev move in
179-
if
180-
Atomic.fenceless_get t.head == H head
181-
&& Atomic.compare_and_set t.head (H head) (H after)
182-
then tail_r.move <- Used
183-
| _ -> tail_r.move <- Used
184-
end;
185-
push_head t value backoff
186-
end
53+
let head = Atomic.get t.head in
54+
let next = head.next in
55+
let index = head.index in
56+
if index + 1 = next.index then begin
57+
let new_next = { next; index; value } in
58+
let index = index - 1 in
59+
let new_head = { next = new_next; index; value = Obj.magic index } in
60+
if not (Atomic.compare_and_set t.head head new_head) then
61+
push_head t value (Backoff.once backoff)
62+
end
63+
else
64+
let tail = Atomic.get t.tail in
65+
if tail == head then
66+
let new_tail = { next = tail; index = tail.index + 1; value } in
67+
if Atomic.compare_and_set t.tail tail new_tail then tail.next <- new_tail
68+
else push_head t value (Backoff.once backoff)
69+
else begin
70+
maybe_fix tail;
71+
push_head t value Backoff.default
18772
end
18873

18974
let rec length t =
190-
let head = Atomic.get t.head in
191-
let tail = Atomic.fenceless_get t.tail in
192-
if head != Atomic.get t.head then length t
193-
else
194-
let head_at =
195-
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
196-
in
197-
let tail_at =
198-
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
199-
in
200-
tail_at - head_at + 1
75+
let tail_atomic = t.tail in
76+
let head_atomic = t.head in
77+
let tail = Atomic.get tail_atomic in
78+
let head = Atomic.fenceless_get head_atomic in
79+
if tail == Atomic.get tail_atomic then tail.index - head.index else length t
20180

202-
let[@inline] is_empty t = length t == 0
203-
let[@inline] pop_exn t = pop t Backoff.default (Atomic.fenceless_get t.head)
81+
type ('a, _) res = Seq : ('a, 'a Seq.t) res | Array : ('a, 'a array) res
20482

205-
let[@inline] push t value =
206-
push t value Backoff.default (Atomic.fenceless_get t.tail)
83+
let rec pop_all_as : type a r. a t -> (a, r) res -> _ -> r =
84+
fun t result backoff ->
85+
let head = Atomic.get t.head in
86+
let next = head.next in
87+
if head.index + 1 = next.index then begin
88+
let new_sentinel = create_sentinel head.index in
89+
if Atomic.compare_and_set t.head head new_sentinel then begin
90+
(* TODO: not lock-free. *)
91+
let tail = Atomic.exchange t.tail new_sentinel in
92+
maybe_fix tail;
93+
match result with
94+
| Seq ->
95+
let rec to_seq work tail () =
96+
Seq.Cons
97+
( work.value,
98+
if work == tail then Seq.empty else to_seq work.next tail )
99+
in
100+
to_seq head.next tail
101+
| Array ->
102+
let n = tail.index - head.index in
103+
let work = ref head.next in
104+
Array.init n @@ fun _ ->
105+
let node = !work in
106+
work := node.next;
107+
node.value
108+
end
109+
else pop_all_as t result (Backoff.once backoff)
110+
end
111+
else
112+
let tail = Atomic.get t.tail in
113+
if tail == head then match result with Seq -> Seq.empty | Array -> [||]
114+
else begin
115+
maybe_fix tail;
116+
pop_all_as t result Backoff.default
117+
end
207118

119+
let[@inline] push t value = push t value Backoff.default
120+
let[@inline] pop_exn t = pop_exn t Backoff.default
121+
let[@inline] pop_all t = pop_all_as t Seq Backoff.default
122+
let[@inline] pop_all_as_array t = pop_all_as t Array Backoff.default
208123
let[@inline] push_head t value = push_head t value Backoff.default
124+
let[@inline] is_empty t = length t == 0

lib/picos_aux.mpmcq/picos_aux_mpmcq.mli

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@
66
threads attempt to pop fibers from the queues of other threads when their
77
local queues are empty. It is also possible to use only a single shared
88
queue, but that will result in very high contention as this queue is not
9-
relaxed. *)
9+
relaxed.
10+
11+
⚠️ The {!pop_all} and {!pop_all_as_array} operation are not lock-free and
12+
prevent concurrent {!pop_exn}, {!pop_all}, {!pop_all_as_array}, and
13+
{!push_head} operations from making progress until the operation has reached
14+
its linearization point. Other concurrent operations are not prevented from
15+
making progress. *)
1016

1117
(** {1 API} *)
1218

@@ -31,6 +37,14 @@ val pop_exn : 'a t -> 'a
3137
3238
@raise Empty in case the queue was empty. *)
3339

40+
val pop_all : 'a t -> 'a Seq.t
41+
(** [pop_all queue] removes all values from the [queue] and returns them as a
42+
sequence. *)
43+
44+
val pop_all_as_array : 'a t -> 'a array
45+
(** [pop_all_as_array queue] removes all values from the [queue] and returns
46+
them as an array. *)
47+
3448
val length : 'a t -> int
3549
(** [length queue] returns the length or the number of values in the [queue]. *)
3650

@@ -60,11 +74,8 @@ val is_empty : 'a t -> bool
6074
# Picos_aux_mpmcq.pop_exn q
6175
- : int = 76
6276
63-
# Picos_aux_mpmcq.pop_exn q
64-
- : int = 42
65-
66-
# Picos_aux_mpmcq.pop_exn q
67-
- : int = 101
77+
# Picos_aux_mpmcq.pop_all_as_array q
78+
- : int array = [|42; 101|]
6879
6980
# Picos_aux_mpmcq.pop_exn q
7081
Exception: Picos_aux_mpmcq.Empty.

0 commit comments

Comments
 (0)