Skip to content

Commit 958b905

Browse files
authored
Merge pull request #14 from toyokumo/add-visibility-timeout-in-heartbeat-option
Add visibility timeout in heartbeat option
2 parents 20d2384 + 98a3131 commit 958b905

File tree

7 files changed

+99
-49
lines changed

7 files changed

+99
-49
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Change Log
22

33
## [Unreleased]
4+
### Added
5+
* Add `:visibility-timeout-in-heartbeat` option to control visibility timeout in heartbeat mode.
46

57
## 0.4.91
68
### Fixed

README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ See [API docs](https://cljdoc.org/d/toyokumo/gluttony/CURRENT) for detail.
6565
### Heartbeat
6666
If you don't know how long it takes to process a message, pass `:hearbeat` and `:heartbeat-timeout` options.
6767

68-
Then Gluttony extends the message visibility per `:hearbeat` seconds to `:heartbeat-timeout` seconds.
68+
Then Gluttony extends the message visibility per `:hearbeat` seconds to `:hearbeat + 1` seconds.
69+
(Extended seconds is configurable by `:visibility-timeout-in-heartbeat` option)
6970

7071
See [AWS documents](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html)
7172
for more detail.

build.edn

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{:lib toyokumo/gluttony
2-
:version "0.4.{{git/commit-count}}"
2+
:version "0.5.{{git/commit-count}}"
33
:documents [{:file "CHANGELOG.md"
44
:match "Unreleased"
55
:action :append-after

src/gluttony/core.clj

+44-37
Original file line numberDiff line numberDiff line change
@@ -39,43 +39,46 @@
3939
(raise 10))))
4040
4141
Optional arguments:
42-
:client - the SQS client, which is the instance of cognitect.aws.client.Client.
43-
if missing, cognitect.aws.client.api/client would be called.
44-
:num-workers - the number of workers processing messages concurrently.
45-
default: (Runtime/availableProcessors) - 1
46-
:num-receivers - the number of receivers polling from sqs.
47-
default: (num-workers / 10) because each receiver is able to receive
48-
up to 10 messages at a time.
49-
:message-channel-size - the number of messages to prefetch from sqs.
50-
default: 20 * num-receivers
51-
:receive-limit - the number of messages to receive at a time. 1 to 10.
52-
default: 10
53-
:consume-limit - the number of processing messages at the same time. 0 to 1024
54-
If the consume run asynchronously, for instance inside go block,
55-
you may want to use this option.
56-
default: 0, which means gluttony doesn't care about how many message
57-
are processed simultaneously.
58-
:long-polling-duration - the duration (in seconds) for which the call waits for a message to
59-
arrive in the queue before returning. 0 to 20.
60-
default: 20
61-
:exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the
62-
number of ms until polling again.
63-
default: 10000 (10 seconds).
64-
:heartbeat - the duration (in seconds) for which the consumer extends message
65-
visibility if the message is being processed. 1 to 43199.
66-
default: nil
67-
If it isn't set, heartbeat doesn't work.
68-
If it's set, :heartbeat-timeout is required.
69-
Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html
70-
If you set this option and :consume-limit, recommend to make
71-
:consume-limit bigger than :message-channel-size so as not to block
72-
heartbeat requests.
73-
:heartbeat-timeout - the timeout (in seconds) of heartbeat.
74-
If your consume function doesn't call respond or raise within heartbeat
75-
timeout, the consumer doesn't extend message visibility any more.
76-
2 to 43200.
77-
default: nil
78-
:heartbeat-timeout must be longer than :heartbeat.
42+
:client - the SQS client, which is the instance of cognitect.aws.client.Client.
43+
if missing, cognitect.aws.client.api/client would be called.
44+
:num-workers - the number of workers processing messages concurrently.
45+
default: (Runtime/availableProcessors) - 1
46+
:num-receivers - the number of receivers polling from sqs.
47+
default: (num-workers / 10) because each receiver is able to receive
48+
up to 10 messages at a time.
49+
:message-channel-size - the number of messages to prefetch from sqs.
50+
default: 20 * num-receivers
51+
:receive-limit - the number of messages to receive at a time. 1 to 10.
52+
default: 10
53+
:consume-limit - the number of processing messages at the same time. 0 to 1024
54+
If the consume run asynchronously, for instance inside go block,
55+
you may want to use this option.
56+
default: 0, which means gluttony doesn't care about how many message
57+
are processed simultaneously.
58+
:long-polling-duration - the duration (in seconds) for which the call waits for a message to
59+
arrive in the queue before returning. 0 to 20.
60+
default: 20
61+
:exceptional-poll-delay-ms - when an Exception is received while polling, receiver wait for the
62+
number of ms until polling again.
63+
default: 10000 (10 seconds).
64+
:heartbeat - the duration (in seconds) for which the consumer extends message
65+
visibility if the message is being processed. 1 to 43199.
66+
default: nil
67+
If it isn't set, heartbeat doesn't work.
68+
If it's set, :heartbeat-timeout is required.
69+
Refer to AWS documents for more detail: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html
70+
If you set this option and :consume-limit, recommend to make
71+
:consume-limit bigger than :message-channel-size so as not to block
72+
heartbeat requests.
73+
:heartbeat-timeout - the timeout (in seconds) of heartbeat.
74+
If your consume function doesn't call respond or raise within heartbeat
75+
timeout, the consumer doesn't extend message visibility any more.
76+
2 to 43200.
77+
default: nil
78+
:heartbeat-timeout must be longer than :heartbeat.
79+
:visibility-timeout-in-heartbeat - control visibility timeout (in seconds) in heartbeat.
80+
default: :heartbeat + 1
81+
:visibility-timeout-in-heartbeat must be longer than :heartbeat.
7982
Output:
8083
a instance of gluttony.record.consumer.Consumer"
8184
^Consumer [queue-url consume & [opts]]
@@ -96,6 +99,9 @@
9699
20)
97100
exceptional-poll-delay-ms (or (:exceptional-poll-delay-ms opts)
98101
10000)
102+
visibility-timeout-in-heartbeat (or (:visibility-timeout-in-heartbeat opts)
103+
(when (:heartbeat opts)
104+
(inc (:heartbeat opts))))
99105
consumer (c/new-consumer {:queue-url queue-url
100106
:consume consume
101107
:client client
@@ -109,6 +115,7 @@
109115
:exceptional-poll-delay-ms exceptional-poll-delay-ms
110116
:heartbeat (:heartbeat opts)
111117
:heartbeat-timeout (:heartbeat-timeout opts)
118+
:visibility-timeout-in-heartbeat visibility-timeout-in-heartbeat
112119
:receiver-enabled (atom true)})]
113120
(p/-start consumer)))
114121

src/gluttony/record/consumer.clj

+9-4
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979

8080
(defn- heartbeat*
8181
"When heartbeat parameter is set, a heartbeat process start after the first heartbeat"
82-
[{:keys [client queue-url heartbeat heartbeat-timeout]} p message]
82+
[{:keys [client queue-url heartbeat heartbeat-timeout visibility-timeout-in-heartbeat]} p message]
8383
(when heartbeat
8484
(let [heartbeat-msecs (* heartbeat 1000)
8585
start (System/currentTimeMillis)]
@@ -91,7 +91,7 @@
9191
(log/debugf "message-id:%s heartbeat" (:message-id message))
9292
(sqs/change-message-visibility client {:queue-url queue-url
9393
:receipt-handle (:receipt-handle message)
94-
:visibility-timeout (inc heartbeat)})
94+
:visibility-timeout visibility-timeout-in-heartbeat})
9595
(a/<! (a/timeout heartbeat-msecs))
9696
(recur)))))))
9797

@@ -133,6 +133,7 @@
133133
consume-chan
134134
heartbeat
135135
heartbeat-timeout
136+
visibility-timeout-in-heartbeat
136137
receiver-enabled]
137138
p/IConsumer
138139
(-start [this]
@@ -172,7 +173,8 @@
172173
long-polling-duration
173174
exceptional-poll-delay-ms
174175
heartbeat
175-
heartbeat-timeout]}]
176+
heartbeat-timeout
177+
visibility-timeout-in-heartbeat]}]
176178
{:pre [(not (str/blank? queue-url))
177179
(ifn? consume)
178180
(instance? Client client)
@@ -187,5 +189,8 @@
187189
(or (= nil heartbeat heartbeat-timeout)
188190
(and (integer? heartbeat) (integer? heartbeat-timeout)
189191
(<= 1 heartbeat 43199) (<= 2 heartbeat-timeout 43200)
190-
(< heartbeat heartbeat-timeout)))]}
192+
(< heartbeat heartbeat-timeout)))
193+
(or (nil? heartbeat)
194+
(and (integer? visibility-timeout-in-heartbeat)
195+
(< heartbeat visibility-timeout-in-heartbeat)))]}
191196
(map->Consumer m))

test/gluttony/core_test.clj

+4-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
:exceptional-poll-delay-ms 10000
3939
:consume-chan nil
4040
:heartbeat nil
41-
:heartbeat-timeout nil}
41+
:heartbeat-timeout nil
42+
:visibility-timeout-in-heartbeat nil}
4243
(dissoc consumer :message-chan :receiver-enabled)))
4344
(stop-consumer consumer)))
4445

@@ -66,7 +67,8 @@
6667
:long-polling-duration 20
6768
:exceptional-poll-delay-ms 10000
6869
:heartbeat 60
69-
:heartbeat-timeout 300}
70+
:heartbeat-timeout 300
71+
:visibility-timeout-in-heartbeat 61}
7072
(dissoc consumer :message-chan :consume-chan :receiver-enabled)))
7173
(stop-consumer consumer))))
7274

test/gluttony/record/consumer_test.clj

+37-4
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@
163163
:consume-limit 0
164164
:long-polling-duration 20
165165
:exceptional-poll-delay-ms 0
166-
:heartbeat 60}))
166+
:heartbeat 60
167+
:visibility-timeout-in-heartbeat 61}))
167168
"heartbeat is set but heartbeat-timeout isn't set")
168169
(is (thrown? AssertionError
169170
(new-consumer {:queue-url "https://ap..."
@@ -176,7 +177,39 @@
176177
:receive-limit 10
177178
:consume-limit 0
178179
:long-polling-duration 20
179-
:exceptional-poll-delay-ms 0
180+
:exceptional-poll-delay-ms 1000
181+
:heartbeat 60
182+
:heartbeat-timeout 10
183+
:visibility-timeout-in-heartbeat 61}))
184+
"heartbeat is bigger than heartbeat-timeout")
185+
(is (thrown? AssertionError
186+
(new-consumer {:queue-url "https://ap..."
187+
:consume (fn [_ _ _])
188+
:client client
189+
:given-client? true
190+
:num-workers 1
191+
:num-receivers 1
192+
:message-channel-size 10
193+
:receive-limit 10
194+
:consume-limit 0
195+
:long-polling-duration 20
196+
:exceptional-poll-delay-ms 1000
197+
:heartbeat 60
198+
:heartbeat-timeout 300}))
199+
"heartbeat is set but visibility-timeout-in-heartbeat isn't set")
200+
(is (thrown? AssertionError
201+
(new-consumer {:queue-url "https://ap..."
202+
:consume (fn [_ _ _])
203+
:client client
204+
:given-client? true
205+
:num-workers 1
206+
:num-receivers 1
207+
:message-channel-size 10
208+
:receive-limit 10
209+
:consume-limit 0
210+
:long-polling-duration 20
211+
:exceptional-poll-delay-ms 1000
180212
:heartbeat 60
181-
:heartbeat-timeout 10}))
182-
"heartbeat is bigger than heartbeat-timeout")))
213+
:heartbeat-timeout 300
214+
:visibility-timeout-in-heartbeat 59}))
215+
"heartbeat is bigger than visibility-timeout-in-heartbeat")))

0 commit comments

Comments
 (0)