Skip to content

Commit e1795c5

Browse files
committed
Remove semaphore based bulkhead impl
1 parent c302820 commit e1795c5

File tree

2 files changed

+128
-109
lines changed

2 files changed

+128
-109
lines changed

src/promesa/exec/bulkhead.clj

Lines changed: 93 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@
3838
(-poll! [_])
3939
(-offer! [_ _] [_ _ _]))
4040

41-
(defprotocol IBulkhead
42-
"Bulkhead main API"
43-
(-invoke [_ f] "Call a function in bulkhead context and return a result"))
41+
;; (defprotocol IBulkhead
42+
;; "Bulkhead main API"
43+
;; (-invoke [_ f] "Call a function in bulkhead context and return a result"))
4444

4545
(extend-type BlockingQueue
4646
IQueue
@@ -82,14 +82,15 @@
8282
cp/Datafiable
8383
(datafy [_]
8484
{:permits (.availablePermits semaphore)
85+
:executor executor
8586
:queue (.size queue)
8687
:max-permits max-permits
8788
:max-queue max-queue
8889
:timeout timeout})
8990

90-
IBulkhead
91-
(-invoke [this f]
92-
(px/await! (px/submit! this f)))
91+
;; IBulkhead
92+
;; (-invoke [this f]
93+
;; (px/await! (px/submit! this f)))
9394

9495
Executor
9596
(execute [this f]
@@ -127,78 +128,96 @@
127128
(ns-unmap *ns* 'map->Bulkhead)
128129
(ns-unmap *ns* '->Task)
129130

130-
(defrecord SemaphoreBulkhead [^Semaphore semaphore
131-
^AtomicInteger counter
132-
max-permits
133-
max-queue
134-
timeout]
135-
cp/Datafiable
136-
(datafy [_]
137-
{:permits (.availablePermits semaphore)
138-
:queue (+ (long counter) (long max-permits))
139-
:max-permits max-permits
140-
:max-queue max-queue})
141-
142-
IBulkhead
143-
(-invoke [this f]
144-
(let [nqueued (.incrementAndGet counter)]
145-
(when (> (long nqueued) (long max-queue))
146-
(let [hint (str "bulkhead: queue max capacity reached (" max-queue ")")
147-
props {:type :bulkhead-error
148-
:code :capacity-limit-reached
149-
:size max-queue}]
150-
(.decrementAndGet counter)
151-
(throw (ex-info hint props))))
152-
153-
(try
154-
(if (psm/acquire! semaphore :permits 1 :timeout timeout)
155-
(try
156-
(f)
157-
(finally
158-
(psm/release! semaphore)))
159-
(let [props {:type :bulkhead-error
160-
:code :timeout
161-
:timeout timeout}]
162-
(throw (ex-info "bulkhead: timeout" props))))
163-
(finally
164-
(.decrementAndGet counter))))))
165-
166-
(ns-unmap *ns* '->SemaphoreBulkhead)
167-
(ns-unmap *ns* 'map->SemaphoreBulkhead)
131+
;; (defrecord SemaphoreBulkhead [^Semaphore semaphore
132+
;; ^AtomicInteger counter
133+
;; max-permits
134+
;; max-queue
135+
;; timeout]
136+
;; cp/Datafiable
137+
;; (datafy [_]
138+
;; {:permits (.availablePermits semaphore)
139+
;; :queue (+ (long counter) (long max-permits))
140+
;; :max-permits max-permits
141+
;; :max-queue max-queue})
142+
143+
;; IBulkhead
144+
;; (-invoke [this f]
145+
;; (let [nqueued (.incrementAndGet counter)]
146+
;; (when (> (long nqueued) (long max-queue))
147+
;; (let [hint (str "bulkhead: queue max capacity reached (" max-queue ")")
148+
;; props {:type :bulkhead-error
149+
;; :code :capacity-limit-reached
150+
;; :size max-queue}]
151+
;; (.decrementAndGet counter)
152+
;; (throw (ex-info hint props))))
153+
154+
;; (try
155+
;; (if (psm/acquire! semaphore :permits 1 :timeout timeout)
156+
;; (try
157+
;; (f)
158+
;; (finally
159+
;; (psm/release! semaphore)))
160+
;; (let [props {:type :bulkhead-error
161+
;; :code :timeout
162+
;; :timeout timeout}]
163+
;; (throw (ex-info "bulkhead: timeout" props))))
164+
;; (finally
165+
;; (.decrementAndGet counter))))))
166+
167+
;; (ns-unmap *ns* '->SemaphoreBulkhead)
168+
;; (ns-unmap *ns* 'map->SemaphoreBulkhead)
168169

169170

170171
;; --- PUBLIC API
171172

172173
(defn create
173-
[& {:keys [type permits queue timeout] :as params}]
174-
(case type
175-
(:async :executor)
176-
(let [executor (px/resolve-executor (:executor params))
177-
max-queue (or queue Integer/MAX_VALUE)
178-
max-permits (or permits 1)
179-
queue (LinkedBlockingQueue. (int max-queue))
180-
semaphore (Semaphore. (int max-permits))]
181-
(Bulkhead. executor
182-
semaphore
183-
queue
184-
max-permits
185-
max-queue
186-
timeout))
187-
188-
(:sync :semaphore)
189-
(let [max-queue (or queue Integer/MAX_VALUE)
190-
max-permits (or permits 1)
191-
counter (AtomicInteger. (int (- max-permits)))
192-
semaphore (Semaphore. (int max-permits))]
193-
(SemaphoreBulkhead. semaphore
194-
counter
195-
max-permits
196-
max-queue
197-
timeout))))
198-
199-
(defn invoke!
200-
[instance f]
201-
(-invoke instance f))
174+
[& {:keys [permits queue timeout executor] :as params}]
175+
(let [executor (px/resolve-executor executor)
176+
max-queue (or queue Integer/MAX_VALUE)
177+
max-permits (or permits 1)
178+
queue (LinkedBlockingQueue. (int max-queue))
179+
semaphore (Semaphore. (int max-permits))]
180+
(Bulkhead. executor
181+
semaphore
182+
queue
183+
max-permits
184+
max-queue
185+
timeout)))
186+
187+
;; (defn create
188+
;; [& {:keys [type permits queue timeout executor] :as params}]
189+
;; (case type
190+
;; ;; (:async :executor)
191+
;; (let [executor (px/resolve-executor executor)
192+
;; max-queue (or queue Integer/MAX_VALUE)
193+
;; max-permits (or permits 1)
194+
;; queue (LinkedBlockingQueue. (int max-queue))
195+
;; semaphore (Semaphore. (int max-permits))]
196+
;; (Bulkhead. executor
197+
;; semaphore
198+
;; queue
199+
;; max-permits
200+
;; max-queue
201+
;; timeout))
202+
203+
;; (:sync :semaphore)
204+
;; (let [max-queue (or queue Integer/MAX_VALUE)
205+
;; max-permits (or permits 1)
206+
;; counter (AtomicInteger. (int (- max-permits)))
207+
;; semaphore (Semaphore. (int max-permits))]
208+
;; (SemaphoreBulkhead. semaphore
209+
;; counter
210+
;; max-permits
211+
;; max-queue
212+
;; timeout))))
213+
214+
;; (defn invoke!
215+
;; [instance f]
216+
;; (-invoke instance f))
217+
218+
;; (defn submit
219+
;; [instance f]
220+
;; (-invoke instance f))
202221

203222
(defn get-stats
204223
[instance]
@@ -207,4 +226,4 @@
207226
(defn bulkhead?
208227
"Check if the provided object is instance of Bulkhead type."
209228
[o]
210-
(satisfies? IBulkhead o))
229+
(satisfies? Bulkhead o))

test/promesa/tests/exec_bulkhead_test.clj

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -35,53 +35,53 @@
3535
;; (t/is (< @res 10000000)))))
3636

3737
(t/deftest operations-with-executor-bulkhead
38-
(let [instance (pbh/create {:permits 1 :queue 2 :type :executor})
39-
res1 (pu/try! (px/submit! instance (waiting-fn 1000)))
40-
res2 (pu/try! (px/submit! instance (waiting-fn 200)))
41-
res3 (pu/try! (px/submit! instance (waiting-fn 200)))
42-
]
38+
(let [instance (pbh/create {:permits 1 :queue 2})
39+
res1 (px/submit instance (waiting-fn 1000))
40+
res2 (px/submit instance (waiting-fn 200))
41+
res3 (px/submit instance (waiting-fn 200))]
42+
4343
(t/is (p/promise? res1))
4444
(t/is (p/promise? res2))
45-
(t/is (instance? Throwable res3))
45+
(t/is (p/promise? res3))
4646

4747
(t/is (p/pending? res1))
4848
(t/is (p/pending? res2))
49+
(t/is (p/rejected? res3))
4950

5051
(t/is (pos? (deref res1 2000 -1)))
5152
(t/is (pos? (deref res2 2000 -1)))
52-
(let [data (ex-data res3)]
53+
54+
(let [data (ex-data (pu/try! @res))]
5355
(t/is (= :bulkhead-error (:type data)))
54-
(t/is (= :capacity-limit-reached (:code data))))
55-
56-
))
57-
58-
(t/deftest operations-with-semaphore-bulkhead
59-
(let [instance (pbh/create {:permits 1 :queue 1 :type :semaphore})
60-
res1 (px/with-dispatch :thread
61-
(pbh/invoke! instance (waiting-fn 2000)))
62-
_ (px/sleep 200)
63-
res2 (px/with-dispatch :thread
64-
(pbh/invoke! instance (waiting-fn 2000)))
65-
_ (px/sleep 200)
66-
res3 (px/with-dispatch :thread
67-
(pbh/invoke! instance (with-meta (waiting-fn 200) {:name "res3"})))
68-
]
56+
(t/is (= :capacity-limit-reached (:code data))))))
6957

70-
(t/is (p/promise? res1))
71-
(t/is (p/promise? res2))
72-
(t/is (p/promise? res3))
58+
;; (t/deftest operations-with-semaphore-bulkhead
59+
;; (let [instance (pbh/create {:permits 1 :queue 1 :type :semaphore})
60+
;; res1 (px/with-dispatch :thread
61+
;; (pbh/invoke! instance (waiting-fn 2000)))
62+
;; _ (px/sleep 200)
63+
;; res2 (px/with-dispatch :thread
64+
;; (pbh/invoke! instance (waiting-fn 2000)))
65+
;; _ (px/sleep 200)
66+
;; res3 (px/with-dispatch :thread
67+
;; (pbh/invoke! instance (with-meta (waiting-fn 200) {:name "res3"})))
68+
;; ]
7369

74-
(t/is (p/pending? res1))
75-
(t/is (p/pending? res2))
70+
;; (t/is (p/promise? res1))
71+
;; (t/is (p/promise? res2))
72+
;; (t/is (p/promise? res3))
7673

77-
(p/await res3)
74+
;; (t/is (p/pending? res1))
75+
;; (t/is (p/pending? res2))
7876

79-
(t/is (p/rejected? res3))
80-
(t/is (pos? (deref res1 2200 -1)))
81-
(t/is (pos? (deref res2 2200 -1)))
77+
;; (p/await res3)
8278

83-
(t/is (thrown? java.util.concurrent.ExecutionException (deref res3)))
79+
;; (t/is (p/rejected? res3))
80+
;; (t/is (pos? (deref res1 2200 -1)))
81+
;; (t/is (pos? (deref res2 2200 -1)))
8482

85-
(let [data (ex-data (p/extract res3))]
86-
(t/is (= :bulkhead-error (:type data)))
87-
(t/is (= :capacity-limit-reached (:code data))))))
83+
;; (t/is (thrown? java.util.concurrent.ExecutionException (deref res3)))
84+
85+
;; (let [data (ex-data (p/extract res3))]
86+
;; (t/is (= :bulkhead-error (:type data)))
87+
;; (t/is (= :capacity-limit-reached (:code data))))))

0 commit comments

Comments
 (0)