Skip to content

Commit 3be614e

Browse files
authored
Merge pull request #657 from SGrondin/pool-never-block
Add Eio.Pool.use ~never_block
2 parents 1776925 + 321bc09 commit 3be614e

File tree

3 files changed

+68
-11
lines changed

3 files changed

+68
-11
lines changed

lib_eio/pool.ml

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type 'a slot = 'a option ref
1919
module Cell = struct
2020
(* The possible behaviours are:
2121
22-
1. Suspender : In_transition -> Request Suspender waits for a resource
22+
1. Suspender : In_transition -> Request Suspender waits for a resource
2323
1.1. Resumer : Request -> Finished Resumer then providers a resource
2424
1.2. Suspender : Request -> Finished Suspender cancels
2525
2. Resumer : In_transition -> Resource Resumer provides a spare resource
@@ -89,11 +89,10 @@ let cancel segment cell =
8989
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *)
9090

9191
(* If [t] is under capacity, add another (empty) slot. *)
92-
let rec maybe_add_slot t =
93-
let current = Atomic.get t.slots in
92+
let rec maybe_add_slot t current =
9493
if current < t.max_slots then (
9594
if Atomic.compare_and_set t.slots current (current + 1) then add t (ref None)
96-
else maybe_add_slot t (* Concurrent update; try again *)
95+
else maybe_add_slot t (Atomic.get t.slots) (* Concurrent update; try again *)
9796
)
9897

9998
(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
@@ -114,25 +113,46 @@ let run_with t f slot =
114113
f x
115114
end
116115
with
117-
| r ->
116+
| r ->
118117
add t slot;
119118
r
120119
| exception ex ->
121120
let bt = Printexc.get_raw_backtrace () in
122121
add t slot;
123122
Printexc.raise_with_backtrace ex bt
124123

125-
let use t f =
124+
(* Creates a fresh resource [x], runs [f x], then disposes of [x] *)
125+
let run_new_and_dispose t f =
126+
let x = t.alloc () in
127+
match f x with
128+
| r ->
129+
t.dispose x;
130+
r
131+
| exception ex ->
132+
let bt = Printexc.get_raw_backtrace () in
133+
t.dispose x;
134+
Printexc.raise_with_backtrace ex bt
135+
136+
let use t ?(never_block=false) f =
126137
let segment, cell = Q.next_suspend t.q in
127138
match Atomic.get cell with
128139
| Finished | Request _ -> assert false
129140
| Resource slot ->
130141
Atomic.set cell Finished; (* Allow value to be GC'd *)
131142
run_with t f slot
132143
| In_transition ->
133-
(* It would have been better if more resources were available.
134-
If we still have capacity, add a new slot now. *)
135-
maybe_add_slot t;
144+
let current = Atomic.get t.slots in
145+
match current < t.max_slots with
146+
| false when never_block -> (
147+
(* We are at capacity, but cannot block.
148+
Create a new resource to run f but don't add it to the pool. *)
149+
match Atomic.exchange cell Finished with
150+
| Resource slot -> run_with t f slot
151+
| _ -> run_new_and_dispose t f
152+
)
153+
| can_add ->
154+
(* Create a slot if not at capacity. *)
155+
if can_add then maybe_add_slot t current;
136156
(* No item is available right now. Start waiting *)
137157
let slot =
138158
Suspend.enter_unchecked "Pool.acquire" (fun ctx enqueue ->

lib_eio/pool.mli

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ val create :
3535
If it raises, the exception is passed on to the user,
3636
but resource is still considered to have been disposed. *)
3737

38-
val use : 'a t -> ('a -> 'b) -> 'b
38+
val use : 'a t -> ?never_block:bool -> ('a -> 'b) -> 'b
3939
(** [use t fn] waits for some resource [x] to be available and then runs [f x].
40-
Afterwards (on success or error), [x] is returned to the pool. *)
40+
Afterwards (on success or error), [x] is returned to the pool.
41+
42+
@param never_block If [true] and the pool has reached maximum capacity,
43+
then a fresh resource is created to ensure that this [use]
44+
call does not wait for a resource to become available.
45+
This resource is immediately disposed after [f x] returns.
46+
*)

tests/pool.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,37 @@ Two uses with a capacity of 2; they run in parallel:
7777
- : unit = ()
7878
```
7979

80+
Capacity of 1; two uses that cannot block and two normal uses; first 2 are parallel, next 2 are sequential.
81+
Note that the pool always suspends the calling fiber when creating a new slot,
82+
even if the fiber ends up providing the new slot to itself,
83+
which is why the items get assigned out of order in this test.
84+
85+
```ocaml
86+
# Eio_mock.Backend.run @@ fun () ->
87+
let p0, r0 = Promise.create () in
88+
let p1, r1 = Promise.create () in
89+
let t = create 1 [p0; p1] ~dispose in
90+
Fiber.all [
91+
(fun () -> P.use t ~never_block:true (fun x -> traceln "A: using item %d" x; Fiber.yield (); traceln "A done"));
92+
(fun () -> P.use t ~never_block:true (fun x -> traceln "B: using item %d" x; Fiber.yield (); traceln "B done"));
93+
(fun () -> P.use t (fun x -> traceln "C: using item %d" x; Fiber.yield (); traceln "C done"));
94+
(fun () -> P.use t (fun x -> traceln "D: using item %d" x; Fiber.yield (); traceln "D done"));
95+
(fun () -> Promise.resolve r0 (Ok 0); Promise.resolve r1 (Ok 1));
96+
];
97+
+Creating item 0
98+
+Creating item 1
99+
+A: using item 1
100+
+B: using item 0
101+
+A done
102+
+B done
103+
+disposing 0
104+
+C: using item 1
105+
+C done
106+
+D: using item 1
107+
+D done
108+
- : unit = ()
109+
```
110+
80111
## Cancellation
81112

82113
```ocaml

0 commit comments

Comments
 (0)