Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/source/cookbook.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,16 @@ call |pipeline_get_results|::
for res in pipe.get_results(block=True):
...

If you want the actors in the pipeline to keep executing even if
a actor in it errors out, use the ``pipe_ignore_exception`` option::

(
throws_exception.message(pipe_ignore_exception=True) |
will_still_run.message()
)

Note that if the ``may_fail`` actor throws an exception,
the ``will_still_run`` actor will receive ``None`` as input.

Error Reporting
---------------
Expand Down
9 changes: 7 additions & 2 deletions dramatiq/middleware/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class Pipelines(Middleware):
Parameters:
pipe_ignore(bool): When True, ignores the result of the previous
actor in the pipeline.
pipe_ignore_exception(bool): When True, ignores the fact that the previous
actor in the pipeline threw an exception and runs anyway.
pipe_target(dict): A message representing the actor the current
result should be fed into.
"""
Expand All @@ -33,6 +35,7 @@ class Pipelines(Middleware):
def actor_options(self):
return {
"pipe_ignore",
"pipe_ignore_exception",
"pipe_target",
}

Expand All @@ -42,10 +45,12 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
# from broker -> pipelines -> messages -> broker.
from ..message import Message

if exception is not None or message.failed:
actor = broker.get_actor(message.actor_name)

ignore_exception = message.options.get("pipe_ignore_exception") or actor.options.get("pipe_ignore_exception")
if message.failed or (exception is not None and not ignore_exception):
return

actor = broker.get_actor(message.actor_name)
message_data = message.options.get("pipe_target")
if message_data is not None:
next_message = Message(**message_data)
Expand Down
53 changes: 53 additions & 0 deletions tests/test_composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,59 @@ def should_never_run():
assert not has_run


def test_pipeline_does_continue_to_next_actor_when_ignore_exception_is_set_and_message_threw_an_exception(stub_broker, stub_worker):
has_run = False

@dramatiq.actor
def throw_something():
raise ValueError("something")

@dramatiq.actor
def should_run():
nonlocal has_run
has_run = True

# When I pipe some messages intended for that actor together and run the pipeline
pipe = throw_something.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True)
pipe.run()

stub_broker.join(should_run.queue_name, timeout=10 * 1000)
stub_worker.join()

# Then the second message in the pipe should have run
assert has_run


def test_pipeline_does_not_continue_to_next_actor_when_ignore_exception_is_set_and_message_is_marked_as_failed(stub_broker, stub_worker):
# Given that I have an actor that fails messages
class FailMessageMiddleware(middleware.Middleware):
def after_process_message(self, broker, message, *, result=None, exception=None):
message.fail()

stub_broker.add_middleware(FailMessageMiddleware())

has_run = False

@dramatiq.actor
def do_nothing():
pass

@dramatiq.actor
def should_run():
nonlocal has_run
has_run = True

# When I pipe some messages intended for that actor together and run the pipeline
pipe = do_nothing.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@J08nY with the above, the changes would require a redo

pipe.run()

stub_broker.join(should_run.queue_name, timeout=10 * 1000)
stub_worker.join()

# Then the second message in the pipe should not have run
assert not has_run


def test_pipeline_respects_own_delay(stub_broker, stub_worker, result_backend):
# Given a result backend
# And a broker with the results middleware
Expand Down