diff --git a/docs/source/cookbook.rst b/docs/source/cookbook.rst index 4213b97b..82281bea 100644 --- a/docs/source/cookbook.rst +++ b/docs/source/cookbook.rst @@ -144,6 +144,16 @@ call |pipeline_get_results|:: for res in pipe.get_results(block=True): ... +If you want the actors in the pipeline to keep executing even if +a actor in it errors out, use the ``pipe_ignore_exception`` option:: + + ( + throws_exception.message(pipe_ignore_exception=True) | + will_still_run.message() + ) + +Note that if the ``may_fail`` actor throws an exception, +the ``will_still_run`` actor will receive ``None`` as input. Error Reporting --------------- diff --git a/dramatiq/middleware/pipelines.py b/dramatiq/middleware/pipelines.py index eb2ea9e2..85bef088 100644 --- a/dramatiq/middleware/pipelines.py +++ b/dramatiq/middleware/pipelines.py @@ -25,6 +25,8 @@ class Pipelines(Middleware): Parameters: pipe_ignore(bool): When True, ignores the result of the previous actor in the pipeline. + pipe_ignore_exception(bool): When True, ignores the fact that the previous + actor in the pipeline threw an exception and runs anyway. pipe_target(dict): A message representing the actor the current result should be fed into. """ @@ -33,6 +35,7 @@ class Pipelines(Middleware): def actor_options(self): return { "pipe_ignore", + "pipe_ignore_exception", "pipe_target", } @@ -42,10 +45,12 @@ def after_process_message(self, broker, message, *, result=None, exception=None) # from broker -> pipelines -> messages -> broker. from ..message import Message - if exception is not None or message.failed: + actor = broker.get_actor(message.actor_name) + + ignore_exception = message.options.get("pipe_ignore_exception") or actor.options.get("pipe_ignore_exception") + if message.failed or (exception is not None and not ignore_exception): return - actor = broker.get_actor(message.actor_name) message_data = message.options.get("pipe_target") if message_data is not None: next_message = Message(**message_data) diff --git a/tests/test_composition.py b/tests/test_composition.py index 2fed053d..fa378883 100644 --- a/tests/test_composition.py +++ b/tests/test_composition.py @@ -298,6 +298,59 @@ def should_never_run(): assert not has_run +def test_pipeline_does_continue_to_next_actor_when_ignore_exception_is_set_and_message_threw_an_exception(stub_broker, stub_worker): + has_run = False + + @dramatiq.actor + def throw_something(): + raise ValueError("something") + + @dramatiq.actor + def should_run(): + nonlocal has_run + has_run = True + + # When I pipe some messages intended for that actor together and run the pipeline + pipe = throw_something.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True) + pipe.run() + + stub_broker.join(should_run.queue_name, timeout=10 * 1000) + stub_worker.join() + + # Then the second message in the pipe should have run + assert has_run + + +def test_pipeline_does_not_continue_to_next_actor_when_ignore_exception_is_set_and_message_is_marked_as_failed(stub_broker, stub_worker): + # Given that I have an actor that fails messages + class FailMessageMiddleware(middleware.Middleware): + def after_process_message(self, broker, message, *, result=None, exception=None): + message.fail() + + stub_broker.add_middleware(FailMessageMiddleware()) + + has_run = False + + @dramatiq.actor + def do_nothing(): + pass + + @dramatiq.actor + def should_run(): + nonlocal has_run + has_run = True + + # When I pipe some messages intended for that actor together and run the pipeline + pipe = do_nothing.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True) + pipe.run() + + stub_broker.join(should_run.queue_name, timeout=10 * 1000) + stub_worker.join() + + # Then the second message in the pipe should not have run + assert not has_run + + def test_pipeline_respects_own_delay(stub_broker, stub_worker, result_backend): # Given a result backend # And a broker with the results middleware