diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index e927021c..3733198e 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -16,6 +16,21 @@ Major Breaking Changes These are breaking changes we believe are most likely to effect your project. +* The ``fail_fast`` argument to |StubBroker_join| now defaults to True. + This means that calling |StubBroker_join|, will by default, + re-raise any Exceptions that caused messages to get dead-lettered + (i.e. any uncaught Exceptions in your actor functions). + You may need to explicitly catch these exception in your tests + (e.g. with :meth:`unittest.TestCase.assertRaises` or :func:`pytest.raises`). + + Alternatively, you can revert to the old behavior + by passing ``fail_fast_default=False`` to |StubBroker|. + + However, we think the new default behavior is best, because it makes + exceptions happening in your actor functions obvious in your tests. + Previsouly, exceptions in your actor functions could pass silently, + and potentially unnoticed unless you checked the side-effects of the actor. + (`#739`_, `#758`_, `@LincolnPuzey`_) * The |Prometheus| middleware is no longer in the default middleware list. To keep exporting the Prometheus statistics, you must now install the ``prometheus`` extra (e.g. ``pip install 'dramatiq[prometheus]'``) @@ -31,6 +46,8 @@ These are breaking changes we believe are most likely to effect your project. .. _#688: https://github.com/Bogdanp/dramatiq/pull/688 .. _@azmeuk: https://github.com/azmeuk .. _#728: https://github.com/Bogdanp/dramatiq/pull/728 +.. _#758: https://github.com/Bogdanp/dramatiq/pull/758 +.. _#739: https://github.com/Bogdanp/dramatiq/issues/739 Minor Breaking Changes ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docs/source/conf.py b/docs/source/conf.py index 935f082a..91ca7ffa 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -195,4 +195,5 @@ "python": ("https://docs.python.org/3", None), "pika": ("https://pika.readthedocs.io/en/stable/", None), "redis": ("https://redis.readthedocs.io/en/latest/", None), + "pytest": ("https://docs.pytest.org/en/stable", None), } diff --git a/docs/source/guide.rst b/docs/source/guide.rst index 8863e65b..867676b1 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -490,17 +490,18 @@ synchronously by calling them as you would normal functions. Dealing with Exceptions ^^^^^^^^^^^^^^^^^^^^^^^^ -By default, any exceptions raised by an actor are raised in the +By default, any exceptions raised by an actor are caught by the worker, which runs in a separate thread from the one your tests run in. This means that any exceptions your actor throws will not be -visible to your test code! +immediately visible to your test code! -You can make the stub broker re-raise exceptions from failed actors in your -main thread by passing ``fail_fast=True`` to its ``join`` method:: +To help surface actor exceptions, by default, +the stub broker will re-raise exceptions from failed messages +in your main thread when you call its |StubBroker_join| method:: def test_count_words(stub_broker, stub_worker): - count_words.send("http://example.com") - stub_broker.join(count_words.queue_name, fail_fast=True) + count_words.send("http://some-invalid-url.invalid") + stub_broker.join(count_words.queue_name) # Exception from actor will be re-raised here. stub_worker.join() This way, whatever exception caused the actor to fail will be raised diff --git a/docs/source/troubleshooting.rst b/docs/source/troubleshooting.rst index 26b93e8a..fa7803b6 100644 --- a/docs/source/troubleshooting.rst +++ b/docs/source/troubleshooting.rst @@ -58,7 +58,12 @@ pytest_, then you can easily do this from the command line using the You can also pass ``fail_fast=True`` as a parameter to |StubBroker_join| in order to make it reraise whatever exception caused the actor to -fail in the main thread. Note, however, that the actor is only +fail in the main thread. + +.. versionchanged:: 2.0.0 + The ``fail_fast`` parameter now defaults to True. + +Note, however, that the actor is only considered to fail once all of its retries have been used up; meaning that unless you specify custom retry limits for the actors or for your tests as a whole (by configuring the |Retries| middleware), then each diff --git a/dramatiq/brokers/stub.py b/dramatiq/brokers/stub.py index 515ae0d4..8e2637f9 100644 --- a/dramatiq/brokers/stub.py +++ b/dramatiq/brokers/stub.py @@ -27,18 +27,26 @@ from ..common import current_millis, dq_name, iter_queue, join_queue from ..errors import QueueNotFound from ..message import Message +from ..middleware import Middleware class StubBroker(Broker): - """A broker that can be used within unit tests.""" + """A broker that can be used within unit tests. - def __init__(self, middleware=None): + Parameters: + middleware: See :class:`Broker`. + fail_fast_default: Specifies the default value for the ``fail_fast`` + argument of :meth:`join`. + """ + + def __init__(self, middleware: Optional[list[Middleware]] = None, *, fail_fast_default: bool = True): super().__init__(middleware) - self.dead_letters_by_queue = defaultdict(list) + self.dead_letters_by_queue: defaultdict[str, list[MessageProxy]] = defaultdict(list) + self.fail_fast_default: bool = fail_fast_default @property - def dead_letters(self) -> list[Message]: + def dead_letters(self) -> list[MessageProxy]: """The dead-lettered messages for all defined queues.""" return [message for messages in self.dead_letters_by_queue.values() for message in messages] @@ -131,8 +139,7 @@ def flush_all(self) -> None: self.dead_letters_by_queue.clear() - # TODO: Make fail_fast default to True. - def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: bool = False) -> None: + def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: Optional[bool] = None) -> None: """Wait for all the messages on the given queue to be processed. This method is only meant to be used in tests to wait for all the messages in a queue to be processed. @@ -145,10 +152,16 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo queue_name(str): The queue to wait on. fail_fast(bool): When this is True and any message gets dead-lettered during the join, then an exception will be - raised. This will be True by default starting with - version 2.0. + raised. When False, no exception will be raised. + Defaults to None, which means use the value of the + ``fail_fast_default`` instance attribute + (which defaults to True). timeout(Optional[int]): The max amount of time, in milliseconds, to wait on this queue. + + .. versionchanged:: 2.0.0 + The ``fail_fast`` parameter now defaults to ``self.fail_fast_default`` + (which defaults to True). """ try: queues = [ @@ -159,6 +172,7 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo raise QueueNotFound(queue_name) from None deadline = timeout and time.monotonic() + timeout / 1000 + should_fail_fast = fail_fast if fail_fast is not None else self.fail_fast_default while True: for queue in queues: join_timeout = deadline and deadline - time.monotonic() @@ -171,7 +185,7 @@ def join(self, queue_name: str, *, timeout: Optional[int] = None, fail_fast: boo if queue.unfinished_tasks: break else: - if fail_fast: + if should_fail_fast: for message in self.dead_letters_by_queue[queue_name]: raise (message._exception or Exception("Message failed with unknown error")) from None diff --git a/tests/middleware/test_retries.py b/tests/middleware/test_retries.py index 74a0e10e..d092163d 100644 --- a/tests/middleware/test_retries.py +++ b/tests/middleware/test_retries.py @@ -27,7 +27,7 @@ def do_work(): do_work.send() # Then join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # I expect successes @@ -48,7 +48,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then I expect 4 attempts to have occurred @@ -69,7 +69,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then I expect at least one attempt to have occurred @@ -88,7 +88,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then no error should be logged @@ -113,7 +113,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then the actor should have been retried after 100ms @@ -136,7 +136,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then the actor should have retried 10 times without delay @@ -160,7 +160,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then the actor should have retried 10 times without delay @@ -184,7 +184,7 @@ def do_work(): do_work.send_with_options(min_backoff=0) # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then the actor should have retried 10 times without delay @@ -208,7 +208,7 @@ def do_work(): do_work.send_with_options(max_backoff=0) # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then the actor should have retried 10 times without delay @@ -231,7 +231,7 @@ def do_work(): do_work.send_with_options(max_retries=max_retries_message_option, min_backoff=50, max_backoff=500) # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then I expect it to be retried as specified in the message options @@ -257,7 +257,7 @@ def raises_errors(raise_runtime_error): raises_errors.send(False) # And wait for it - stub_broker.join(raises_errors.queue_name) + stub_broker.join(raises_errors.queue_name, fail_fast=False) stub_worker.join() # Then I expect the actor not to retry @@ -268,7 +268,7 @@ def raises_errors(raise_runtime_error): raises_errors.send(True) # And wait for it - stub_broker.join(raises_errors.queue_name) + stub_broker.join(raises_errors.queue_name, fail_fast=False) stub_worker.join() # Then I expect the actor to retry 3 times @@ -312,7 +312,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then no errors and or warnings should be logged @@ -350,7 +350,7 @@ def do_work(): message = do_work.send_with_options(delay=100) # When I join on the queue and run the actor - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then I expect correct number of requeue timestamps recorded @@ -380,8 +380,8 @@ def do_work(): do_work.send() - stub_broker.join(do_work.queue_name) - stub_broker.join(handle_retries_exhausted.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) + stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False) stub_worker.join() # We should have the initial attempt + max_retries @@ -405,8 +405,8 @@ def do_work(): do_work.send() - stub_broker.join(do_work.queue_name) - stub_broker.join(handle_retries_exhausted.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) + stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False) stub_worker.join() # No retry should be required @@ -432,8 +432,8 @@ def do_work(): do_work.send() - stub_broker.join(do_work.queue_name) - stub_broker.join(handle_retries_exhausted.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) + stub_broker.join(handle_retries_exhausted.queue_name, fail_fast=False) stub_worker.join() # The first retry should have succeeded diff --git a/tests/middleware/test_shutdown.py b/tests/middleware/test_shutdown.py index bcb31415..3324f15d 100644 --- a/tests/middleware/test_shutdown.py +++ b/tests/middleware/test_shutdown.py @@ -174,7 +174,8 @@ def do_work(): stub_worker.stop() # Then join on the queue - stub_broker.join(do_work.queue_name) + with pytest.raises(shutdown.Shutdown): # expect Shutdown exception + stub_broker.join(do_work.queue_name) stub_worker.join() # I expect it to shutdown @@ -240,7 +241,7 @@ def do_work(n=10, i=0.1): stub_worker.stop() # Then join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # I expect only one success and one shutdown diff --git a/tests/middleware/test_time_limit.py b/tests/middleware/test_time_limit.py index 2c186100..00c68a36 100644 --- a/tests/middleware/test_time_limit.py +++ b/tests/middleware/test_time_limit.py @@ -163,7 +163,8 @@ def do_work(): do_work.send() # Then join on the queue - stub_broker.join(do_work.queue_name) + with pytest.raises(time_limit.TimeLimitExceeded): # expect TimeLimitExceeded exception + stub_broker.join(do_work.queue_name) stub_worker.join() # I expect the time limit to have been exceeded diff --git a/tests/test_actors.py b/tests/test_actors.py index dca1fe28..4ba5b91b 100644 --- a/tests/test_actors.py +++ b/tests/test_actors.py @@ -10,7 +10,7 @@ import dramatiq from dramatiq import Message, Middleware from dramatiq.errors import ActorNotFound, RateLimitExceeded -from dramatiq.middleware import CurrentMessage, SkipMessage +from dramatiq.middleware import CurrentMessage, SkipMessage, TimeLimitExceeded from .common import skip_on_pypy, worker @@ -199,7 +199,8 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + with pytest.raises(TimeLimitExceeded): # expect TimeLimitExceeded exception + stub_broker.join(do_work.queue_name) stub_worker.join() # Then I expect it to fail @@ -223,7 +224,8 @@ def do_work(): do_work.send_with_options(time_limit=1000) # Then join on the queue - stub_broker.join(do_work.queue_name) + with pytest.raises(TimeLimitExceeded): # expect TimeLimitExceeded exception + stub_broker.join(do_work.queue_name) stub_worker.join() # I expect it to fail @@ -248,7 +250,8 @@ def do_work(): # Then join on its queue with worker(stub_broker, worker_timeout=100) as stub_worker: - stub_broker.join(do_work.queue_name) + with pytest.raises(SkipMessage): # expect SkipMessage exception + stub_broker.join(do_work.queue_name) stub_worker.join() # I expect the message to have been skipped @@ -272,7 +275,8 @@ def do_work(): # Then join on its queue with worker(stub_broker, worker_timeout=100) as stub_worker: - stub_broker.join(do_work.queue_name) + with pytest.raises(SkipMessage): # expect SkipMessage exception + stub_broker.join(do_work.queue_name) stub_worker.join() # I expect the message to have been skipped @@ -483,7 +487,8 @@ def raise_rate_limit_exceeded(): raise_rate_limit_exceeded.send() # And wait for the message to get processed - stub_broker.join(raise_rate_limit_exceeded.queue_name) + with pytest.raises(RateLimitExceeded): # expect RateLimitExceeded exception + stub_broker.join(raise_rate_limit_exceeded.queue_name) stub_worker.join() # Then debug mock should be called with a special message diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 1675c8d3..158256a4 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -50,7 +50,7 @@ def report_exceptions(message_data, exception_data): do_work.send_with_options(on_failure="report_exceptions") # And join on the broker and worker - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then my db should contain the result @@ -98,7 +98,7 @@ def do_work(): do_work.send() # And join on the queue - stub_broker.join(do_work.queue_name) + stub_broker.join(do_work.queue_name, fail_fast=False) stub_worker.join() # Then I expect 4 attempts to have occurred diff --git a/tests/test_composition.py b/tests/test_composition.py index afc42382..a89a2cfa 100644 --- a/tests/test_composition.py +++ b/tests/test_composition.py @@ -291,7 +291,7 @@ def should_never_run(): pipe = do_nothing.message_with_options(pipe_ignore=True) | should_never_run.message() pipe.run() - stub_broker.join(should_never_run.queue_name, timeout=10 * 1000) + stub_broker.join(should_never_run.queue_name, timeout=10 * 1000, fail_fast=False) stub_worker.join() # Then the second message in the pipe should never have run