Skip to content

Commit ef3e3d4

Browse files
committed
Allow for pipelines to ignore error and continue.
1 parent a978028 commit ef3e3d4

File tree

3 files changed

+47
-2
lines changed

3 files changed

+47
-2
lines changed

docs/source/cookbook.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,16 @@ call |pipeline_get_results|::
144144
for res in pipe.get_results(block=True):
145145
...
146146

147+
If you want the actors in the pipeline to keep executing even if
148+
a actor in it errors out, use the ``pipe_errorok`` option::
149+
150+
(
151+
may_fail.mmessage(pipe_errorok=True) |
152+
will_still_run.message()
153+
)
154+
155+
Note that if the ``may_fail`` actor fails, the ``will_still_run`` actor
156+
will receive ``None`` as input.
147157

148158
Error Reporting
149159
---------------

dramatiq/middleware/pipelines.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ class Pipelines(Middleware):
2525
Parameters:
2626
pipe_ignore(bool): When True, ignores the result of the previous
2727
actor in the pipeline.
28+
pipe_errorok(bool): When True, ignores the fact that the previous
29+
actor in the pipeline failed and runs anyway.
2830
pipe_target(dict): A message representing the actor the current
2931
result should be fed into.
3032
"""
@@ -33,6 +35,7 @@ class Pipelines(Middleware):
3335
def actor_options(self):
3436
return {
3537
"pipe_ignore",
38+
"pipe_errorok",
3639
"pipe_target",
3740
}
3841

@@ -42,10 +45,12 @@ def after_process_message(self, broker, message, *, result=None, exception=None)
4245
# from broker -> pipelines -> messages -> broker.
4346
from ..message import Message
4447

45-
if exception is not None or message.failed:
48+
actor = broker.get_actor(message.actor_name)
49+
50+
errorok = message.options.get("pipe_errorok") or actor.options.get("pipe_errorok")
51+
if (exception is not None or message.failed) and not errorok:
4652
return
4753

48-
actor = broker.get_actor(message.actor_name)
4954
message_data = message.options.get("pipe_target")
5055
if message_data is not None:
5156
next_message = Message(**message_data)

tests/test_composition.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,36 @@ def should_never_run():
298298
assert not has_run
299299

300300

301+
def test_pipeline_does_continue_to_next_actor_when_errorok_is_set_and_message_is_marked_as_failed(stub_broker, stub_worker):
302+
# Given that I have an actor that fails messages
303+
class FailMessageMiddleware(middleware.Middleware):
304+
def after_process_message(self, broker, message, *, result=None, exception=None):
305+
message.fail()
306+
307+
stub_broker.add_middleware(FailMessageMiddleware())
308+
309+
has_run = False
310+
311+
@dramatiq.actor
312+
def do_nothing():
313+
pass
314+
315+
@dramatiq.actor
316+
def should_run():
317+
nonlocal has_run
318+
has_run = True
319+
320+
# When I pipe some messages intended for that actor together and run the pipeline
321+
pipe = do_nothing.message_with_options(pipe_errorok=True) | should_run.message(pipe_ignore=True)
322+
pipe.run()
323+
324+
stub_broker.join(should_run.queue_name, timeout=10 * 1000)
325+
stub_worker.join()
326+
327+
# Then the second message in the pipe should have run
328+
assert has_run
329+
330+
301331
def test_pipeline_respects_own_delay(stub_broker, stub_worker, result_backend):
302332
# Given a result backend
303333
# And a broker with the results middleware

0 commit comments

Comments
 (0)