Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6578.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved handling of any internal errors when executing commands against a running workflow.
76 changes: 63 additions & 13 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import asyncio
import getpass
import json
from typing import Optional, Tuple
from typing import (
TYPE_CHECKING,
Optional,
Tuple,
Union,
)

import zmq
import zmq.asyncio
Expand All @@ -30,32 +35,77 @@
CylcError,
CylcVersionError,
ServiceFileError,
WorkflowStopped
WorkflowStopped,
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.workflow_files import (
ContactFileFields,
KeyType,
KeyOwner,
KeyInfo,
KeyOwner,
KeyType,
get_workflow_srv_dir,
load_contact_file,
get_workflow_srv_dir
)


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.TypedDict
# FROM: Python 3.7
# TO: Python 3.11
from typing_extensions import TypedDict


API = 5 # cylc API version
MSG_TIMEOUT = "TIMEOUT"

if TYPE_CHECKING:
class ResponseDict(TypedDict, total=False):
"""Structure of server response messages.

def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
try:
return json.dumps(message)
except TypeError as exc:
return json.dumps({'errors': [{'message': str(exc)}]})
Confusingly, has similar format to GraphQL execution result.
But if we change this now we could break compatibility for
issuing commands to/receiving responses from workflows running in
different versions of Cylc 8.
"""
data: object
"""For most Cylc commands that issue GQL mutations, the data field will
look like:
data: {
<mutationName1>: {
result: [
{
id: <workflow/task ID>,
response: [<success_bool>, <message>]
},
...
]
}
}
but this is not 100% consistent unfortunately
"""
error: Union[Exception, str, dict]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is error an Exception or a str?

Copy link
Copy Markdown
Member Author

@MetRonnie MetRonnie Feb 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I tried to change it to Exception | str to begin with, to simplify things, but realised it breaks inter-version compatibility. So presently I don't think it ever is an Exception or str. However, with these changes we should be able to change it to be Exception | str in future without breaking inter-version compatibility with this version onwards.

"""If an error occurred that could not be handled.
(usually a dict {message: str, traceback?: str}).
"""
user: str
cylc_version: str
"""Server (i.e. running workflow) Cylc version.

Going forward, we include this so we can more easily handle any future
back-compat issues."""


def serialize(data: object) -> str:
"""Convert the structure holding a message to a JSON message string."""
# Abstract out the transport format in order to allow it to be changed
# in future.
return json.dumps(data)


def decode_(message):
"""Convert an encoded message string to JSON with an added 'user' field."""
def deserialize(message: str) -> 'ResponseDict':
"""Convert a JSON message string to dict with an added 'user' field."""
# Abstract out the transport format in order to allow it to be changed
# in future.
msg = json.loads(message)
msg['user'] = getpass.getuser() # assume this is the user
return msg
Expand Down
68 changes: 42 additions & 26 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,30 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Client for workflow runtime API."""

from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod,
)
import asyncio
import os
from shutil import which
import socket
import sys
from typing import Any, Optional, Union, Dict
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Union,
)

import zmq
import zmq.asyncio

from cylc.flow import LOG
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.exceptions import (
ClientError,
ClientTimeout,
Expand All @@ -36,16 +48,18 @@
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.network import (
encode_,
decode_,
ZMQSocketBase,
get_location,
ZMQSocketBase
deserialize,
serialize,
)
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.workflow_files import (
detect_old_contact_file,
)
from cylc.flow.workflow_files import detect_old_contact_file


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict


class WorkflowRuntimeClientBase(metaclass=ABCMeta):
Expand Down Expand Up @@ -292,12 +306,12 @@ async def async_request(
if req_meta:
msg['meta'].update(req_meta)
LOG.debug('zmq:send %s', msg)
message = encode_(msg)
message = serialize(msg)
self.socket.send_string(message)

# receive response
if self.poller.poll(timeout):
res = await self.socket.recv()
res: bytes = await self.socket.recv()
else:
self.timeout_handler()
raise ClientTimeout(
Expand All @@ -307,26 +321,28 @@ async def async_request(
' --comms-timeout option;'
'\n* or check the workflow log.'
)
LOG.debug('zmq:recv %s', res)

if msg['command'] in PB_METHOD_MAP:
response = {'data': res}
else:
response = decode_(
res.decode() if isinstance(res, bytes) else res
)
LOG.debug('zmq:recv %s', response)
if command in PB_METHOD_MAP:
return res

response: ResponseDict = deserialize(res.decode())

try:
return response['data']
except KeyError:
error = response.get(
'error',
{'message': f'Received invalid response: {response}'},
)
raise ClientError(
error.get('message'), # type: ignore
error.get('traceback'), # type: ignore
) from None
error = response.get('error')
if not error:
error = (
f"Received invalid response for Cylc {CYLC_VERSION}: "
f"{response}"
)
wflow_cylc_ver = response.get('cylc_version')
if wflow_cylc_ver and wflow_cylc_ver != CYLC_VERSION:
error += (
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
)
raise ClientError(str(error)) from None

def get_header(self) -> dict:
"""Return "header" data to attach to each request for traceability.
Expand Down
44 changes: 39 additions & 5 deletions cylc/flow/network/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,36 @@

import asyncio
import sys
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
from typing import (
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
)

from ansimarkup import ansiprint

from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.async_util import unordered_map
from cylc.flow.exceptions import CylcError, WorkflowStopped
from cylc.flow.exceptions import (
CylcError,
WorkflowStopped,
)
import cylc.flow.flags
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.terminal import DIM


# Known error messages for incompatibilites between this version of Cylc (that
# is running the command) and the version of Cylc running the workflow:
KNOWN_INCOMPAT = {
'Unknown argument "onResume" on field "trigger" of type "Mutations".',
}


def call_multi(*args, **kwargs):
"""Call a function for each workflow in a list of IDs.

Expand Down Expand Up @@ -220,21 +239,36 @@


def _report(
response: dict,
response: Union[dict, list],
) -> Tuple[Optional[str], Optional[str], bool]:
"""Report the result of a GraphQL operation.

This analyses GraphQL mutation responses to determine the outcome.

Args:
response: The GraphQL response.
response: The workflow server response (NOT necessarily conforming to
GraphQL execution result spec).

Returns:
(stdout, stderr, outcome)

"""
try:
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
if not isinstance(response, dict):
if isinstance(response, list) and response[0].get('error'):
# If operating on workflow running in older Cylc version,
# may get a error response like [{'error': '...'}]
if response[0]['error'].get('message') in KNOWN_INCOMPAT:
raise Exception(

Check warning on line 263 in cylc/flow/network/multi.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/multi.py#L263

Added line #L263 was not covered by tests
"This command is no longer compatible with the "
"version of Cylc running the workflow. Please stop & "
f"restart the workflow with Cylc {CYLC_VERSION} "
"or higher."
f"\n\n{response}"
)
raise Exception(response)
raise Exception(f"Unexpected response: {response}")

Check warning on line 271 in cylc/flow/network/multi.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/multi.py#L270-L271

Added lines #L270 - L271 were not covered by tests
for mutation_response in response.values():
# extract the result of each mutation result in the response
success, msg = mutation_response['result'][0]['response']
Expand Down Expand Up @@ -268,7 +302,7 @@
# response returned is not in the expected format - this shouldn't
# happen but we need to protect against it
err_msg = ''
if cylc.flow.flags.verbosity > 1: # debug mode
if cylc.flow.flags.verbosity > 0: # verbose mode
# print the full result to stderr
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
return (
Expand Down
Loading
Loading