Skip to content

Commit e307ff6

Browse files
author
Yaron Thurm
committed
Document consumer-commands-chan
1 parent 58a64eb commit e307ff6

File tree

1 file changed

+79
-5
lines changed

1 file changed

+79
-5
lines changed

README.md

+79-5
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,16 @@ Note: `int` is used for brevity but can also mean `long`. Don't worry about it.
7474
| :internal-config | map | optional | A map of the underlying java client properties, for any extra lower level config |
7575

7676
#### Consumer-source options
77-
| Key | Type | Req? | Notes |
78-
|-----|------|------|-------|
79-
| :group-id | string | required | |
80-
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
77+
| Key | Type | Req? | Notes |
78+
|-----|------|------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
79+
| :group-id | string | required | |
80+
| :shape | `:value:`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of `ConsumerRecord` | optional | If unspecified, channel will contain ConsumerRecord objects. [Examples](#data-shapes) |
81+
| :ketu.source/consumer-commands-chan | channel | optional | Used for passing custom functions to be executed from within the poll loop. Items of this channel are expected to be of type `fn[x]`. One example for using this channel is to enable pausing/resuming of the underlying kafka consumer, since trying to do that outside the poll loop causes a `ConcurrentModificationException` to be thrown. [Code example](#data-shapes) |
8182

8283
#### Producer-sink options
8384
| Key | Type | Req? | Notes |
8485
|-----|------|------|-------|
85-
| :shape | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#data-shapes) |
86+
| :shape | `:value`, `[:vector <fields>]`,`[:map <fields>]`, or an arity-1 function of the input returning `ProducerRecord` | optional | If unspecified, you must put ProducerRecord objects on the channel. [Examples](#example-of-using-the-custom-commands-channel) |
8687
| :compression-type | `"none"` `"gzip"` `"snappy"` `"lz4"` `"zstd"` | optional | Default `"none"`, values are same as "compression.type" of the java producer |
8788
| :workers | int | optional | Default `1`, number of threads that take from the channel and invoke the internal producer |
8889

@@ -127,6 +128,79 @@ Similarly, to put a clojure data structure on the producer channel:
127128
(>!! producer-chan ["k2" "v2" "events"])
128129
```
129130

131+
## Example of using the custom commands channel
132+
133+
In this example we demonstare how to enable pause/resume of the consumer:
134+
135+
```clojure
136+
(ns custom-commands-channel-example
137+
(:require [clojure.core.async :as async]
138+
[ketu.async.source :as source]
139+
[ketu.async.sink :as sink]))
140+
141+
(let [commands-chan (async/chan 10)
142+
consumer-chan (async/chan 10)
143+
consumer-opts {:name "consumer-example"
144+
:brokers "broker1:9092"
145+
:topic "example"
146+
:group-id "example"
147+
:value-type :string
148+
:shape :value
149+
:ketu.source/consumer-commands-chan commands-chan}
150+
source (source/source consumer-chan consumer-opts)
151+
152+
producer-chan (async/chan 10)
153+
sink-opts {:name "producer-example"
154+
:brokers "broker1:9092"
155+
:topic "example"
156+
:value-type :string
157+
:shape :value}
158+
sink (sink/sink producer-chan sink-opts)
159+
160+
; periodically produce data to the topic
161+
producing (future
162+
(dotimes [i 20]
163+
(async/>!! producer-chan (str i))
164+
(Thread/sleep 300))
165+
(async/>!! producer-chan "done")
166+
(async/close! producer-chan))
167+
168+
; read from the consumer channel and print to the screen
169+
processing (future
170+
(loop []
171+
(let [message (async/<!! consumer-chan)]
172+
(println message)
173+
(when (not= message "done")
174+
(recur)))))]
175+
(try
176+
(Thread/sleep 2000) ; consumer is consuming normally
177+
(let [paused (promise)
178+
resumed (promise)]
179+
180+
; Send the commands channel a function that will pause the consumer
181+
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
182+
(.pause consumer (.assignment consumer))
183+
(deliver paused true)))
184+
185+
@paused
186+
(println "consumer is paused")
187+
(Thread/sleep 2000)
188+
189+
; Send the commands channel a function that will resume the consumer
190+
(async/>!! commands-chan (fn [{consumer :ketu.source/consumer}]
191+
(.resume consumer (.paused consumer))
192+
(deliver resumed true)))
193+
194+
@resumed
195+
(println "consumer is resumed")
196+
197+
; Wait for all futures to finish
198+
@producing
199+
@processing)
200+
(finally
201+
(source/stop! source))))
202+
```
203+
130204
## Development & Contribution
131205

132206
We welcome feedback and would love to hear about use-cases other than ours. You can open issues, send pull requests,

0 commit comments

Comments
 (0)