File tree 1 file changed +35
-0
lines changed
1 file changed +35
-0
lines changed Original file line number Diff line number Diff line change 211
211
(source/stop! source-rebalance)
212
212
(source/stop! source)
213
213
(.close ^AdminClient admin-client)))))
214
+
215
+ (deftest commands-channel
216
+ (let [commands-chan (async/chan 10 )
217
+ consumer-chan (async/chan 10 )
218
+ result-chan (async/chan 10 )
219
+ clicks-consumer-opts {:name " clicks-consumer"
220
+ :brokers (kafka-setup/get-bootstrap-servers )
221
+ :topic " clicks"
222
+ :group-id " clicks-test-consumer"
223
+ :auto-offset-reset " earliest"
224
+ :ketu.source/consumer-commands-chan commands-chan}
225
+ source (source/source consumer-chan clicks-consumer-opts)]
226
+ (try
227
+ (async/>!! commands-chan (fn [{consumer :ketu.source/consumer }] (async/>!! result-chan [:1 consumer])))
228
+ (async/>!! commands-chan (fn [{consumer :ketu.source/consumer }]
229
+ (try
230
+ (.pause consumer (.assignment consumer))
231
+ (async/>!! result-chan [:2 :success ])
232
+ (catch Exception _
233
+ (async/>!! result-chan [:2 :failed ])))))
234
+ (async/>!! commands-chan (fn [{consumer :ketu.source/consumer }]
235
+ (try
236
+ (.resume consumer (.paused consumer))
237
+ (async/>!! result-chan [:3 :success ])
238
+ (catch Exception _
239
+ (async/>!! result-chan [:3 :failed ])))))
240
+
241
+ (let [expected [[:1 (:ketu.source/consumer source)]
242
+ [:2 :success ]
243
+ [:3 :success ]]
244
+ actual (doall (mapv (fn [_] (u/try-take! result-chan)) (range 3 )))]
245
+ (is (= expected actual)))
246
+ (finally
247
+ (Thread/sleep 2000 )
248
+ (source/stop! source)))))
You can’t perform that action at this time.
0 commit comments