Skip to content

Add workflow system info in Workflow Execution context #5405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/runners/orquesta_runner/tests/unit/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from st2common.transport import liveaction as lv_ac_xport
from st2common.transport import workflow as wf_ex_xport
from st2common.transport import publishers
from st2common.util import system_info
from st2tests.mocks import liveaction as mock_lv_ac_xport
from st2tests.mocks import workflow as mock_wf_ex_xport

Expand Down Expand Up @@ -165,6 +166,7 @@ def test_run_workflow(self):
expected_lv_ac_ctx = {
"workflow_execution": str(wf_ex_db.id),
"pack": "orquesta_tests",
"engine_info": system_info.get_process_info(),
}

self.assertDictEqual(lv_ac_db.context, expected_lv_ac_ctx)
Expand Down
2 changes: 2 additions & 0 deletions st2actions/st2actions/workflows/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,13 @@ def fail_workflow_execution(self, message, exception):
wf_svc.update_progress(wf_ex_db, msg % (msg_type, wf_ex_id), severity="error")
wf_svc.fail_workflow_execution(wf_ex_id, exception, task=task)

@wf_svc.add_system_info_to_action_context
def handle_workflow_execution(self, wf_ex_db):
# Request the next set of tasks to execute.
wf_svc.update_progress(wf_ex_db, "Processing request for workflow execution.")
wf_svc.request_next_tasks(wf_ex_db)

@wf_svc.add_system_info_to_action_context
def handle_action_execution(self, ac_ex_db):
# Exit if action execution is not executed under an orquesta workflow.
if not wf_svc.is_action_execution_under_workflow_context(ac_ex_db):
Expand Down
38 changes: 38 additions & 0 deletions st2common/st2common/services/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import retrying
import six

from functools import wraps

from orquesta import conducting
from orquesta import events
from orquesta import exceptions as orquesta_exc
Expand Down Expand Up @@ -49,6 +51,7 @@
from st2common.util import action_db as action_utils
from st2common.util import date as date_utils
from st2common.util import param as param_utils
from st2common.util import system_info


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -1561,3 +1564,38 @@ def identify_orphaned_workflows():
continue

return orphaned


def add_system_info_to_action_context(func):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per my other comment, I think we should refactor this 1 or 2 separate functions and not use a decorator.

@wraps(func)
def wrapper(*args, **kwargs):
engine_info = system_info.get_process_info()

# Argument will be of type WorkflowExecutionDB/ActionExecutionDB only.
if kwargs.get("wf_ex_db"):
ac_ex = ex_db_access.ActionExecution.get_by_id(
kwargs["wf_ex_db"].action_execution
)
lv_ex = lv_db_access.LiveAction.get_by_id(ac_ex.liveaction["id"])
elif kwargs.get("ac_ex_db"):
lv_ex = lv_db_access.LiveAction.get_by_id(
kwargs["ac_ex_db"].liveaction["id"]
)
else:
raise ValueError("Invalid message type")

lv_ex.context.update({"engine_info": engine_info})
lv_db_access.LiveAction.add_or_update(lv_ex, publish=False)

try:
return func(*args, **kwargs)
except Exception as e:
raise e
finally:
lv_ex = lv_db_access.LiveAction.get_by_id(lv_ex.id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear why we need this code path so we should add a comment on why this delete is here and why it's needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kami The intention here is to add engine information to the context at the start of handling and remove it once handling is complete.
Any action in running/pausing/cancelling state, having engine information about a deregistered service can be classified as orphaned workflow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the workflow execution can be processed by 1 or more different workflow engine during its execution, it's difficult to know which workflow engine is currently handling the execution.

try:
del lv_ex.context["engine_info"]
except KeyError:
pass

return wrapper