Skip to content

Commit ee3102d

Browse files
committed
Improve server-client communication error handling
1 parent b942907 commit ee3102d

File tree

12 files changed

+377
-165
lines changed

12 files changed

+377
-165
lines changed

cylc/flow/network/__init__.py

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818
import asyncio
1919
import getpass
2020
import json
21-
from typing import Optional, Tuple
21+
from typing import (
22+
TYPE_CHECKING,
23+
Optional,
24+
Tuple,
25+
Union,
26+
)
2227

2328
import zmq
2429
import zmq.asyncio
@@ -30,34 +35,71 @@
3035
CylcError,
3136
CylcVersionError,
3237
ServiceFileError,
33-
WorkflowStopped
38+
WorkflowStopped,
3439
)
3540
from cylc.flow.hostuserutil import get_fqdn_by_host
3641
from cylc.flow.workflow_files import (
3742
ContactFileFields,
38-
KeyType,
39-
KeyOwner,
4043
KeyInfo,
44+
KeyOwner,
45+
KeyType,
46+
get_workflow_srv_dir,
4147
load_contact_file,
42-
get_workflow_srv_dir
4348
)
4449

50+
51+
if TYPE_CHECKING:
52+
# BACK COMPAT: typing_extensions.TypedDict
53+
# FROM: Python 3.7
54+
# TO: Python 3.11
55+
from typing_extensions import TypedDict
56+
57+
4558
API = 5 # cylc API version
4659
MSG_TIMEOUT = "TIMEOUT"
4760

61+
if TYPE_CHECKING:
62+
class ResponseDict(TypedDict, total=False):
63+
"""Structure of server response messages.
4864
49-
def encode_(message):
50-
"""Convert the structure holding a message field from JSON to a string."""
51-
try:
52-
return json.dumps(message)
53-
except TypeError as exc:
54-
return json.dumps({'errors': [{'message': str(exc)}]})
65+
Confusingly, this has a similar format to a GraphQL execution result.
66+
But if we change this now we could break compatibility for
67+
issuing commands to/receiving responses from workflows running in
68+
different versions of Cylc 8.
69+
"""
70+
data: object
71+
"""For most Cylc commands that issue GQL mutations, the data field will
72+
look like:
73+
data: {
74+
<mutationName1>: {
75+
result: [
76+
{
77+
id: <workflow/task ID>,
78+
response: [<success_bool>, <message>]
79+
},
80+
...
81+
]
82+
}
83+
}
84+
but this is not 100% consistent unfortunately
85+
"""
86+
error: Union[Exception, str, dict]
87+
"""If an error occurred that could not be handled.
88+
(usually a dict {message: str, traceback?: str}).
89+
"""
90+
user: str
91+
cylc_version: str
92+
"""Server (i.e. running workflow) Cylc version.
93+
94+
Going forward, we include this so we can more easily handle any future
95+
back-compat issues."""
5596

5697

57-
def decode_(message):
58-
"""Convert an encoded message string to JSON with an added 'user' field."""
98+
def load_server_response(message: str) -> 'ResponseDict':
99+
"""Convert a JSON message string to a dict, adding a 'user' field if absent."""
59100
msg = json.loads(message)
60-
msg['user'] = getpass.getuser() # assume this is the user
101+
if 'user' not in msg:
102+
msg['user'] = getpass.getuser() # assume this is the user
61103
return msg
62104

63105

cylc/flow/network/client.py

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,31 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
"""Client for workflow runtime API."""
1717

18-
from abc import ABCMeta, abstractmethod
18+
from abc import (
19+
ABCMeta,
20+
abstractmethod,
21+
)
1922
import asyncio
23+
import json
2024
import os
2125
from shutil import which
2226
import socket
2327
import sys
24-
from typing import Any, Optional, Union, Dict
28+
from typing import (
29+
TYPE_CHECKING,
30+
Any,
31+
Dict,
32+
Optional,
33+
Union,
34+
)
2535

2636
import zmq
2737
import zmq.asyncio
2838

29-
from cylc.flow import LOG
39+
from cylc.flow import (
40+
LOG,
41+
__version__ as CYLC_VERSION,
42+
)
3043
from cylc.flow.exceptions import (
3144
ClientError,
3245
ClientTimeout,
@@ -36,16 +49,17 @@
3649
)
3750
from cylc.flow.hostuserutil import get_fqdn_by_host
3851
from cylc.flow.network import (
39-
encode_,
40-
decode_,
52+
ZMQSocketBase,
4153
get_location,
42-
ZMQSocketBase
54+
load_server_response,
4355
)
4456
from cylc.flow.network.client_factory import CommsMeth
4557
from cylc.flow.network.server import PB_METHOD_MAP
46-
from cylc.flow.workflow_files import (
47-
detect_old_contact_file,
48-
)
58+
from cylc.flow.workflow_files import detect_old_contact_file
59+
60+
61+
if TYPE_CHECKING:
62+
from cylc.flow.network import ResponseDict
4963

5064

5165
class WorkflowRuntimeClientBase(metaclass=ABCMeta):
@@ -270,7 +284,7 @@ async def async_request(
270284
args: Optional[Dict[str, Any]] = None,
271285
timeout: Optional[float] = None,
272286
req_meta: Optional[Dict[str, Any]] = None
273-
) -> object:
287+
) -> Union[bytes, object]:
274288
"""Send an asynchronous request using asyncio.
275289
276290
Has the same arguments and return values as ``serial_request``.
@@ -292,12 +306,12 @@ async def async_request(
292306
if req_meta:
293307
msg['meta'].update(req_meta)
294308
LOG.debug('zmq:send %s', msg)
295-
message = encode_(msg)
309+
message = json.dumps(msg)
296310
self.socket.send_string(message)
297311

298312
# receive response
299313
if self.poller.poll(timeout):
300-
res = await self.socket.recv()
314+
res: bytes = await self.socket.recv()
301315
else:
302316
self.timeout_handler()
303317
raise ClientTimeout(
@@ -307,26 +321,28 @@ async def async_request(
307321
' --comms-timeout option;'
308322
'\n* or check the workflow log.'
309323
)
324+
LOG.debug('zmq:recv %s', res)
310325

311-
if msg['command'] in PB_METHOD_MAP:
312-
response = {'data': res}
313-
else:
314-
response = decode_(
315-
res.decode() if isinstance(res, bytes) else res
316-
)
317-
LOG.debug('zmq:recv %s', response)
326+
if command in PB_METHOD_MAP:
327+
return res
328+
329+
response: ResponseDict = load_server_response(res.decode())
318330

319331
try:
320332
return response['data']
321333
except KeyError:
322-
error = response.get(
323-
'error',
324-
{'message': f'Received invalid response: {response}'},
325-
)
326-
raise ClientError(
327-
error.get('message'), # type: ignore
328-
error.get('traceback'), # type: ignore
329-
) from None
334+
error = response.get('error')
335+
if not error:
336+
error = (
337+
f"Received invalid response for Cylc {CYLC_VERSION}: "
338+
f"{response}"
339+
)
340+
wflow_cylc_ver = response.get('cylc_version')
341+
if wflow_cylc_ver:
342+
error += (
343+
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
344+
)
345+
raise ClientError(str(error)) from None
330346

331347
def get_header(self) -> dict:
332348
"""Return "header" data to attach to each request for traceability.

cylc/flow/network/multi.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,23 @@
1616

1717
import asyncio
1818
import sys
19-
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
19+
from typing import (
20+
Callable,
21+
Dict,
22+
List,
23+
Optional,
24+
Tuple,
25+
Type,
26+
Union,
27+
)
2028

2129
from ansimarkup import ansiprint
2230

2331
from cylc.flow.async_util import unordered_map
24-
from cylc.flow.exceptions import CylcError, WorkflowStopped
32+
from cylc.flow.exceptions import (
33+
CylcError,
34+
WorkflowStopped,
35+
)
2536
import cylc.flow.flags
2637
from cylc.flow.id_cli import parse_ids_async
2738
from cylc.flow.terminal import DIM
@@ -220,21 +231,28 @@ def _process_response(
220231

221232

222233
def _report(
223-
response: dict,
234+
response: Union[dict, list],
224235
) -> Tuple[Optional[str], Optional[str], bool]:
225236
"""Report the result of a GraphQL operation.
226237
227238
This analyses GraphQL mutation responses to determine the outcome.
228239
229240
Args:
230-
response: The GraphQL response.
241+
response: The workflow server response (NOT necessarily conforming to
242+
GraphQL execution result spec).
231243
232244
Returns:
233245
(stdout, stderr, outcome)
234246
235247
"""
236248
try:
237249
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
250+
if not isinstance(response, dict):
251+
if isinstance(response, list) and response[0].get('error'):
252+
# If operating on workflow running in older Cylc version,
253+
# may get an error response like [{'error': '...'}]
254+
raise Exception(response)
255+
raise Exception(f"Unexpected response: {response}")
238256
for mutation_response in response.values():
239257
# extract the result of each mutation result in the response
240258
success, msg = mutation_response['result'][0]['response']
@@ -268,7 +286,7 @@ def _report(
268286
# response returned is not in the expected format - this shouldn't
269287
# happen but we need to protect against it
270288
err_msg = ''
271-
if cylc.flow.flags.verbosity > 1: # debug mode
289+
if cylc.flow.flags.verbosity > 0: # verbose mode
272290
# print the full result to stderr
273291
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
274292
return (

cylc/flow/network/replier.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,27 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
"""Server for workflow runtime API."""
1717

18+
import json
1819
from queue import Queue
19-
from typing import TYPE_CHECKING, Optional
20+
from typing import (
21+
TYPE_CHECKING,
22+
Optional,
23+
)
2024

2125
import zmq
2226

23-
from cylc.flow import LOG
24-
from cylc.flow.network import encode_, decode_, ZMQSocketBase
27+
from cylc.flow import (
28+
LOG,
29+
__version__ as CYLC_VERSION,
30+
)
31+
from cylc.flow.network import (
32+
ZMQSocketBase,
33+
load_server_response,
34+
)
35+
2536

2637
if TYPE_CHECKING:
38+
from cylc.flow.network import ResponseDict
2739
from cylc.flow.network.server import WorkflowRuntimeServer
2840

2941

@@ -69,7 +81,7 @@ def _bespoke_stop(self) -> None:
6981
LOG.debug('stopping zmq replier...')
7082
self.queue.put('STOP')
7183

72-
def listener(self):
84+
def listener(self) -> None:
7385
"""The server main loop, listen for and serve requests.
7486
7587
When called, this method will receive and respond until there are no
@@ -90,7 +102,9 @@ def listener(self):
90102

91103
try:
92104
# Check for messages
93-
msg = self.socket.recv_string(zmq.NOBLOCK)
105+
msg = self.socket.recv_string( # type: ignore[union-attr]
106+
zmq.NOBLOCK
107+
)
94108
except zmq.error.Again:
95109
# No messages, break to parent loop/caller.
96110
break
@@ -99,27 +113,27 @@ def listener(self):
99113
continue
100114
# attempt to decode the message, authenticating the user in the
101115
# process
116+
res: ResponseDict
117+
response: bytes
102118
try:
103-
message = decode_(msg)
119+
message = load_server_response(msg)
104120
except Exception as exc: # purposefully catch generic exception
105121
# failed to decode message, possibly resulting from failed
106122
# authentication
107-
LOG.exception('failed to decode message: "%s"', exc)
108-
import traceback
109-
response = encode_(
110-
{
111-
'error': {
112-
'message': 'failed to decode message: "%s"' % msg,
113-
'traceback': traceback.format_exc(),
114-
}
115-
}
116-
).encode()
123+
LOG.exception(exc)
124+
LOG.error(f'failed to decode message: "{msg}"')
125+
res = {
126+
'error': {'message': str(exc)},
127+
'cylc_version': CYLC_VERSION,
128+
}
129+
response = json.dumps(res).encode()
117130
else:
118131
# success case - serve the request
119132
res = self.server.receiver(message)
133+
data = res.get('data')
120134
# send back the string to bytes response
121-
if isinstance(res.get('data'), bytes):
122-
response = res['data']
135+
if isinstance(data, bytes):
136+
response = data
123137
else:
124-
response = encode_(res).encode()
125-
self.socket.send(response)
138+
response = json.dumps(res).encode()
139+
self.socket.send(response) # type: ignore[union-attr]

0 commit comments

Comments
 (0)