@@ -19,7 +19,7 @@ type 'a slot = 'a option ref
19
19
module Cell = struct
20
20
(* The possible behaviours are:
21
21
22
- 1. Suspender : In_transition -> Request Suspender waits for a resource
22
+ 1. Suspender : In_transition -> Request Suspender waits for a resource
23
23
1.1. Resumer : Request -> Finished Resumer then providers a resource
24
24
1.2. Suspender : Request -> Finished Suspender cancels
25
25
2. Resumer : In_transition -> Resource Resumer provides a spare resource
@@ -89,11 +89,10 @@ let cancel segment cell =
89
89
| In_transition | Resource _ -> assert false (* Can't get here from [Request]. *)
90
90
91
91
(* If [t] is under capacity, add another (empty) slot. *)
92
- let rec maybe_add_slot t =
93
- let current = Atomic. get t.slots in
92
+ let rec maybe_add_slot t current =
94
93
if current < t.max_slots then (
95
94
if Atomic. compare_and_set t.slots current (current + 1 ) then add t (ref None )
96
- else maybe_add_slot t (* Concurrent update; try again *)
95
+ else maybe_add_slot t ( Atomic. get t.slots) (* Concurrent update; try again *)
97
96
)
98
97
99
98
(* [run_with t f slot] ensures that [slot] contains a valid resource and then runs [f resource] with it.
@@ -114,25 +113,46 @@ let run_with t f slot =
114
113
f x
115
114
end
116
115
with
117
- | r ->
116
+ | r ->
118
117
add t slot;
119
118
r
120
119
| exception ex ->
121
120
let bt = Printexc. get_raw_backtrace () in
122
121
add t slot;
123
122
Printexc. raise_with_backtrace ex bt
124
123
125
- let use t f =
124
+ (* Creates a fresh resource [x], runs [f x], then disposes of [x] *)
125
+ let run_new_and_dispose t f =
126
+ let x = t.alloc () in
127
+ match f x with
128
+ | r ->
129
+ t.dispose x;
130
+ r
131
+ | exception ex ->
132
+ let bt = Printexc. get_raw_backtrace () in
133
+ t.dispose x;
134
+ Printexc. raise_with_backtrace ex bt
135
+
136
+ let use t ?(never_block =false ) f =
126
137
let segment, cell = Q. next_suspend t.q in
127
138
match Atomic. get cell with
128
139
| Finished | Request _ -> assert false
129
140
| Resource slot ->
130
141
Atomic. set cell Finished ; (* Allow value to be GC'd *)
131
142
run_with t f slot
132
143
| In_transition ->
133
- (* It would have been better if more resources were available.
134
- If we still have capacity, add a new slot now. *)
135
- maybe_add_slot t;
144
+ let current = Atomic. get t.slots in
145
+ match current < t.max_slots with
146
+ | false when never_block -> (
147
+ (* We are at capacity, but cannot block.
148
+ Create a new resource to run f but don't add it to the pool. *)
149
+ match Atomic. exchange cell Finished with
150
+ | Resource slot -> run_with t f slot
151
+ | _ -> run_new_and_dispose t f
152
+ )
153
+ | can_add ->
154
+ (* Create a slot if not at capacity. *)
155
+ if can_add then maybe_add_slot t current;
136
156
(* No item is available right now. Start waiting *)
137
157
let slot =
138
158
Suspend. enter_unchecked " Pool.acquire" (fun ctx enqueue ->
0 commit comments