Skip to content

Commit db9eade

Browse files
committed
Add add_nonblocking, capacity and is_full for streams
1 parent fdd2593 commit db9eade

File tree

5 files changed

+130
-2
lines changed

5 files changed

+130
-2
lines changed

lib_eio/stream.ml

+35
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
type drop_priority = Newest | Oldest
2+
13
module Locking = struct
24
type 'a t = {
35
mutex : Mutex.t;
@@ -64,6 +66,26 @@ module Locking = struct
6466
)
6567
)
6668

69+
let add_nonblocking ~drop_priority t item =
70+
Mutex.lock t.mutex;
71+
match Waiters.wake_one t.readers item with
72+
| `Ok -> Mutex.unlock t.mutex; None
73+
| `Queue_empty ->
74+
(* No-one is waiting for an item. Queue it. *)
75+
if Queue.length t.items < t.capacity then (
76+
Queue.add item t.items;
77+
Mutex.unlock t.mutex;
78+
None
79+
) else (
80+
match drop_priority with
81+
| Newest -> Mutex.unlock t.mutex; Some item
82+
| Oldest ->
83+
let dropped_item = Queue.take t.items in
84+
Queue.add item t.items;
85+
Mutex.unlock t.mutex;
86+
Some dropped_item
87+
)
88+
6789
let take t =
6890
Mutex.lock t.mutex;
6991
match Queue.take_opt t.items with
@@ -101,6 +123,8 @@ module Locking = struct
101123
let len = Queue.length t.items in
102124
Mutex.unlock t.mutex;
103125
len
126+
127+
let capacity t = t.capacity
104128

105129
let dump f t =
106130
Fmt.pf f "<Locking stream: %d/%d items>" (length t) t.capacity
@@ -123,6 +147,11 @@ let take = function
123147
| Sync x -> Sync.take x |> Result.get_ok (* todo: allow closing streams *)
124148
| Locking x -> Locking.take x
125149

150+
let add_nonblocking ~drop_priority t v =
151+
match t with
152+
| Sync x -> if Sync.put_nonblocking x v then None else (Some v)
153+
| Locking x -> Locking.add_nonblocking ~drop_priority x v
154+
126155
let take_nonblocking = function
127156
| Locking x -> Locking.take_nonblocking x
128157
| Sync x ->
@@ -134,8 +163,14 @@ let length = function
134163
| Sync _ -> 0
135164
| Locking x -> Locking.length x
136165

166+
let capacity = function
167+
| Sync _ -> 0
168+
| Locking x -> Locking.capacity x
169+
137170
let is_empty t = (length t = 0)
138171

172+
let is_full t = (length t = capacity t)
173+
139174
let dump f = function
140175
| Sync x -> Sync.dump f x
141176
| Locking x -> Locking.dump f x

lib_eio/stream.mli

+21
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,21 @@ val take : 'a t -> 'a
3333
3434
If no items are available, it waits until one becomes available. *)
3535

36+
type drop_priority = Newest | Oldest
37+
38+
val add_nonblocking : drop_priority: drop_priority -> 'a t -> 'a -> 'a option
39+
(** [add_nonblocking ~drop_priority t item] is like [(add t item); None] except that
40+
it returns [Some dropped_item] if the stream is full rather than waiting, where
41+
[dropped_item] is [item] if [drop_priority = Newest], and the first element of the
42+
stream if [drop_priority = Oldest].
43+
44+
In other words, if the stream is full then:
45+
- [add_nonblocking ~drop_priority:Newest t item] is like [Some item]; and
46+
- [add_nonblocking ~drop_priority:Oldest t item] is like
47+
[let dropped_item = take t in add t item; Some dropped_item]
48+
except that no other stream operation can happen (even in other threads)
49+
between the [take] and the [add]. *)
50+
3651
val take_nonblocking : 'a t -> 'a option
3752
(** [take_nonblocking t] is like [Some (take t)] except that
3853
it returns [None] if the stream is empty rather than waiting.
@@ -43,8 +58,14 @@ val take_nonblocking : 'a t -> 'a option
4358
val length : 'a t -> int
4459
(** [length t] returns the number of items currently in [t]. *)
4560

61+
val capacity : 'a t -> int
62+
(** [capacity t] returns the number of items [t] can hold without blocking writers. *)
63+
4664
val is_empty : 'a t -> bool
4765
(** [is_empty t] is [length t = 0]. *)
4866

67+
val is_full : 'a t -> bool
68+
(** [is_full t] is [length t = capacity t]. *)
69+
4970
val dump : 'a t Fmt.t
5071
(** For debugging. *)

lib_eio/sync.ml

+16-2
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,22 @@ let rec put (t : _ t) v =
441441
~suspend:(put_suspend t v)
442442
~closed:(fun () -> raise put_closed_err)
443443

444+
let reject = Slot (fun _ -> false)
445+
446+
let put_nonblocking (t : _ t) v =
447+
match Balance.incr_if_negative t.balance with
448+
| Balance_closed -> raise put_closed_err
449+
| Update_refused -> false
450+
| Updated ->
451+
let rec aux cell =
452+
producer_resume_cell t cell
453+
~success:(fun kc -> (if kc (Ok v) then () else put t v); true)
454+
~in_transition:(fun cell ->
455+
Domain.cpu_relax ();
456+
if Atomic.compare_and_set cell In_transition reject then false
457+
else aux cell)
458+
in aux (Q.next_resume t.consumers)
459+
444460
(* Taking. *)
445461

446462
(* Mirror of [producer_resume_cell]. *)
@@ -489,8 +505,6 @@ let take t =
489505
: (_, [ `Closed ]) result
490506
:> (_, [> `Closed ]) result)
491507

492-
let reject = Slot (fun _ -> false)
493-
494508
let take_nonblocking (t : _ t) =
495509
match Balance.decr_if_positive t.balance with
496510
| Balance_closed -> Error `Closed

lib_eio/sync.mli

+6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ val put : 'a t -> 'a -> unit
2222
2323
@raise Invalid_argument if [t] was closed before [x] was added. *)
2424

25+
val put_nonblocking : 'a t -> 'a -> bool
26+
(** [put_nonblocking t x] is like [put t x; true], except that it returns [false]
27+
instead of waiting if no consumer is immediately available.
28+
29+
@raise Invalid_argument if [t] was closed before [x] was added. *)
30+
2531
val take : 'a t -> ('a, [> `Closed]) result
2632
(** [take t] waits until a producer is available with an item and then returns it.
2733

tests/stream.md

+52
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ let add t v =
2020
S.add t v;
2121
traceln "Added %d to stream" v
2222
23+
let add_nonblocking ~drop_priority t v =
24+
traceln "Adding %d to stream" v;
25+
match S.add_nonblocking ~drop_priority t v with
26+
| None -> traceln "Added %d to stream" v
27+
| Some d ->
28+
match drop_priority, S.capacity t with
29+
| Newest, _ | _, 0 -> assert (d = v); traceln "Dropped %i instead of adding it to stream" v
30+
| Oldest, _ -> traceln "Dropped %i from stream and added %i to stream" d v
31+
32+
2333
let take t =
2434
traceln "Reading from stream";
2535
traceln "Got %d from stream" (S.take t)
@@ -320,6 +330,48 @@ Cancelling writing to a stream:
320330
- : unit = ()
321331
```
322332

333+
Non-blocking add (capacity = 1):
334+
335+
```ocaml
336+
# run @@ fun () ->
337+
let t = S.create 1 in
338+
add t 0;
339+
add_nonblocking ~drop_priority:Newest t 1;
340+
add_nonblocking ~drop_priority:Oldest t 2;;
341+
+Adding 0 to stream
342+
+Added 0 to stream
343+
+Adding 1 to stream
344+
+Dropped 1 instead of adding it to stream
345+
+Adding 2 to stream
346+
+Dropped 0 from stream and added 2 to stream
347+
- : unit = ()
348+
```
349+
350+
Non-blocking add (capacity = 0):
351+
352+
```ocaml
353+
# run @@ fun () ->
354+
let t = S.create 0 in
355+
Fiber.both
356+
(fun () -> ignore @@ take t; ignore @@ take t)
357+
(fun () ->
358+
add_nonblocking ~drop_priority:Newest t 1;
359+
add_nonblocking ~drop_priority:Newest t 2;
360+
Fiber.yield ();
361+
add_nonblocking ~drop_priority:Newest t 3);;
362+
+Reading from stream
363+
+Adding 1 to stream
364+
+Added 1 to stream
365+
+Adding 2 to stream
366+
+Dropped 2 instead of adding it to stream
367+
+Got 1 from stream
368+
+Reading from stream
369+
+Adding 3 to stream
370+
+Added 3 to stream
371+
+Got 3 from stream
372+
- : unit = ()
373+
```
374+
323375
Non-blocking take:
324376

325377
```ocaml

0 commit comments

Comments
 (0)