Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ Major Breaking Changes

These are breaking changes we believe are most likely to effect your project.

* The ``fail_fast`` argument to |StubBroker_join| now defaults to True.
This means that calling |StubBroker_join|, will by default,
re-raise any Exceptions that caused messages to get dead-lettered
(i.e. any uncaught Exceptions in your actor functions).
You may need to explicitly catch these exception in your tests
(e.g. with :meth:`unittest.TestCase.assertRaises` or :func:`pytest.raises`).

Alternatively, you can revert to the old behavior
by passing ``fail_fast_default=False`` to |StubBroker|.

However, we think the new default behavior is best, because it makes
exceptions happening in your actor functions obvious in your tests.
Previsouly, exceptions in your actor functions could pass silently,
and potentially unnoticed unless you checked the side-effects of the actor.
(`#739`_, `#758`_, `@LincolnPuzey`_)
* The |Prometheus| middleware is no longer in the default middleware list.
To keep exporting the Prometheus statistics, you must now install the ``prometheus`` extra
(e.g. ``pip install 'dramatiq[prometheus]'``)
Expand All @@ -31,6 +46,8 @@ These are breaking changes we believe are most likely to effect your project.
.. _#688: https://github.com/Bogdanp/dramatiq/pull/688
.. _@azmeuk: https://github.com/azmeuk
.. _#728: https://github.com/Bogdanp/dramatiq/pull/728
.. _#758: https://github.com/Bogdanp/dramatiq/pull/758
.. _#739: https://github.com/Bogdanp/dramatiq/issues/739

Minor Breaking Changes
~~~~~~~~~~~~~~~~~~~~~~
Expand Down
1 change: 1 addition & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,5 @@
"python": ("https://docs.python.org/3", None),
"pika": ("https://pika.readthedocs.io/en/stable/", None),
"redis": ("https://redis.readthedocs.io/en/latest/", None),
"pytest": ("https://docs.pytest.org/en/stable", None),
}
13 changes: 7 additions & 6 deletions docs/source/guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -490,17 +490,18 @@ synchronously by calling them as you would normal functions.
Dealing with Exceptions
^^^^^^^^^^^^^^^^^^^^^^^^

By default, any exceptions raised by an actor are raised in the
By default, any exceptions raised by an actor are caught by the
worker, which runs in a separate thread from the one your tests run
in. This means that any exceptions your actor throws will not be
visible to your test code!
immediately visible to your test code!

You can make the stub broker re-raise exceptions from failed actors in your
main thread by passing ``fail_fast=True`` to its ``join`` method::
To help surface actor exceptions, by default,
the stub broker will re-raise exceptions from failed messages
in your main thread when you call its |StubBroker_join| method::

def test_count_words(stub_broker, stub_worker):
count_words.send("http://example.com")
stub_broker.join(count_words.queue_name, fail_fast=True)
count_words.send("http://some-invalid-url.invalid")
stub_broker.join(count_words.queue_name) # Exception from actor will be re-raised here.
stub_worker.join()

This way, whatever exception caused the actor to fail will be raised
Expand Down
7 changes: 6 additions & 1 deletion docs/source/troubleshooting.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ pytest_, then you can easily do this from the command line using the

You can also pass ``fail_fast=True`` as a parameter to |StubBroker_join|
in order to make it reraise whatever exception caused the actor to
fail in the main thread. Note, however, that the actor is only
fail in the main thread.

.. versionchanged:: 2.0.0
The ``fail_fast`` parameter now defaults to True.

Note, however, that the actor is only
considered to fail once all of its retries have been used up; meaning
that unless you specify custom retry limits for the actors or for your
tests as a whole (by configuring the |Retries| middleware), then each
Expand Down
32 changes: 23 additions & 9 deletions dramatiq/brokers/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,26 @@
from ..common import current_millis, dq_name, iter_queue, join_queue
from ..errors import QueueNotFound
from ..message import Message
from ..middleware import Middleware


class StubBroker(Broker):
"""A broker that can be used within unit tests."""
"""A broker that can be used within unit tests.

def __init__(self, middleware=None):
Parameters:
middleware: See :class:`Broker<dramatiq.Broker>`.
fail_fast_default: Specifies the default value for the ``fail_fast``
argument of :meth:`join<dramatiq.brokers.stub.StubBroker.join>`.
"""

def __init__(self, middleware: Optional[list[Middleware]] = None, *, fail_fast_default: bool = True):
super().__init__(middleware)

self.dead_letters_by_queue = defaultdict(list)
self.dead_letters_by_queue: defaultdict[str, list[MessageProxy]] = defaultdict(list)
self.fail_fast_default: bool = fail_fast_default

@property
def dead_letters(self) -> list[Message]:
def dead_letters(self) -> list[MessageProxy]:
"""The dead-lettered messages for all defined queues."""
return [message for messages in self.dead_letters_by_queue.values() for message in messages]

Expand Down Expand Up @@ -131,8 +139,7 @@ def flush_all(self) -> None:

self.dead_letters_by_queue.clear()

# TODO: Make fail_fast default to True.
def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: bool = False) -> None:
def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: Optional[bool] = None) -> None:
"""Wait for all the messages on the given queue to be
processed. This method is only meant to be used in tests
to wait for all the messages in a queue to be processed.
Expand All @@ -145,10 +152,16 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo
queue_name(str): The queue to wait on.
fail_fast(bool): When this is True and any message gets
dead-lettered during the join, then an exception will be
raised. This will be True by default starting with
version 2.0.
raised. When False, no exception will be raised.
Defaults to None, which means use the value of the
``fail_fast_default`` instance attribute
(which defaults to True).
timeout(Optional[int]): The max amount of time, in
milliseconds, to wait on this queue.

.. versionchanged:: 2.0.0
The ``fail_fast`` parameter now defaults to ``self.fail_fast_default``
(which defaults to True).
"""
try:
queues = [
Expand All @@ -159,6 +172,7 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo
raise QueueNotFound(queue_name) from None

deadline = timeout and time.monotonic() + timeout / 1000
should_fail_fast = fail_fast if fail_fast is not None else self.fail_fast_default
while True:
for queue in queues:
join_timeout = deadline and deadline - time.monotonic()
Expand All @@ -171,7 +185,7 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo
if queue.unfinished_tasks:
break
else:
if fail_fast:
if should_fail_fast:
for message in self.dead_letters_by_queue[queue_name]:
raise (message._exception or Exception("Message failed with unknown error")) from None

Expand Down
40 changes: 20 additions & 20 deletions tests/middleware/test_retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def do_work():
do_work.send()

# Then join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# I expect successes
Expand All @@ -48,7 +48,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then I expect 4 attempts to have occurred
Expand All @@ -69,7 +69,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then I expect at least one attempt to have occurred
Expand All @@ -88,7 +88,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then no error should be logged
Expand All @@ -113,7 +113,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then the actor should have been retried after 100ms
Expand All @@ -136,7 +136,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then the actor should have retried 10 times without delay
Expand All @@ -160,7 +160,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then the actor should have retried 10 times without delay
Expand All @@ -184,7 +184,7 @@ def do_work():
do_work.send_with_options(min_backoff=0)

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then the actor should have retried 10 times without delay
Expand All @@ -208,7 +208,7 @@ def do_work():
do_work.send_with_options(max_backoff=0)

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then the actor should have retried 10 times without delay
Expand All @@ -231,7 +231,7 @@ def do_work():
do_work.send_with_options(max_retries=max_retries_message_option, min_backoff=50, max_backoff=500)

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then I expect it to be retried as specified in the message options
Expand All @@ -257,7 +257,7 @@ def raises_errors(raise_runtime_error):
raises_errors.send(False)

# And wait for it
stub_broker.join(raises_errors.queue_name)
stub_broker.join(raises_errors.queue_name, fail_fast=False)
stub_worker.join()

# Then I expect the actor not to retry
Expand All @@ -268,7 +268,7 @@ def raises_errors(raise_runtime_error):
raises_errors.send(True)

# And wait for it
stub_broker.join(raises_errors.queue_name)
stub_broker.join(raises_errors.queue_name, fail_fast=False)
stub_worker.join()

# Then I expect the actor to retry 3 times
Expand Down Expand Up @@ -312,7 +312,7 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then no errors and or warnings should be logged
Expand Down Expand Up @@ -350,7 +350,7 @@ def do_work():
message = do_work.send_with_options(delay=100)

# When I join on the queue and run the actor
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# Then I expect correct number of requeue timestamps recorded
Expand Down Expand Up @@ -380,8 +380,8 @@ def do_work():

do_work.send()

stub_broker.join(do_work.queue_name)
stub_broker.join(handle_retries_exhausted.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False)
stub_worker.join()

# We should have the initial attempt + max_retries
Expand All @@ -405,8 +405,8 @@ def do_work():

do_work.send()

stub_broker.join(do_work.queue_name)
stub_broker.join(handle_retries_exhausted.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False)
stub_worker.join()

# No retry should be required
Expand All @@ -432,8 +432,8 @@ def do_work():

do_work.send()

stub_broker.join(do_work.queue_name)
stub_broker.join(handle_retries_exhausted.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False)
stub_worker.join()

# The first retry should have succeeded
Expand Down
5 changes: 3 additions & 2 deletions tests/middleware/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def do_work():
stub_worker.stop()

# Then join on the queue
stub_broker.join(do_work.queue_name)
with pytest.raises(shutdown.Shutdown): # expect Shutdown exception
stub_broker.join(do_work.queue_name)
stub_worker.join()

# I expect it to shutdown
Expand Down Expand Up @@ -240,7 +241,7 @@ def do_work(n=10, i=0.1):
stub_worker.stop()

# Then join on the queue
stub_broker.join(do_work.queue_name)
stub_broker.join(do_work.queue_name, fail_fast=False)
stub_worker.join()

# I expect only one success and one shutdown
Expand Down
3 changes: 2 additions & 1 deletion tests/middleware/test_time_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def do_work():
do_work.send()

# Then join on the queue
stub_broker.join(do_work.queue_name)
with pytest.raises(time_limit.TimeLimitExceeded): # expect TimeLimitExceeded exception
stub_broker.join(do_work.queue_name)
stub_worker.join()

# I expect the time limit to have been exceeded
Expand Down
17 changes: 11 additions & 6 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import dramatiq
from dramatiq import Message, Middleware
from dramatiq.errors import ActorNotFound, RateLimitExceeded
from dramatiq.middleware import CurrentMessage, SkipMessage
from dramatiq.middleware import CurrentMessage, SkipMessage, TimeLimitExceeded

from .common import skip_on_pypy, worker

Expand Down Expand Up @@ -199,7 +199,8 @@ def do_work():
do_work.send()

# And join on the queue
stub_broker.join(do_work.queue_name)
with pytest.raises(TimeLimitExceeded): # expect TimeLimitExceeded exception
stub_broker.join(do_work.queue_name)
stub_worker.join()

# Then I expect it to fail
Expand All @@ -223,7 +224,8 @@ def do_work():
do_work.send_with_options(time_limit=1000)

# Then join on the queue
stub_broker.join(do_work.queue_name)
with pytest.raises(TimeLimitExceeded): # expect TimeLimitExceeded exception
stub_broker.join(do_work.queue_name)
stub_worker.join()

# I expect it to fail
Expand All @@ -248,7 +250,8 @@ def do_work():

# Then join on its queue
with worker(stub_broker, worker_timeout=100) as stub_worker:
stub_broker.join(do_work.queue_name)
with pytest.raises(SkipMessage): # expect SkipMessage exception
stub_broker.join(do_work.queue_name)
stub_worker.join()

# I expect the message to have been skipped
Expand All @@ -272,7 +275,8 @@ def do_work():

# Then join on its queue
with worker(stub_broker, worker_timeout=100) as stub_worker:
stub_broker.join(do_work.queue_name)
with pytest.raises(SkipMessage): # expect SkipMessage exception
stub_broker.join(do_work.queue_name)
stub_worker.join()

# I expect the message to have been skipped
Expand Down Expand Up @@ -483,7 +487,8 @@ def raise_rate_limit_exceeded():
raise_rate_limit_exceeded.send()

# And wait for the message to get processed
stub_broker.join(raise_rate_limit_exceeded.queue_name)
with pytest.raises(RateLimitExceeded): # expect RateLimitExceeded exception
stub_broker.join(raise_rate_limit_exceeded.queue_name)
stub_worker.join()

# Then debug mock should be called with a special message
Expand Down
Loading