Skip to content

Commit 3a31744

Browse files
committed
Allow for pipelines to ignore error and continue.
1 parent a978028 commit 3a31744

File tree

3 files changed

+70
-2
lines changed

3 files changed

+70
-2
lines changed

docs/source/cookbook.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,16 @@ call |pipeline_get_results|::
144144
for res in pipe.get_results(block=True):
145145
...
146146

147+
If you want the actors in the pipeline to keep executing even if
148+
a actor in it errors out, use the ``pipe_ignore_exception`` option::
149+
150+
(
151+
throws_exception.message(pipe_ignore_exception=True) |
152+
will_still_run.message()
153+
)
154+
155+
Note that if the ``may_fail`` actor throws an exception,
156+
the ``will_still_run`` actor will receive ``None`` as input.
147157

148158
Error Reporting
149159
---------------

dramatiq/middleware/pipelines.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class Pipelines(Middleware):
2525
Parameters:
2626
pipe_ignore(bool): When True, ignores the result of the previous
2727
actor in the pipeline.
28+
pipe_ignore_exception(bool): When True, ignores the fact that the previous
29+
actor in the pipeline threw an exception and runs anyway.
2830
pipe_target(dict): A message representing the actor the current
2931
result should be fed into.
3032
"""
@@ -33,6 +35,7 @@ class Pipelines(Middleware):
3335
def actor_options(self):
3436
return {
3537
"pipe_ignore",
38+
"pipe_ignore_exception",
3639
"pipe_target",
3740
}
3841

@@ -42,10 +45,12 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
4245
# from broker -> pipelines -> messages -> broker.
4346
from ..message import Message
4447

45-
if exception is not None or message.failed:
48+
actor = broker.get_actor(message.actor_name)
49+
50+
ignore_exception = message.options.get("pipe_ignore_exception") or actor.options.get("pipe_ignore_exception")
51+
if message.failed or (exception is not None and not ignore_exception):
4652
return
4753

48-
actor = broker.get_actor(message.actor_name)
4954
message_data = message.options.get("pipe_target")
5055
if message_data is not None:
5156
next_message = Message(**message_data)

tests/test_composition.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,59 @@ def should_never_run():
298298
assert not has_run
299299

300300

301+
def test_pipeline_does_continue_to_next_actor_when_ignore_exception_is_set_and_message_threw_an_exception(stub_broker, stub_worker):
302+
has_run = False
303+
304+
@dramatiq.actor
305+
def throw_something():
306+
raise ValueError("something")
307+
308+
@dramatiq.actor
309+
def should_run():
310+
nonlocal has_run
311+
has_run = True
312+
313+
# When I pipe some messages intended for that actor together and run the pipeline
314+
pipe = throw_something.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True)
315+
pipe.run()
316+
317+
stub_broker.join(should_run.queue_name, timeout=10 * 1000)
318+
stub_worker.join()
319+
320+
# Then the second message in the pipe should have run
321+
assert has_run
322+
323+
324+
def test_pipeline_does_not_continue_to_next_actor_when_ignore_exception_is_set_and_message_is_marked_as_failed(stub_broker, stub_worker):
325+
# Given that I have an actor that fails messages
326+
class FailMessageMiddleware(middleware.Middleware):
327+
def after_process_message(self, broker, message, *, result=None, exception=None):
328+
message.fail()
329+
330+
stub_broker.add_middleware(FailMessageMiddleware())
331+
332+
has_run = False
333+
334+
@dramatiq.actor
335+
def do_nothing():
336+
pass
337+
338+
@dramatiq.actor
339+
def should_run():
340+
nonlocal has_run
341+
has_run = True
342+
343+
# When I pipe some messages intended for that actor together and run the pipeline
344+
pipe = do_nothing.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True)
345+
pipe.run()
346+
347+
stub_broker.join(should_run.queue_name, timeout=10 * 1000)
348+
stub_worker.join()
349+
350+
# Then the second message in the pipe should not have run
351+
assert not has_run
352+
353+
301354
def test_pipeline_respects_own_delay(stub_broker, stub_worker, result_backend):
302355
# Given a result backend
303356
# And a broker with the results middleware

0 commit comments

Comments
 (0)