Skip to content

Commit 43cab17

Browse files
committed
Abstract out the network transport format again
1 parent 8529d37 commit 43cab17

File tree

4 files changed

+24
-16
lines changed

4 files changed

+24
-16
lines changed

cylc/flow/network/__init__.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,19 @@ class ResponseDict(TypedDict, total=False):
9595
back-compat issues."""
9696

9797

98-
def load_server_response(message: str) -> 'ResponseDict':
98+
def serialize(data: object) -> str:
99+
"""Convert the structure holding a message to a JSON message string."""
100+
# Abstract out the transport format in order to allow it to be changed
101+
# in future.
102+
return json.dumps(data)
103+
104+
105+
def deserialize(message: str) -> 'ResponseDict':
99106
"""Convert a JSON message string to dict with an added 'user' field."""
107+
# Abstract out the transport format in order to allow it to be changed
108+
# in future.
100109
msg = json.loads(message)
101-
if 'user' not in msg:
102-
msg['user'] = getpass.getuser() # assume this is the user
110+
msg['user'] = getpass.getuser() # assume this is the user
103111
return msg
104112

105113

cylc/flow/network/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
abstractmethod,
2121
)
2222
import asyncio
23-
import json
2423
import os
2524
from shutil import which
2625
import socket
@@ -51,7 +50,8 @@
5150
from cylc.flow.network import (
5251
ZMQSocketBase,
5352
get_location,
54-
load_server_response,
53+
deserialize,
54+
serialize,
5555
)
5656
from cylc.flow.network.client_factory import CommsMeth
5757
from cylc.flow.network.server import PB_METHOD_MAP
@@ -284,7 +284,7 @@ async def async_request(
284284
args: Optional[Dict[str, Any]] = None,
285285
timeout: Optional[float] = None,
286286
req_meta: Optional[Dict[str, Any]] = None
287-
) -> Union[bytes, object]:
287+
) -> object:
288288
"""Send an asynchronous request using asyncio.
289289
290290
Has the same arguments and return values as ``serial_request``.
@@ -306,7 +306,7 @@ async def async_request(
306306
if req_meta:
307307
msg['meta'].update(req_meta)
308308
LOG.debug('zmq:send %s', msg)
309-
message = json.dumps(msg)
309+
message = serialize(msg)
310310
self.socket.send_string(message)
311311

312312
# receive response
@@ -326,7 +326,7 @@ async def async_request(
326326
if command in PB_METHOD_MAP:
327327
return res
328328

329-
response: ResponseDict = load_server_response(res.decode())
329+
response: ResponseDict = deserialize(res.decode())
330330

331331
try:
332332
return response['data']
@@ -338,7 +338,7 @@ async def async_request(
338338
f"{response}"
339339
)
340340
wflow_cylc_ver = response.get('cylc_version')
341-
if wflow_cylc_ver:
341+
if wflow_cylc_ver and wflow_cylc_ver != CYLC_VERSION:
342342
error += (
343343
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
344344
)

cylc/flow/network/replier.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
"""Server for workflow runtime API."""
1717

18-
import json
1918
from queue import Queue
2019
from typing import (
2120
TYPE_CHECKING,
@@ -30,7 +29,8 @@
3029
)
3130
from cylc.flow.network import (
3231
ZMQSocketBase,
33-
load_server_response,
32+
deserialize,
33+
serialize,
3434
)
3535

3636

@@ -116,7 +116,7 @@ def listener(self) -> None:
116116
res: ResponseDict
117117
response: bytes
118118
try:
119-
message = load_server_response(msg)
119+
message = deserialize(msg)
120120
except Exception as exc: # purposefully catch generic exception
121121
# failed to decode message, possibly resulting from failed
122122
# authentication
@@ -126,7 +126,7 @@ def listener(self) -> None:
126126
'error': {'message': str(exc)},
127127
'cylc_version': CYLC_VERSION,
128128
}
129-
response = json.dumps(res).encode()
129+
response = serialize(res).encode()
130130
else:
131131
# success case - serve the request
132132
res = self.server.receiver(message)
@@ -135,5 +135,5 @@ def listener(self) -> None:
135135
if isinstance(data, bytes):
136136
response = data
137137
else:
138-
response = json.dumps(res).encode()
138+
response = serialize(res).encode()
139139
self.socket.send(response) # type: ignore[union-attr]

tests/integration/network/test_replier.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import pytest
2222

2323
from cylc.flow import __version__ as CYLC_VERSION
24-
from cylc.flow.network import load_server_response
24+
from cylc.flow.network import deserialize
2525
from cylc.flow.network.client import WorkflowRuntimeClient
2626
from cylc.flow.scheduler import Scheduler
2727

@@ -33,7 +33,7 @@ async def test_listener(one: Scheduler, start):
3333
# (without directly calling listener):
3434
client = WorkflowRuntimeClient(one.workflow)
3535
client.socket.send_string(r'Not JSON')
36-
res = load_server_response(
36+
res = deserialize(
3737
(await client.socket.recv()).decode()
3838
)
3939
assert res['error']

0 commit comments

Comments
 (0)