Skip to content

Commit 5dd62b7

Browse files
committed
Improve server-client communication error handling
1 parent b942907 commit 5dd62b7

File tree

8 files changed

+315
-147
lines changed

8 files changed

+315
-147
lines changed

cylc/flow/network/__init__.py

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818
import asyncio
1919
import getpass
2020
import json
21-
from typing import Optional, Tuple
21+
from typing import (
22+
Optional,
23+
Tuple,
24+
Union,
25+
)
2226

27+
from typing_extensions import TypedDict
2328
import zmq
2429
import zmq.asyncio
2530
import zmq.auth
@@ -30,34 +35,64 @@
3035
CylcError,
3136
CylcVersionError,
3237
ServiceFileError,
33-
WorkflowStopped
38+
WorkflowStopped,
3439
)
3540
from cylc.flow.hostuserutil import get_fqdn_by_host
3641
from cylc.flow.workflow_files import (
3742
ContactFileFields,
38-
KeyType,
39-
KeyOwner,
4043
KeyInfo,
44+
KeyOwner,
45+
KeyType,
46+
get_workflow_srv_dir,
4147
load_contact_file,
42-
get_workflow_srv_dir
4348
)
4449

50+
4551
API = 5 # cylc API version
4652
MSG_TIMEOUT = "TIMEOUT"
4753

4854

49-
def encode_(message):
50-
"""Convert the structure holding a message field from JSON to a string."""
51-
try:
52-
return json.dumps(message)
53-
except TypeError as exc:
54-
return json.dumps({'errors': [{'message': str(exc)}]})
55+
class ResponseDict(TypedDict, total=False):
56+
"""Structure of server response messages.
57+
58+
Confusingly, has similar format to GraphQL execution result.
59+
But if we change this now we could break compatibility for
60+
issuing commands to/receiving responses from workflows running in
61+
different versions of Cylc 8.
62+
"""
63+
data: object
64+
"""For most Cylc commands that issue GQL mutations, the data field will
65+
look like:
66+
data: {
67+
<mutationName1>: {
68+
result: [
69+
{
70+
id: <workflow/task ID>,
71+
response: [<success_bool>, <message>]
72+
},
73+
...
74+
]
75+
}
76+
}
77+
but this is not 100% consistent unfortunately
78+
"""
79+
error: Union[Exception, str, dict]
80+
"""If an error occurred that could not be handled.
81+
(usually a dict {message: str, traceback?: str}).
82+
"""
83+
user: str
84+
cylc_version: str
85+
"""Server (i.e. running workflow) Cylc version.
86+
87+
Going forward, we include this so we can more easily handle any future
88+
back-compat issues."""
5589

5690

57-
def decode_(message):
58-
"""Convert an encoded message string to JSON with an added 'user' field."""
91+
def load_server_response(message: str) -> ResponseDict:
92+
"""Convert a JSON message string to dict with an added 'user' field."""
5993
msg = json.loads(message)
60-
msg['user'] = getpass.getuser() # assume this is the user
94+
if 'user' not in msg:
95+
msg['user'] = getpass.getuser() # assume this is the user
6196
return msg
6297

6398

cylc/flow/network/client.py

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,30 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
"""Client for workflow runtime API."""
1717

18-
from abc import ABCMeta, abstractmethod
18+
from abc import (
19+
ABCMeta,
20+
abstractmethod,
21+
)
1922
import asyncio
23+
import json
2024
import os
2125
from shutil import which
2226
import socket
2327
import sys
24-
from typing import Any, Optional, Union, Dict
28+
from typing import (
29+
Any,
30+
Dict,
31+
Optional,
32+
Union,
33+
)
2534

2635
import zmq
2736
import zmq.asyncio
2837

29-
from cylc.flow import LOG
38+
from cylc.flow import (
39+
LOG,
40+
__version__ as CYLC_VERSION,
41+
)
3042
from cylc.flow.exceptions import (
3143
ClientError,
3244
ClientTimeout,
@@ -36,16 +48,14 @@
3648
)
3749
from cylc.flow.hostuserutil import get_fqdn_by_host
3850
from cylc.flow.network import (
39-
encode_,
40-
decode_,
51+
ResponseDict,
52+
ZMQSocketBase,
53+
load_server_response,
4154
get_location,
42-
ZMQSocketBase
4355
)
4456
from cylc.flow.network.client_factory import CommsMeth
4557
from cylc.flow.network.server import PB_METHOD_MAP
46-
from cylc.flow.workflow_files import (
47-
detect_old_contact_file,
48-
)
58+
from cylc.flow.workflow_files import detect_old_contact_file
4959

5060

5161
class WorkflowRuntimeClientBase(metaclass=ABCMeta):
@@ -270,7 +280,7 @@ async def async_request(
270280
args: Optional[Dict[str, Any]] = None,
271281
timeout: Optional[float] = None,
272282
req_meta: Optional[Dict[str, Any]] = None
273-
) -> object:
283+
) -> Union[bytes, object]:
274284
"""Send an asynchronous request using asyncio.
275285
276286
Has the same arguments and return values as ``serial_request``.
@@ -292,12 +302,12 @@ async def async_request(
292302
if req_meta:
293303
msg['meta'].update(req_meta)
294304
LOG.debug('zmq:send %s', msg)
295-
message = encode_(msg)
305+
message = json.dumps(msg)
296306
self.socket.send_string(message)
297307

298308
# receive response
299309
if self.poller.poll(timeout):
300-
res = await self.socket.recv()
310+
res: bytes = await self.socket.recv()
301311
else:
302312
self.timeout_handler()
303313
raise ClientTimeout(
@@ -307,26 +317,28 @@ async def async_request(
307317
' --comms-timeout option;'
308318
'\n* or check the workflow log.'
309319
)
320+
LOG.debug('zmq:recv %s', res)
310321

311-
if msg['command'] in PB_METHOD_MAP:
312-
response = {'data': res}
313-
else:
314-
response = decode_(
315-
res.decode() if isinstance(res, bytes) else res
316-
)
317-
LOG.debug('zmq:recv %s', response)
322+
if command in PB_METHOD_MAP:
323+
return res
324+
325+
response: ResponseDict = load_server_response(res.decode())
318326

319327
try:
320328
return response['data']
321329
except KeyError:
322-
error = response.get(
323-
'error',
324-
{'message': f'Received invalid response: {response}'},
325-
)
326-
raise ClientError(
327-
error.get('message'), # type: ignore
328-
error.get('traceback'), # type: ignore
329-
) from None
330+
error = response.get('error')
331+
if not error:
332+
error = (
333+
f"Received invalid response for Cylc {CYLC_VERSION}: "
334+
f"{response}"
335+
)
336+
wflow_cylc_ver = response.get('cylc_version')
337+
if wflow_cylc_ver:
338+
error += (
339+
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
340+
)
341+
raise ClientError(str(error)) from None
330342

331343
def get_header(self) -> dict:
332344
"""Return "header" data to attach to each request for traceability.

cylc/flow/network/multi.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,23 @@
1616

1717
import asyncio
1818
import sys
19-
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
19+
from typing import (
20+
Callable,
21+
Dict,
22+
List,
23+
Optional,
24+
Tuple,
25+
Type,
26+
Union,
27+
)
2028

2129
from ansimarkup import ansiprint
2230

2331
from cylc.flow.async_util import unordered_map
24-
from cylc.flow.exceptions import CylcError, WorkflowStopped
32+
from cylc.flow.exceptions import (
33+
CylcError,
34+
WorkflowStopped,
35+
)
2536
import cylc.flow.flags
2637
from cylc.flow.id_cli import parse_ids_async
2738
from cylc.flow.terminal import DIM
@@ -220,21 +231,28 @@ def _process_response(
220231

221232

222233
def _report(
223-
response: dict,
234+
response: Union[dict, list],
224235
) -> Tuple[Optional[str], Optional[str], bool]:
225236
"""Report the result of a GraphQL operation.
226237
227238
This analyses GraphQL mutation responses to determine the outcome.
228239
229240
Args:
230-
response: The GraphQL response.
241+
response: The workflow server response (NOT necessarily conforming to
242+
GraphQL execution result spec).
231243
232244
Returns:
233245
(stdout, stderr, outcome)
234246
235247
"""
236248
try:
237249
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
250+
if not isinstance(response, dict):
251+
if isinstance(response, list) and response[0].get('error'):
252+
# If operating on workflow running in older Cylc version,
253+
# may get a error response like [{'error': '...'}]
254+
raise Exception(response)
255+
raise Exception(f"Unexpected response: {response}")
238256
for mutation_response in response.values():
239257
# extract the result of each mutation result in the response
240258
success, msg = mutation_response['result'][0]['response']
@@ -268,7 +286,7 @@ def _report(
268286
# response returned is not in the expected format - this shouldn't
269287
# happen but we need to protect against it
270288
err_msg = ''
271-
if cylc.flow.flags.verbosity > 1: # debug mode
289+
if cylc.flow.flags.verbosity > 0: # verbose mode
272290
# print the full result to stderr
273291
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
274292
return (

cylc/flow/network/replier.py

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,25 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
"""Server for workflow runtime API."""
1717

18+
import json
1819
from queue import Queue
19-
from typing import TYPE_CHECKING, Optional
20+
from typing import (
21+
TYPE_CHECKING,
22+
Optional,
23+
)
2024

2125
import zmq
2226

23-
from cylc.flow import LOG
24-
from cylc.flow.network import encode_, decode_, ZMQSocketBase
27+
from cylc.flow import (
28+
LOG,
29+
__version__ as CYLC_VERSION,
30+
)
31+
from cylc.flow.network import (
32+
ResponseDict,
33+
ZMQSocketBase,
34+
load_server_response,
35+
)
36+
2537

2638
if TYPE_CHECKING:
2739
from cylc.flow.network.server import WorkflowRuntimeServer
@@ -69,7 +81,7 @@ def _bespoke_stop(self) -> None:
6981
LOG.debug('stopping zmq replier...')
7082
self.queue.put('STOP')
7183

72-
def listener(self):
84+
def listener(self) -> None:
7385
"""The server main loop, listen for and serve requests.
7486
7587
When called, this method will receive and respond until there are no
@@ -90,7 +102,9 @@ def listener(self):
90102

91103
try:
92104
# Check for messages
93-
msg = self.socket.recv_string(zmq.NOBLOCK)
105+
msg = self.socket.recv_string( # type: ignore[union-attr]
106+
zmq.NOBLOCK
107+
)
94108
except zmq.error.Again:
95109
# No messages, break to parent loop/caller.
96110
break
@@ -99,27 +113,27 @@ def listener(self):
99113
continue
100114
# attempt to decode the message, authenticating the user in the
101115
# process
116+
res: ResponseDict
117+
response: bytes
102118
try:
103-
message = decode_(msg)
119+
message = load_server_response(msg)
104120
except Exception as exc: # purposefully catch generic exception
105121
# failed to decode message, possibly resulting from failed
106122
# authentication
107-
LOG.exception('failed to decode message: "%s"', exc)
108-
import traceback
109-
response = encode_(
110-
{
111-
'error': {
112-
'message': 'failed to decode message: "%s"' % msg,
113-
'traceback': traceback.format_exc(),
114-
}
115-
}
116-
).encode()
123+
LOG.exception(exc)
124+
LOG.error(f'failed to decode message: "{msg}"')
125+
res = {
126+
'error': {'message': str(exc)},
127+
'cylc_version': CYLC_VERSION,
128+
}
129+
response = json.dumps(res).encode()
117130
else:
118131
# success case - serve the request
119132
res = self.server.receiver(message)
133+
data = res.get('data')
120134
# send back the string to bytes response
121-
if isinstance(res.get('data'), bytes):
122-
response = res['data']
135+
if isinstance(data, bytes):
136+
response = data
123137
else:
124-
response = encode_(res).encode()
125-
self.socket.send(response)
138+
response = json.dumps(res).encode()
139+
self.socket.send(response) # type: ignore[union-attr]

0 commit comments

Comments
 (0)