Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
125ed0c
Initial profiler implementation (non working)
ChrisPaulBennett Jan 7, 2025
60ffd73
CPU/Memory Logging working
ChrisPaulBennett Feb 24, 2025
36de59f
Time Series now working
MetRonnie Mar 12, 2025
12c347d
Code review changes
ChrisPaulBennett Apr 7, 2025
e793c00
Adding test coverage
ChrisPaulBennett Nov 19, 2025
b4a32cd
Added custom exception for cylc profiler
ChrisPaulBennett Nov 20, 2025
d310c12
Linting
ChrisPaulBennett Dec 4, 2025
749f456
Removing spurious version control text
ChrisPaulBennett Jan 13, 2026
c5b175c
profiler: change message format to JSON
oliver-sanders Feb 4, 2026
19ad0d0
Removing accidentally added files
ChrisPaulBennett Mar 2, 2026
ef9e599
change how the profiler sends its data
ChrisPaulBennett Mar 2, 2026
ea0b764
Updating unit tests
ChrisPaulBennett Mar 2, 2026
e880ab4
Updating unit tests
ChrisPaulBennett Mar 3, 2026
4303726
Updating unit tests
ChrisPaulBennett Mar 3, 2026
7fd5e4b
Updating unit tests
ChrisPaulBennett Mar 5, 2026
91f73bb
Adding mechanism to ensure profiler is killed
ChrisPaulBennett Mar 5, 2026
fc74395
Merge branch 'master' into cylc_profiler
ChrisPaulBennett Mar 5, 2026
7afd6a6
Linting
ChrisPaulBennett Mar 5, 2026
e9dd7ac
Merge remote-tracking branch 'ChrisPaulBennett/cylc_profiler' into cy…
ChrisPaulBennett Mar 5, 2026
6ec836a
Updating testing to cope with asyncio
ChrisPaulBennett Mar 6, 2026
9956de7
Update changes.d/6663.feat.md
ChrisPaulBennett Mar 9, 2026
5790c38
Update cylc/flow/cfgspec/globalcfg.py
ChrisPaulBennett Mar 9, 2026
5c0375f
Hopefully working asyncio
ChrisPaulBennett Mar 9, 2026
9ed2fc4
Linting
ChrisPaulBennett Mar 10, 2026
3520e8c
Removing redundant asyncio
ChrisPaulBennett Mar 17, 2026
4071d8e
Linting
ChrisPaulBennett Mar 17, 2026
cc9b5a8
Merge branch 'cylc:master' into cylc_profiler
ChrisPaulBennett Mar 25, 2026
6c3d108
Reverted back to constant polling of the memory statistic
ChrisPaulBennett Mar 31, 2026
e42cc58
Linting
ChrisPaulBennett Mar 31, 2026
c2a4099
Code review changes
ChrisPaulBennett Mar 31, 2026
a79a3ce
Fix unit tests
ChrisPaulBennett Apr 8, 2026
9e8f426
Type hinting
ChrisPaulBennett Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ __pycache__/
# vscode
.vscode

# pycharm
.idea

# processed workflow configs
*.rc.processed
*.cylc.processed
Expand Down
1 change: 1 addition & 0 deletions .mailmap
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,5 @@ github-actions[bot] <github-actions@noreply.github.com>
github-actions[bot] <github-actions@noreply.github.com> GitHub Action
Diquan Jabbour <165976689+Diquan-BOM@users.noreply.github.com>
Maxime Rio <maxime.rio@niwa.co.nz>
Christopher Bennett <christopher.bennett@metoffice.gov.uk> ChrisPaulBennett <christopher.bennett@metoffice.gov.uk>
Christopher Bennett <christopher.bennett@metoffice.gov.uk> christopher.bennett <christopher.bennett@metoffice.gov.uk>
1 change: 1 addition & 0 deletions changes.d/6663.feat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a Cylc job profiler which captures CPU and memory information from job runners which use cGroups. This information can be reviewed in the Analysis view in the GUI.
32 changes: 32 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,38 @@ def default_for(

.. versionadded:: 8.0.0
''')

with Conf('profile', desc='''
Configure the Cylc job profiler.

This tool can capture CPU and memory information from
job runners which use cGroups such as PBS and Slurm.

.. versionadded:: 8.7.0
'''):
Conf('activate', VDR.V_BOOLEAN, False, desc='''
Enable the Cylc profiler for this platform.
''')
Conf('cgroups path', VDR.V_STRING,
default='/sys/fs/cgroup',
desc='''
Configure the path to the cgroups filesystem.

The default value (``/sys/fs/cgroup``) is the standard
location for cgroups on linux and should work in
most circumstances
''')
Conf('polling interval', VDR.V_INTEGER,
default=10,
desc='''
Configure the profiler polling interval.

The interval (in seconds) at which the profiler will
poll the cgroups filesystem for resource usage data.
The default value of 10 seconds should be sufficient for
most use cases, but can be adjusted as needed.
''')
Comment on lines +1501 to +1510
Copy link
Copy Markdown
Member

@oliver-sanders oliver-sanders Apr 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should really be an ISO8601 interval rather than an int (we switched the other configs from in in Cylc 6).

But will punt that to a follow-on issue - #7265


Conf('job runner', VDR.V_STRING, 'background', desc=f'''
The system used to run jobs on the platform.

Expand Down
21 changes: 21 additions & 0 deletions cylc/flow/etc/job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ cylc__job__main() {
mkdir -p "$(dirname "${CYLC_TASK_WORK_DIR}")" || true
mkdir -p "${CYLC_TASK_WORK_DIR}"
cd "${CYLC_TASK_WORK_DIR}"

if [[ "${CYLC_PROFILE}" == "True" ]] ; then
cylc profile -m "${CYLC_CGROUP}" -i "${CYLC_POLLING_INTERVAL}" &
export profiler_pid="$!"
fi

# Env-Script, User Environment, Pre-Script, Script and Post-Script
# Run user scripts in subshell to protect cylc job script from interference.
# Waiting on background process allows signal traps to trigger immediately.
Expand All @@ -157,11 +163,15 @@ cylc__job__main() {
cylc__set_return "$ret_code"
fi
}
# Grab the max rss and cpu_time and clean up before changing directory
cylc__kill_profiler
# Empty work directory remove
cd
rmdir "${CYLC_TASK_WORK_DIR}" 2>'/dev/null' || true
# Send task succeeded message

wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>'/dev/null' || true

cylc message -- "${CYLC_WORKFLOW_ID}" "${CYLC_TASK_JOB}" 'succeeded' || true
# (Ignore shellcheck "globbing and word splitting" warning here).
# shellcheck disable=SC2086
Expand All @@ -187,6 +197,14 @@ cylc__set_return() {
return "${1:-0}"
}

###############################################################################
# Save the data using cylc message and exit the profiler
cylc__kill_profiler() {
if [[ -n "${profiler_pid:-}" ]] && ps -p "$profiler_pid" > /dev/null; then
kill -s SIGINT "${profiler_pid}" || true
fi
}

###############################################################################
# Disable selected or all (if no arguments given) fail traps.
# Globals:
Expand Down Expand Up @@ -268,6 +286,9 @@ cylc__job_finish_err() {
# (Ignore shellcheck "globbing and word splitting" warning here).
# shellcheck disable=SC2086
trap '' ${CYLC_VACATION_SIGNALS:-} ${CYLC_FAIL_SIGNALS}

cylc__kill_profiler

if [[ -n "${CYLC_TASK_MESSAGE_STARTED_PID:-}" ]]; then
wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>'/dev/null' || true
fi
Expand Down
13 changes: 13 additions & 0 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,3 +529,16 @@ def __init__(self, message, expr=None):

def __str__(self):
return self.message


class CylcProfilerError(CylcError):
"""Exception for errors raised from the cylc profiler. These errors do not
affect workflows functionally, just stats gathering. We don't want to
panic users."""
def __init__(self, exc: Exception, error_msg: str) -> None:
CylcError.__init__(
self,
f"{exc}. {error_msg}. This error came from the Cylc profiler"
f" and is not a problem with your workflow. Statistics gathering "
f"for the analysis view may be incomplete."
)
12 changes: 10 additions & 2 deletions cylc/flow/job_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,16 @@ def _write_task_environment(self, handle, job_conf):
'\n export CYLC_TASK_TRY_NUMBER=%s' % job_conf['try_num'])
handle.write(
"\n export CYLC_TASK_FLOW_NUMBERS="
f"{','.join(str(f) for f in job_conf['flow_nums'])}"
)
f"{','.join(str(f) for f in job_conf['flow_nums'])}")
handle.write(
"\n export CYLC_PROFILE="
f"{job_conf['platform']['profile']['activate']}")
handle.write(
"\n export CYLC_CGROUP="
f"{job_conf['platform']['profile']['cgroups path']}")
handle.write(
"\n export CYLC_POLLING_INTERVAL="
f"{job_conf['platform']['profile']['polling interval']}")
# Standard parameter environment variables
for var, val in job_conf['param_var'].items():
handle.write('\n export CYLC_TASK_PARAM_%s="%s"' % (var, val))
Expand Down
51 changes: 39 additions & 12 deletions cylc/flow/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Run command on a remote, (i.e. a remote [user@]host)."""

import asyncio
import os
from pathlib import Path
from posix import WIFSIGNALED
Expand All @@ -32,8 +33,8 @@
Popen,
)
import sys
from time import sleep
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Expand All @@ -56,6 +57,9 @@
)
from cylc.flow.util import format_cmd

if TYPE_CHECKING:
import psutil


def get_proc_ancestors():
"""Return list of parent PIDs back to init."""
Expand All @@ -76,21 +80,44 @@ def get_proc_ancestors():
pid = ppid


def watch_and_kill(proc):
"""Kill proc if my PPID (etc.) changed - e.g. ssh connection dropped."""
async def watch_and_kill(
proc: 'psutil.Process',
interval: float | None = None,
):
"""Watch a process and kill it if any of its parent processes change.

Processes exist in a tree which inherits from the process with PID 1.

If a parent process dies, the child will be re-assigned a new parent.
This can happen:
* If the parent is killed but the signal is not propagated to its children
(or is caught and swallowed).
* If an SSH connection drops.

This coroutine monitors the parents of a process and will kill the child
if any of its parents change.

Args:
proc:
The process to monitor and kill if needed.
interval:
The polling interval to check the parent process tree in seconds.
If not provided, this will default to the environment variable
"CYLC_PROC_POLL_INTERVAL" if set, else 60.

"""
gpa = get_proc_ancestors()
interval = 1 # secs
count = 0
# Allow customising the interval to allow tests to run faster:
interval = interval or float(os.getenv('CYLC_PROC_POLL_INTERVAL', 60))
while True:
count += 1
if proc.poll() is not None:
await asyncio.sleep(interval)
if proc.is_running() is False:
break
if round(count * interval) % 60 and get_proc_ancestors() != gpa:
# (Only run ps command once a minute)
sleep(1)
new_gpa = get_proc_ancestors()
if new_gpa != gpa:
await asyncio.sleep(1)
os.kill(proc.pid, signal.SIGTERM)
break
sleep(interval)
return True


def run_cmd(
Expand Down
28 changes: 17 additions & 11 deletions cylc/flow/scripts/cat_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
$ cylc cat-log foo//2020/bar -m f
"""

import asyncio
import os
from contextlib import suppress
from glob import glob
Expand All @@ -70,10 +71,12 @@
import sys
from typing import TYPE_CHECKING

from psutil import Process

from cylc.flow.exceptions import InputError
import cylc.flow.flags
from cylc.flow.hostuserutil import is_remote_platform
from cylc.flow.id_cli import parse_id
from cylc.flow.id_cli import parse_id_async
from cylc.flow.log_level import verbosity_to_opts
from cylc.flow.option_parsers import (
ID_MULTI_ARG_DOC,
Expand Down Expand Up @@ -244,7 +247,7 @@ def _check_fs_path(path):
)


def view_log(
async def view_log(
logpath,
mode,
tailer_tmpl,
Expand Down Expand Up @@ -307,9 +310,12 @@ def view_log(
cmd = tailer_tmpl % {"filename": shlex.quote(str(logpath))}
proc = Popen(shlex.split(cmd), stdin=DEVNULL) # nosec
# * batchview command is user configurable
with suppress(KeyboardInterrupt):
watch_and_kill(proc)
return proc.wait()
watcher = asyncio.create_task(watch_and_kill(Process(proc.pid)))
try:
ret = proc.wait()
finally:
watcher.cancel()
return ret


def get_option_parser() -> COP:
Expand Down Expand Up @@ -414,10 +420,10 @@ def main(
):
"""Wrapper around the main script for simpler testing.
"""
_main(parser, options, *ids, color=color)
asyncio.run(_main(parser, options, *ids, color=color))


def _main(
async def _main(
parser: COP,
options: 'Values',
*ids,
Expand Down Expand Up @@ -445,7 +451,7 @@ def _main(
batchview_cmd = options.remote_args[3]
except IndexError:
batchview_cmd = None
res = view_log(
res = await view_log(
logpath,
mode,
tail_tmpl,
Expand All @@ -458,7 +464,7 @@ def _main(
sys.exit(res)
return

workflow_id, tokens, _ = parse_id(*ids, constraint='mixed')
workflow_id, tokens, _ = await parse_id_async(*ids, constraint='mixed')

# Get long-format mode.
try:
Expand Down Expand Up @@ -522,7 +528,7 @@ def _main(
tail_tmpl = os.path.expandvars(
get_platform()["tail command template"]
)
out = view_log(
out = await view_log(
log_file_path,
mode,
tail_tmpl,
Expand Down Expand Up @@ -679,7 +685,7 @@ def _main(
# Local task job or local job log.
logpath = os.path.join(local_log_dir, options.filename)
tail_tmpl = os.path.expandvars(platform["tail command template"])
out = view_log(
out = await view_log(
logpath,
mode,
tail_tmpl,
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/scripts/cylc.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def main() -> None: # pragma: no cover


def _main(opts, cmd_args) -> int:
"""Implemnent the Cylc CLI.
"""Implement the Cylc CLI.

Returns the exit code as an integer.
"""
Expand Down
11 changes: 8 additions & 3 deletions cylc/flow/scripts/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@
"""


import asyncio
from logging import getLevelName, INFO
import os
import sys
from typing import TYPE_CHECKING

from cylc.flow.id_cli import parse_id
from cylc.flow.id_cli import parse_id_async
from cylc.flow.option_parsers import (
WORKFLOW_ID_ARG_DOC,
CylcOptionParser as COP
Expand Down Expand Up @@ -142,6 +143,10 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
if not args:
parser.error('No message supplied')
return
asyncio.run(_main(options, args))


async def _main(options, args):
if len(args) <= 2:
# BACK COMPAT: args <= 2
# from:
Expand All @@ -160,7 +165,7 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
message_strs = list(args)
else:
workflow_id, job_id, *message_strs = args
workflow_id, *_ = parse_id(
workflow_id, *_ = await parse_id_async(
workflow_id,
constraint='workflows',
)
Expand Down Expand Up @@ -198,4 +203,4 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
messages.append([options.severity, message_str.strip()])
else:
messages.append([getLevelName(INFO), message_str.strip()])
record_messages(workflow_id, job_id, messages)
await record_messages(workflow_id, job_id, messages)
Loading
Loading