Skip to content

Conversation

@J08nY
Copy link

@J08nY J08nY commented Feb 4, 2025

It is useful for me to have a pipeline of messages that execute. However, I want to keep executing the pipeline even if the individual message fails. This pull-request implements that using a new option on the message in the Pipelines middleware.

@J08nY J08nY force-pushed the feat/pipeline-errorok branch 4 times, most recently from ef3e3d4 to b941324 Compare February 4, 2025 16:45
@J08nY J08nY force-pushed the feat/pipeline-errorok branch from b941324 to 021e6fc Compare April 14, 2025 11:10
if exception is not None or message.failed:
actor = broker.get_actor(message.actor_name)

errorok = message.options.get("pipe_ignore_exception") or actor.options.get("pipe_ignore_exception")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@J08nY I'd rename the left-side to ignore_exception

actor = broker.get_actor(message.actor_name)

errorok = message.options.get("pipe_ignore_exception") or actor.options.get("pipe_ignore_exception")
if (exception is not None or message.failed) and not errorok:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@J08nY exception is not None and message.failed shouldn't be considered equal - they may have different meanings in the actor implementation. Only explicitly thrown exceptions should be caught.

I'd also go for explicit .. and ignore_exception is not True

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is why I had the name "errorok", to consider both cases of message failures. This can of course be split, but that requires a new parameter.

has_run = True

# When I pipe some messages intended for that actor together and run the pipeline
pipe = do_nothing.message_with_options(pipe_ignore_exception=True) | should_run.message_with_options(pipe_ignore=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@J08nY with the above, the changes would require a redo

@LincolnPuzey
Copy link
Collaborator

LincolnPuzey commented Apr 14, 2025

I believe we are trying to avoid adding new features when possible. Given @J08nY it looks like you could achieve what you needed with a middleware subclass, I am apprehensive about bringing the change into the core library, sorry.


Looking at the code, if we allow pipelines to continue when exception is not None (but the message hasn't failed yet), I think that could mean the Message will be re-tried and the pipeline is continued.

Which could then mean the pipeline would be continued multiple times? each time the message is re-tried (default 20 times). This is obviously not good.

@J08nY
Copy link
Author

J08nY commented Apr 14, 2025

I believe we are trying to avoid adding new features when possible. Given @J08nY it looks like you could achieve what you needed with a middleware subclass, I am apprehensive about bringing the change into the core library, sorry.

I see. Would you mind including it somewhere in the README? I mean like a recommendation to try to use middleware for custom features, or make an issue if it is not possible and that new features are unlikely to get accepted. Would save time.

Looking at the code, if we allow pipelines to continue when exception is not None (but the message hasn't failed yet), I think that could mean the Message will be re-tried and the pipeline is continued.

Hmm, I guess my understanding of what the different states can be and how they are handled is lacking.

Would you suggest subclassing the Pipeline middleware and replacing the built-in one? In my use-cases I do not have/want retries and thus it is much simpler.

@synweap15
Copy link
Collaborator

synweap15 commented Apr 14, 2025

@LincolnPuzey @Bogdanp gave me the go-ahead. Let's have a private conversation.

@J08nY, I apologize for the poor communication.

@J08nY J08nY force-pushed the feat/pipeline-errorok branch from 021e6fc to 3a31744 Compare April 15, 2025 11:54
@LincolnPuzey
Copy link
Collaborator

@synweap15 My apologies, I didn't know.

Do you agree that continuing a pipeline multiple times due to retries, is something we should avoid?

@J08nY
Copy link
Author

J08nY commented Apr 16, 2025

Are there some semantics of this concept that you want to ignore exceptions in an actor in a pipeline, that work together with retries?

The only thing I can think of is to make the actor eat the exception and thus effectively disable retries.

@J08nY
Copy link
Author

J08nY commented Jun 10, 2025

Any updates on this? I would still like to be able to have pipelines with actors that fail, yet the pipeline continues.

@synweap15
Copy link
Collaborator

synweap15 commented Jun 10, 2025 via email

@synweap15
Copy link
Collaborator

I see this feature useful in scenarios where an action may fail, but there are several attempts to execute it. Example I have encountered - heavy calculations result caching where the cache key may be locked. Not a deal breaker if fails but desirable to be persisted.

Explaination of params use

  • pipe_ignore=[any], retries=10, pipe_ignore_exception=False -> action fail - retries actor, halts on final failure (as it is now)
  • pipe_ignore=[any], retries=10, pipe_ignore_exception=True -> action fail - retries actor, then continues pipeline on fail
  • pipe_ignore=True, retries=[any], pipe_ignore_exception=True -> action fail - next actor runs with no input from prev
  • pipe_ignore=False, retries=[any], pipe_ignore_exception=True -> action fail - next actor runs and either the output value or None is passed (more on that below)

I guess this would be similar in function to a middleware:

class GracefulRetryMiddleware(Middleware):
    def after_process_message(self, broker, message, actor, result=None, exception=None):
        if exception is not None:
            retries_left = message.options.get("retries", 0)
            max_retries = actor.options.get("max_retries", 0)
            
            if retries_left >= max_retries:
                # Final retry failed, continuing anyway
                return None  # swallow the exception after final retry
            
            # "Exception: {exception}, will retry (attempt {retries_left}/{max_retries})"
            raise exception 

Considerations

  • we lose information whether or not the previous actor ran successfully.
  • if pipe_ignore=False and pipe_ignore_exception=True, the input to the following actor may be None - is None check may be needed

Thoughts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants