Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,31 @@ $ towncrier create <PR-number>.<break|feat|fix>.md --content "Short description"

<!-- towncrier release notes start -->

## __cylc-8.4.1 (Released 2025-02-25)__

### 🔧 Fixes

[#6480](https://github.com/cylc/cylc-flow/pull/6480) - `cat-log`: List log files which are available via a configured tailer/viewer command.

[#6506](https://github.com/cylc/cylc-flow/pull/6506) - Work around caching behaviour observed on NFS filesystems which could cause workflows to appear to be stopped or even to not exist, when they are running.

[#6518](https://github.com/cylc/cylc-flow/pull/6518) - Allow setting empty values in `flow.cylc[scheduler][events]` to override the global configuration.

[#6535](https://github.com/cylc/cylc-flow/pull/6535) - Ensure tasks can be killed while in the preparing state.

[#6551](https://github.com/cylc/cylc-flow/pull/6551) - Fix bug in `cylc lint` S014 where it warned about use of legitimate `-W` directive for PBS.

[#6571](https://github.com/cylc/cylc-flow/pull/6571) - Disabled PEP-515-style integer coercion of task parameters containing underscores (e.g. `084_132` was becoming `84132`). This fix returns older behaviour seen in Cylc 7.

[#6577](https://github.com/cylc/cylc-flow/pull/6577) - Fixed a bug where if you prematurely deleted the job log directory, it would leave tasks permanently in the submitted or running states.

[#6578](https://github.com/cylc/cylc-flow/pull/6578) - Improved handling of any internal errors when executing commands against a running workflow.

[#6586](https://github.com/cylc/cylc-flow/pull/6586) - Update PBS job runner to reflect error message change. This change
continues to support older PBS versions.

[#6616](https://github.com/cylc/cylc-flow/pull/6616) - Fixed wrapper script `PATH` override preventing selection of Cylc version in the GUI when running Cylc Hub.

## __cylc-8.4.0 (Released 2025-01-08)__

### ⚠ Breaking Changes
Expand Down
1 change: 0 additions & 1 deletion changes.d/6480.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6506.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6518.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6535.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6551.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6571.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6577.fix.md

This file was deleted.

2 changes: 0 additions & 2 deletions changes.d/6586.fix.md

This file was deleted.

1 change: 0 additions & 1 deletion changes.d/6616.fix.md

This file was deleted.

22 changes: 21 additions & 1 deletion cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@

from textwrap import wrap
from typing import (
TYPE_CHECKING,
Dict,
Optional,
Sequence,
Set,
Union,
TYPE_CHECKING,
)

from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.util import format_cmd


if TYPE_CHECKING:
from cylc.flow.subprocctx import SubFuncContext

Expand Down Expand Up @@ -285,6 +287,24 @@ def __str__(self) -> str:
return ret


class RequestError(ClientError):
"""Represents an error handling a request, returned by the server."""

def __init__(
self, message: str, workflow_cylc_version: Optional[str] = None
):
ClientError.__init__(
self,
message,
traceback=(
f"(Workflow is running in Cylc {workflow_cylc_version})"
if workflow_cylc_version
and workflow_cylc_version != CYLC_VERSION
else None
),
)


class WorkflowStopped(ClientError):
"""The Cylc scheduler you attempted to connect to is stopped."""

Expand Down
76 changes: 63 additions & 13 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import asyncio
import getpass
import json
from typing import Optional, Tuple
from typing import (
TYPE_CHECKING,
Optional,
Tuple,
Union,
)

import zmq
import zmq.asyncio
Expand All @@ -30,32 +35,77 @@
CylcError,
CylcVersionError,
ServiceFileError,
WorkflowStopped
WorkflowStopped,
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.workflow_files import (
ContactFileFields,
KeyType,
KeyOwner,
KeyInfo,
KeyOwner,
KeyType,
get_workflow_srv_dir,
load_contact_file,
get_workflow_srv_dir
)


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.TypedDict
# FROM: Python 3.7
# TO: Python 3.11
from typing_extensions import TypedDict


API = 5 # cylc API version
MSG_TIMEOUT = "TIMEOUT"

if TYPE_CHECKING:
class ResponseDict(TypedDict, total=False):
"""Structure of server response messages.

def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
try:
return json.dumps(message)
except TypeError as exc:
return json.dumps({'errors': [{'message': str(exc)}]})
Confusingly, has similar format to GraphQL execution result.
But if we change this now we could break compatibility for
issuing commands to/receiving responses from workflows running in
different versions of Cylc 8.
"""
data: object
"""For most Cylc commands that issue GQL mutations, the data field will
look like:
data: {
<mutationName1>: {
result: [
{
id: <workflow/task ID>,
response: [<success_bool>, <message>]
},
...
]
}
}
but this is not 100% consistent unfortunately
"""
error: Union[Exception, str, dict]
"""If an error occurred that could not be handled.
(usually a dict {message: str, traceback?: str}).
"""
user: str
cylc_version: str
"""Server (i.e. running workflow) Cylc version.

Going forward, we include this so we can more easily handle any future
back-compat issues."""


def serialize(data: object) -> str:
"""Convert the structure holding a message to a JSON message string."""
# Abstract out the transport format in order to allow it to be changed
# in future.
return json.dumps(data)


def decode_(message):
"""Convert an encoded message string to JSON with an added 'user' field."""
def deserialize(message: str) -> 'ResponseDict':
"""Convert a JSON message string to dict with an added 'user' field."""
# Abstract out the transport format in order to allow it to be changed
# in future.
msg = json.loads(message)
msg['user'] = getpass.getuser() # assume this is the user
return msg
Expand Down
67 changes: 41 additions & 26 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,51 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Client for workflow runtime API."""

from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod,
)
import asyncio
import os
from shutil import which
import socket
import sys
from typing import Any, Optional, Union, Dict
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Union,
)

import zmq
import zmq.asyncio

from cylc.flow import LOG
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.exceptions import (
ClientError,
ClientTimeout,
ContactFileExists,
CylcError,
RequestError,
WorkflowStopped,
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.network import (
encode_,
decode_,
ZMQSocketBase,
deserialize,
get_location,
ZMQSocketBase
serialize,
)
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.workflow_files import (
detect_old_contact_file,
)
from cylc.flow.workflow_files import detect_old_contact_file


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict


class WorkflowRuntimeClientBase(metaclass=ABCMeta):
Expand Down Expand Up @@ -292,12 +306,12 @@
if req_meta:
msg['meta'].update(req_meta)
LOG.debug('zmq:send %s', msg)
message = encode_(msg)
message = serialize(msg)
self.socket.send_string(message)

# receive response
if self.poller.poll(timeout):
res = await self.socket.recv()
res: bytes = await self.socket.recv()
else:
self.timeout_handler()
raise ClientTimeout(
Expand All @@ -307,25 +321,26 @@
' --comms-timeout option;'
'\n* or check the workflow log.'
)
LOG.debug('zmq:recv %s', res)

if msg['command'] in PB_METHOD_MAP:
response = {'data': res}
else:
response = decode_(
res.decode() if isinstance(res, bytes) else res
)
LOG.debug('zmq:recv %s', response)
if command in PB_METHOD_MAP:
return res

response: ResponseDict = deserialize(res.decode())

try:
return response['data']
except KeyError:
error = response.get(
'error',
{'message': f'Received invalid response: {response}'},
)
raise ClientError(
error.get('message'), # type: ignore
error.get('traceback'), # type: ignore
error = response.get('error')
if isinstance(error, dict):
error = error.get('message', error)

Check warning on line 336 in cylc/flow/network/client.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/client.py#L336

Added line #L336 was not covered by tests
if not error:
error = (
f"Received invalid response for Cylc {CYLC_VERSION}: "
f"{response}"
)
raise RequestError(
str(error), response.get('cylc_version')
) from None

def get_header(self) -> dict:
Expand Down
Loading
Loading