Skip to content

Commit 2293a55

Browse files
authored
Merge pull request #758 from LincolnPuzey/fail_fast_default_true
Change StubBroker.join() parameter fail_fast to default to True
2 parents cb93475 + 8a72e73 commit 2293a55

File tree

11 files changed

+93
-48
lines changed

11 files changed

+93
-48
lines changed

docs/source/changelog.rst

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,21 @@ Major Breaking Changes
1616

1717
These are breaking changes we believe are most likely to effect your project.
1818

19+
* The ``fail_fast`` argument to |StubBroker_join| now defaults to True.
20+
This means that calling |StubBroker_join|, will by default,
21+
re-raise any Exceptions that caused messages to get dead-lettered
22+
(i.e. any uncaught Exceptions in your actor functions).
23+
You may need to explicitly catch these exception in your tests
24+
(e.g. with :meth:`unittest.TestCase.assertRaises` or :func:`pytest.raises`).
25+
26+
Alternatively, you can revert to the old behavior
27+
by passing ``fail_fast_default=False`` to |StubBroker|.
28+
29+
However, we think the new default behavior is best, because it makes
30+
exceptions happening in your actor functions obvious in your tests.
31+
Previsouly, exceptions in your actor functions could pass silently,
32+
and potentially unnoticed unless you checked the side-effects of the actor.
33+
(`#739`_, `#758`_, `@LincolnPuzey`_)
1934
* The |Prometheus| middleware is no longer in the default middleware list.
2035
To keep exporting the Prometheus statistics, you must now install the ``prometheus`` extra
2136
(e.g. ``pip install 'dramatiq[prometheus]'``)
@@ -31,6 +46,8 @@ These are breaking changes we believe are most likely to effect your project.
3146
.. _#688: https://github.com/Bogdanp/dramatiq/pull/688
3247
.. _@azmeuk: https://github.com/azmeuk
3348
.. _#728: https://github.com/Bogdanp/dramatiq/pull/728
49+
.. _#758: https://github.com/Bogdanp/dramatiq/pull/758
50+
.. _#739: https://github.com/Bogdanp/dramatiq/issues/739
3451

3552
Minor Breaking Changes
3653
~~~~~~~~~~~~~~~~~~~~~~

docs/source/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,5 @@
195195
"python": ("https://docs.python.org/3", None),
196196
"pika": ("https://pika.readthedocs.io/en/stable/", None),
197197
"redis": ("https://redis.readthedocs.io/en/latest/", None),
198+
"pytest": ("https://docs.pytest.org/en/stable", None),
198199
}

docs/source/guide.rst

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -490,17 +490,18 @@ synchronously by calling them as you would normal functions.
490490
Dealing with Exceptions
491491
^^^^^^^^^^^^^^^^^^^^^^^^
492492

493-
By default, any exceptions raised by an actor are raised in the
493+
By default, any exceptions raised by an actor are caught by the
494494
worker, which runs in a separate thread from the one your tests run
495495
in. This means that any exceptions your actor throws will not be
496-
visible to your test code!
496+
immediately visible to your test code!
497497

498-
You can make the stub broker re-raise exceptions from failed actors in your
499-
main thread by passing ``fail_fast=True`` to its ``join`` method::
498+
To help surface actor exceptions, by default,
499+
the stub broker will re-raise exceptions from failed messages
500+
in your main thread when you call its |StubBroker_join| method::
500501

501502
def test_count_words(stub_broker, stub_worker):
502-
count_words.send("http://example.com")
503-
stub_broker.join(count_words.queue_name, fail_fast=True)
503+
count_words.send("http://some-invalid-url.invalid")
504+
stub_broker.join(count_words.queue_name) # Exception from actor will be re-raised here.
504505
stub_worker.join()
505506

506507
This way, whatever exception caused the actor to fail will be raised

docs/source/troubleshooting.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ pytest_, then you can easily do this from the command line using the
5858

5959
You can also pass ``fail_fast=True`` as a parameter to |StubBroker_join|
6060
in order to make it reraise whatever exception caused the actor to
61-
fail in the main thread. Note, however, that the actor is only
61+
fail in the main thread.
62+
63+
.. versionchanged:: 2.0.0
64+
The ``fail_fast`` parameter now defaults to True.
65+
66+
Note, however, that the actor is only
6267
considered to fail once all of its retries have been used up; meaning
6368
that unless you specify custom retry limits for the actors or for your
6469
tests as a whole (by configuring the |Retries| middleware), then each

dramatiq/brokers/stub.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,26 @@
2727
from ..common import current_millis, dq_name, iter_queue, join_queue
2828
from ..errors import QueueNotFound
2929
from ..message import Message
30+
from ..middleware import Middleware
3031

3132

3233
class StubBroker(Broker):
33-
"""A broker that can be used within unit tests."""
34+
"""A broker that can be used within unit tests.
3435
35-
def __init__(self, middleware=None):
36+
Parameters:
37+
middleware: See :class:`Broker<dramatiq.Broker>`.
38+
fail_fast_default: Specifies the default value for the ``fail_fast``
39+
argument of :meth:`join<dramatiq.brokers.stub.StubBroker.join>`.
40+
"""
41+
42+
def __init__(self, middleware: Optional[list[Middleware]] = None, *, fail_fast_default: bool = True):
3643
super().__init__(middleware)
3744

38-
self.dead_letters_by_queue = defaultdict(list)
45+
self.dead_letters_by_queue: defaultdict[str, list[MessageProxy]] = defaultdict(list)
46+
self.fail_fast_default: bool = fail_fast_default
3947

4048
@property
41-
def dead_letters(self) -> list[Message]:
49+
def dead_letters(self) -> list[MessageProxy]:
4250
"""The dead-lettered messages for all defined queues."""
4351
return [message for messages in self.dead_letters_by_queue.values() for message in messages]
4452

@@ -131,8 +139,7 @@ def flush_all(self) -> None:
131139

132140
self.dead_letters_by_queue.clear()
133141

134-
# TODO: Make fail_fast default to True.
135-
def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: bool = False) -> None:
142+
def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: Optional[bool] = None) -> None:
136143
"""Wait for all the messages on the given queue to be
137144
processed. This method is only meant to be used in tests
138145
to wait for all the messages in a queue to be processed.
@@ -145,10 +152,16 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo
145152
queue_name(str): The queue to wait on.
146153
fail_fast(bool): When this is True and any message gets
147154
dead-lettered during the join, then an exception will be
148-
raised. This will be True by default starting with
149-
version 2.0.
155+
raised. When False, no exception will be raised.
156+
Defaults to None, which means use the value of the
157+
``fail_fast_default`` instance attribute
158+
(which defaults to True).
150159
timeout(Optional[int]): The max amount of time, in
151160
milliseconds, to wait on this queue.
161+
162+
.. versionchanged:: 2.0.0
163+
The ``fail_fast`` parameter now defaults to ``self.fail_fast_default``
164+
(which defaults to True).
152165
"""
153166
try:
154167
queues = [
@@ -159,6 +172,7 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo
159172
raise QueueNotFound(queue_name) from None
160173

161174
deadline = timeout and time.monotonic() + timeout / 1000
175+
should_fail_fast = fail_fast if fail_fast is not None else self.fail_fast_default
162176
while True:
163177
for queue in queues:
164178
join_timeout = deadline and deadline - time.monotonic()
@@ -171,7 +185,7 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo
171185
if queue.unfinished_tasks:
172186
break
173187
else:
174-
if fail_fast:
188+
if should_fail_fast:
175189
for message in self.dead_letters_by_queue[queue_name]:
176190
raise (message._exception or Exception("Message failed with unknown error")) from None
177191

tests/middleware/test_retries.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def do_work():
2727
do_work.send()
2828

2929
# Then join on the queue
30-
stub_broker.join(do_work.queue_name)
30+
stub_broker.join(do_work.queue_name, fail_fast=False)
3131
stub_worker.join()
3232

3333
# I expect successes
@@ -48,7 +48,7 @@ def do_work():
4848
do_work.send()
4949

5050
# And join on the queue
51-
stub_broker.join(do_work.queue_name)
51+
stub_broker.join(do_work.queue_name, fail_fast=False)
5252
stub_worker.join()
5353

5454
# Then I expect 4 attempts to have occurred
@@ -69,7 +69,7 @@ def do_work():
6969
do_work.send()
7070

7171
# And join on the queue
72-
stub_broker.join(do_work.queue_name)
72+
stub_broker.join(do_work.queue_name, fail_fast=False)
7373
stub_worker.join()
7474

7575
# Then I expect at least one attempt to have occurred
@@ -88,7 +88,7 @@ def do_work():
8888
do_work.send()
8989

9090
# And join on the queue
91-
stub_broker.join(do_work.queue_name)
91+
stub_broker.join(do_work.queue_name, fail_fast=False)
9292
stub_worker.join()
9393

9494
# Then no error should be logged
@@ -113,7 +113,7 @@ def do_work():
113113
do_work.send()
114114

115115
# And join on the queue
116-
stub_broker.join(do_work.queue_name)
116+
stub_broker.join(do_work.queue_name, fail_fast=False)
117117
stub_worker.join()
118118

119119
# Then the actor should have been retried after 100ms
@@ -136,7 +136,7 @@ def do_work():
136136
do_work.send()
137137

138138
# And join on the queue
139-
stub_broker.join(do_work.queue_name)
139+
stub_broker.join(do_work.queue_name, fail_fast=False)
140140
stub_worker.join()
141141

142142
# Then the actor should have retried 10 times without delay
@@ -160,7 +160,7 @@ def do_work():
160160
do_work.send()
161161

162162
# And join on the queue
163-
stub_broker.join(do_work.queue_name)
163+
stub_broker.join(do_work.queue_name, fail_fast=False)
164164
stub_worker.join()
165165

166166
# Then the actor should have retried 10 times without delay
@@ -184,7 +184,7 @@ def do_work():
184184
do_work.send_with_options(min_backoff=0)
185185

186186
# And join on the queue
187-
stub_broker.join(do_work.queue_name)
187+
stub_broker.join(do_work.queue_name, fail_fast=False)
188188
stub_worker.join()
189189

190190
# Then the actor should have retried 10 times without delay
@@ -208,7 +208,7 @@ def do_work():
208208
do_work.send_with_options(max_backoff=0)
209209

210210
# And join on the queue
211-
stub_broker.join(do_work.queue_name)
211+
stub_broker.join(do_work.queue_name, fail_fast=False)
212212
stub_worker.join()
213213

214214
# Then the actor should have retried 10 times without delay
@@ -231,7 +231,7 @@ def do_work():
231231
do_work.send_with_options(max_retries=max_retries_message_option, min_backoff=50, max_backoff=500)
232232

233233
# And join on the queue
234-
stub_broker.join(do_work.queue_name)
234+
stub_broker.join(do_work.queue_name, fail_fast=False)
235235
stub_worker.join()
236236

237237
# Then I expect it to be retried as specified in the message options
@@ -257,7 +257,7 @@ def raises_errors(raise_runtime_error):
257257
raises_errors.send(False)
258258

259259
# And wait for it
260-
stub_broker.join(raises_errors.queue_name)
260+
stub_broker.join(raises_errors.queue_name, fail_fast=False)
261261
stub_worker.join()
262262

263263
# Then I expect the actor not to retry
@@ -268,7 +268,7 @@ def raises_errors(raise_runtime_error):
268268
raises_errors.send(True)
269269

270270
# And wait for it
271-
stub_broker.join(raises_errors.queue_name)
271+
stub_broker.join(raises_errors.queue_name, fail_fast=False)
272272
stub_worker.join()
273273

274274
# Then I expect the actor to retry 3 times
@@ -312,7 +312,7 @@ def do_work():
312312
do_work.send()
313313

314314
# And join on the queue
315-
stub_broker.join(do_work.queue_name)
315+
stub_broker.join(do_work.queue_name, fail_fast=False)
316316
stub_worker.join()
317317

318318
# Then no errors and or warnings should be logged
@@ -350,7 +350,7 @@ def do_work():
350350
message = do_work.send_with_options(delay=100)
351351

352352
# When I join on the queue and run the actor
353-
stub_broker.join(do_work.queue_name)
353+
stub_broker.join(do_work.queue_name, fail_fast=False)
354354
stub_worker.join()
355355

356356
# Then I expect correct number of requeue timestamps recorded
@@ -380,8 +380,8 @@ def do_work():
380380

381381
do_work.send()
382382

383-
stub_broker.join(do_work.queue_name)
384-
stub_broker.join(handle_retries_exhausted.queue_name)
383+
stub_broker.join(do_work.queue_name, fail_fast=False)
384+
stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False)
385385
stub_worker.join()
386386

387387
# We should have the initial attempt + max_retries
@@ -405,8 +405,8 @@ def do_work():
405405

406406
do_work.send()
407407

408-
stub_broker.join(do_work.queue_name)
409-
stub_broker.join(handle_retries_exhausted.queue_name)
408+
stub_broker.join(do_work.queue_name, fail_fast=False)
409+
stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False)
410410
stub_worker.join()
411411

412412
# No retry should be required
@@ -432,8 +432,8 @@ def do_work():
432432

433433
do_work.send()
434434

435-
stub_broker.join(do_work.queue_name)
436-
stub_broker.join(handle_retries_exhausted.queue_name)
435+
stub_broker.join(do_work.queue_name, fail_fast=False)
436+
stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False)
437437
stub_worker.join()
438438

439439
# The first retry should have succeeded

tests/middleware/test_shutdown.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ def do_work():
174174
stub_worker.stop()
175175

176176
# Then join on the queue
177-
stub_broker.join(do_work.queue_name)
177+
with pytest.raises(shutdown.Shutdown): # expect Shutdown exception
178+
stub_broker.join(do_work.queue_name)
178179
stub_worker.join()
179180

180181
# I expect it to shutdown
@@ -240,7 +241,7 @@ def do_work(n=10, i=0.1):
240241
stub_worker.stop()
241242

242243
# Then join on the queue
243-
stub_broker.join(do_work.queue_name)
244+
stub_broker.join(do_work.queue_name, fail_fast=False)
244245
stub_worker.join()
245246

246247
# I expect only one success and one shutdown

tests/middleware/test_time_limit.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ def do_work():
163163
do_work.send()
164164

165165
# Then join on the queue
166-
stub_broker.join(do_work.queue_name)
166+
with pytest.raises(time_limit.TimeLimitExceeded): # expect TimeLimitExceeded exception
167+
stub_broker.join(do_work.queue_name)
167168
stub_worker.join()
168169

169170
# I expect the time limit to have been exceeded

tests/test_actors.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import dramatiq
1111
from dramatiq import Message, Middleware
1212
from dramatiq.errors import ActorNotFound, RateLimitExceeded
13-
from dramatiq.middleware import CurrentMessage, SkipMessage
13+
from dramatiq.middleware import CurrentMessage, SkipMessage, TimeLimitExceeded
1414

1515
from .common import skip_on_pypy, worker
1616

@@ -199,7 +199,8 @@ def do_work():
199199
do_work.send()
200200

201201
# And join on the queue
202-
stub_broker.join(do_work.queue_name)
202+
with pytest.raises(TimeLimitExceeded): # expect TimeLimitExceeded exception
203+
stub_broker.join(do_work.queue_name)
203204
stub_worker.join()
204205

205206
# Then I expect it to fail
@@ -223,7 +224,8 @@ def do_work():
223224
do_work.send_with_options(time_limit=1000)
224225

225226
# Then join on the queue
226-
stub_broker.join(do_work.queue_name)
227+
with pytest.raises(TimeLimitExceeded): # expect TimeLimitExceeded exception
228+
stub_broker.join(do_work.queue_name)
227229
stub_worker.join()
228230

229231
# I expect it to fail
@@ -248,7 +250,8 @@ def do_work():
248250

249251
# Then join on its queue
250252
with worker(stub_broker, worker_timeout=100) as stub_worker:
251-
stub_broker.join(do_work.queue_name)
253+
with pytest.raises(SkipMessage): # expect SkipMessage exception
254+
stub_broker.join(do_work.queue_name)
252255
stub_worker.join()
253256

254257
# I expect the message to have been skipped
@@ -272,7 +275,8 @@ def do_work():
272275

273276
# Then join on its queue
274277
with worker(stub_broker, worker_timeout=100) as stub_worker:
275-
stub_broker.join(do_work.queue_name)
278+
with pytest.raises(SkipMessage): # expect SkipMessage exception
279+
stub_broker.join(do_work.queue_name)
276280
stub_worker.join()
277281

278282
# I expect the message to have been skipped
@@ -483,7 +487,8 @@ def raise_rate_limit_exceeded():
483487
raise_rate_limit_exceeded.send()
484488

485489
# And wait for the message to get processed
486-
stub_broker.join(raise_rate_limit_exceeded.queue_name)
490+
with pytest.raises(RateLimitExceeded): # expect RateLimitExceeded exception
491+
stub_broker.join(raise_rate_limit_exceeded.queue_name)
487492
stub_worker.join()
488493

489494
# Then debug mock should be called with a special message

0 commit comments

Comments
 (0)