Skip to content

Commit c2a4099

Browse files
Code review changes
1 parent e42cc58 commit c2a4099

File tree

10 files changed

+81
-48
lines changed

10 files changed

+81
-48
lines changed

cylc/flow/remote.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
)
3535
import sys
3636
from typing import (
37+
TYPE_CHECKING,
3738
Any,
3839
Dict,
3940
List,
@@ -56,6 +57,9 @@
5657
)
5758
from cylc.flow.util import format_cmd
5859

60+
if TYPE_CHECKING:
61+
import psutil
62+
5963

6064
def get_proc_ancestors():
6165
"""Return list of parent PIDs back to init."""
@@ -76,7 +80,10 @@ def get_proc_ancestors():
7680
pid = ppid
7781

7882

79-
async def watch_and_kill(proc, interval=None):
83+
async def watch_and_kill(
84+
proc: 'psutil.Process',
85+
interval: float | None = None,
86+
):
8087
"""Watch a process and kill it if any of its parent processes change.
8188
8289
Processes exist in a tree which inherits from the process with PID 1.

cylc/flow/scripts/cat_log.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
$ cylc cat-log foo//2020/bar -m f
6262
"""
6363

64+
import asyncio
6465
import os
6566
from contextlib import suppress
6667
from glob import glob
@@ -70,10 +71,12 @@
7071
import sys
7172
from typing import TYPE_CHECKING
7273

74+
from psutil import Process
75+
7376
from cylc.flow.exceptions import InputError
7477
import cylc.flow.flags
7578
from cylc.flow.hostuserutil import is_remote_platform
76-
from cylc.flow.id_cli import parse_id
79+
from cylc.flow.id_cli import parse_id_async
7780
from cylc.flow.log_level import verbosity_to_opts
7881
from cylc.flow.option_parsers import (
7982
ID_MULTI_ARG_DOC,
@@ -244,7 +247,7 @@ def _check_fs_path(path):
244247
)
245248

246249

247-
def view_log(
250+
async def view_log(
248251
logpath,
249252
mode,
250253
tailer_tmpl,
@@ -308,7 +311,7 @@ def view_log(
308311
proc = Popen(shlex.split(cmd), stdin=DEVNULL) # nosec
309312
# * batchview command is user configurable
310313
with suppress(KeyboardInterrupt):
311-
watch_and_kill(proc)
314+
await watch_and_kill(Process(proc.pid))
312315
return proc.wait()
313316

314317

@@ -414,10 +417,10 @@ def main(
414417
):
415418
"""Wrapper around the main script for simpler testing.
416419
"""
417-
_main(parser, options, *ids, color=color)
420+
asyncio.run(_main(parser, options, *ids, color=color))
418421

419422

420-
def _main(
423+
async def _main(
421424
parser: COP,
422425
options: 'Values',
423426
*ids,
@@ -445,7 +448,7 @@ def _main(
445448
batchview_cmd = options.remote_args[3]
446449
except IndexError:
447450
batchview_cmd = None
448-
res = view_log(
451+
res = await view_log(
449452
logpath,
450453
mode,
451454
tail_tmpl,
@@ -458,7 +461,7 @@ def _main(
458461
sys.exit(res)
459462
return
460463

461-
workflow_id, tokens, _ = parse_id(*ids, constraint='mixed')
464+
workflow_id, tokens, _ = await parse_id_async(*ids, constraint='mixed')
462465

463466
# Get long-format mode.
464467
try:
@@ -522,7 +525,7 @@ def _main(
522525
tail_tmpl = os.path.expandvars(
523526
get_platform()["tail command template"]
524527
)
525-
out = view_log(
528+
out = await view_log(
526529
log_file_path,
527530
mode,
528531
tail_tmpl,
@@ -679,7 +682,7 @@ def _main(
679682
# Local task job or local job log.
680683
logpath = os.path.join(local_log_dir, options.filename)
681684
tail_tmpl = os.path.expandvars(platform["tail command template"])
682-
out = view_log(
685+
out = await view_log(
683686
logpath,
684687
mode,
685688
tail_tmpl,

cylc/flow/scripts/cylc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ def main() -> None: # pragma: no cover
653653

654654

655655
def _main(opts, cmd_args) -> int:
656-
"""Implemnent the Cylc CLI.
656+
"""Implement the Cylc CLI.
657657
658658
Returns the exit code as an integer.
659659
"""

cylc/flow/scripts/message.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,13 @@
9393
"""
9494

9595

96+
import asyncio
9697
from logging import getLevelName, INFO
9798
import os
9899
import sys
99100
from typing import TYPE_CHECKING
100101

101-
from cylc.flow.id_cli import parse_id
102+
from cylc.flow.id_cli import parse_id_async
102103
from cylc.flow.option_parsers import (
103104
WORKFLOW_ID_ARG_DOC,
104105
CylcOptionParser as COP
@@ -142,6 +143,10 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
142143
if not args:
143144
parser.error('No message supplied')
144145
return
146+
asyncio.run(_main(options, args))
147+
148+
149+
async def _main(options, args):
145150
if len(args) <= 2:
146151
# BACK COMPAT: args <= 2
147152
# from:
@@ -160,7 +165,7 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
160165
message_strs = list(args)
161166
else:
162167
workflow_id, job_id, *message_strs = args
163-
workflow_id, *_ = parse_id(
168+
workflow_id, *_ = await parse_id_async(
164169
workflow_id,
165170
constraint='workflows',
166171
)
@@ -198,4 +203,4 @@ def main(parser: COP, options: 'Values', *args: str) -> None:
198203
messages.append([options.severity, message_str.strip()])
199204
else:
200205
messages.append([getLevelName(INFO), message_str.strip()])
201-
record_messages(workflow_id, job_id, messages)
206+
await record_messages(workflow_id, job_id, messages)

cylc/flow/scripts/profiler.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121
"""
2222

2323
import asyncio
24+
from contextlib import suppress
2425
import json
2526
import os
2627
import re
27-
import sys
2828
import signal
2929
import psutil
30-
import functools
3130

3231
from pathlib import Path
3332
from dataclasses import dataclass
@@ -53,20 +52,25 @@ class Process:
5352
cgroup_version: int
5453

5554

56-
def stop_profiler(process, comms_timeout, *_args):
55+
async def stop_profiler(process, comms_timeout, tasks, *_args):
5756
"""Stop the profiler and return its data to the scheduler.
5857
5958
This function will be executed when the profiler receives a stop signal.
6059
"""
60+
# stop the profiler
61+
for task in tasks:
62+
task.cancel()
63+
64+
# extract the stats
6165
profiler_data = get_profiler_data(process)
6266

63-
record_messages(
67+
# send a task message to the scheduler / write message to job.status file
68+
await record_messages(
6469
os.environ['CYLC_WORKFLOW_ID'],
6570
os.environ['CYLC_TASK_JOB'],
6671
[['DEBUG', f'_cylc_profiler: {json.dumps(profiler_data)}']],
6772
comms_timeout=comms_timeout,
6873
)
69-
sys.exit(0)
7074

7175

7276
def get_profiler_data(process):
@@ -219,7 +223,7 @@ async def profile(_process: Process, delay, keep_looping=lambda: True):
219223
while keep_looping():
220224
# Polling the cgroup for memory and keeping track of the max rss value
221225
max_rss = parse_memory_file(_process)
222-
if max_rss > _process.max_rss:
226+
if max_rss is not None and max_rss > _process.max_rss:
223227
_process.max_rss = max_rss
224228
await asyncio.sleep(delay)
225229

@@ -244,29 +248,36 @@ def get_option_parser() -> COP:
244248
@cli_function(get_option_parser)
245249
def main(_parser: COP, options) -> None:
246250
"""CLI main."""
247-
asyncio.run(_main(options))
251+
with suppress(SystemExit, asyncio.exceptions.CancelledError, Exception):
252+
asyncio.run(_main(options))
248253

249254

250255
async def _main(options) -> None:
251256
# get cgroup information
252257
process = get_cgroup_paths(options.cgroup_location)
253258

259+
# list of asyncio tasks
260+
tasks = []
261+
254262
# Register the stop_profiler function with the signal library
255263
# The signal library doesn't work with asyncio, so we have to use the
256264
# loop's add_signal_handler function instead
257265
loop = asyncio.get_running_loop()
258266
for sig in (signal.SIGINT, signal.SIGHUP, signal.SIGTERM):
259267
loop.add_signal_handler(
260268
sig,
261-
functools.partial(stop_profiler, process, options.comms_timeout)
269+
lambda: asyncio.create_task(
270+
stop_profiler(process, options.comms_timeout, tasks)
271+
),
262272
)
263273

264274
# the profiler will run until one of these coroutines calls `sys.exit`:
265-
await asyncio.gather(
275+
tasks.extend([
266276
# run the profiler itself
267-
profile(process, options.delay),
277+
asyncio.create_task(profile(process, options.delay)),
268278

269279
# kill the profiler if its PPID changes
270280
# (i.e, if the job exits before the profiler does)
271-
watch_and_kill(psutil.Process(os.getpid())),
272-
)
281+
asyncio.create_task(watch_and_kill(psutil.Process(os.getpid()))),
282+
])
283+
await asyncio.gather(*tasks)

cylc/flow/task_message.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def split_run_signal(message: str) -> tuple[str, str | None]:
9292
return prefix, signal[0] if signal else None
9393

9494

95-
def record_messages(
95+
async def record_messages(
9696
workflow: str,
9797
job_id: str,
9898
messages: List[list],
@@ -115,7 +115,8 @@ def record_messages(
115115
override_use_utc=(os.getenv('CYLC_UTC') == 'True'))
116116
write_messages(workflow, job_id, messages, event_time)
117117
if get_comms_method() != CommsMeth.POLL:
118-
send_messages(workflow, job_id, messages, event_time, comms_timeout)
118+
await send_messages(workflow, job_id, messages,
119+
event_time, comms_timeout)
119120

120121

121122
def write_messages(workflow, job_id, messages, event_time):
@@ -131,7 +132,7 @@ def write_messages(workflow, job_id, messages, event_time):
131132
_append_job_status_file(workflow, job_id, event_time, messages)
132133

133134

134-
def send_messages(
135+
async def send_messages(
135136
workflow: str,
136137
job_id: str,
137138
messages: List[list],
@@ -163,7 +164,7 @@ def send_messages(
163164
'messages': messages,
164165
}
165166
}
166-
pclient('graphql', mutation_kwargs)
167+
await pclient.async_request('graphql', mutation_kwargs)
167168

168169

169170
def _append_job_status_file(workflow, job_id, event_time, messages):

tests/functional/cylc-cat-log/12-delete-kill.t

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
# or when tail is killed.
2020

2121
. "$(dirname "$0")/test_header"
22-
set_test_number 2
22+
set_test_number 1
2323

2424
# Get PID of tail cmd given the parent cat-log PPID
2525
get_tail_pid() {
@@ -33,6 +33,8 @@ __EOF__
3333
log_file="${WORKFLOW_RUN_DIR}/log/foo.log"
3434
echo "Hello, Mr. Thompson" > "$log_file"
3535

36+
export CYLC_PROC_POLL_INTERVAL=0.5
37+
3638
TEST_NAME="${TEST_NAME_BASE}-delete"
3739
cylc cat-log --mode=tail "$WORKFLOW_NAME" -f foo.log 2>&1 &
3840
cat_log_pid="$!"

tests/integration/scripts/test_cat_log.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,18 @@ def brokendir(run_dir):
3939
shutil.rmtree(brokendir)
4040

4141

42-
def test_fail_no_file(flow):
42+
@pytest.mark.asyncio
43+
async def test_fail_no_file(flow):
4344
"""It produces a helpful error if there is no workflow log file.
4445
"""
4546
parser = cat_log_gop()
4647
id_ = flow({})
4748
with pytest.raises(InputError, match='Log file not found.'):
48-
cat_log(parser, Options(parser)(), id_)
49+
await cat_log(parser, Options(parser)(), id_)
4950

5051

51-
def test_fail_rotation_out_of_range(flow):
52+
@pytest.mark.asyncio
53+
async def test_fail_rotation_out_of_range(flow):
5254
"""It produces a helpful error if rotation number > number of log files.
5355
"""
5456
parser = cat_log_gop()
@@ -60,15 +62,16 @@ def test_fail_rotation_out_of_range(flow):
6062
(logpath / '01-start-01.log').touch()
6163

6264
with pytest.raises(SystemExit):
63-
cat_log(parser, Options(parser)(rotation_num=0), id_)
65+
await cat_log(parser, Options(parser)(rotation_num=0), id_)
6466

6567
msg = r'--rotation 1 invalid \(max value is 0\)'
6668

6769
with pytest.raises(InputError, match=msg):
68-
cat_log(parser, Options(parser)(rotation_num=1), id_)
70+
await cat_log(parser, Options(parser)(rotation_num=1), id_)
6971

7072

71-
def test_bad_workflow(run_dir):
73+
@pytest.mark.asyncio
74+
async def test_bad_workflow(run_dir):
7275
"""Test "cylc cat-log" with bad workflow name."""
7376
parser = cat_log_gop()
7477
msg = re.compile(
@@ -77,15 +80,16 @@ def test_bad_workflow(run_dir):
7780
re.MULTILINE
7881
)
7982
with pytest.raises(InputError, match=msg):
80-
cat_log(parser, Options(parser)(filename='l'), BAD_NAME)
83+
await cat_log(parser, Options(parser)(filename='l'), BAD_NAME)
8184

8285

83-
def test_bad_workflow2(run_dir, brokendir, capsys):
86+
@pytest.mark.asyncio
87+
async def test_bad_workflow2(run_dir, brokendir, capsys):
8488
"""Check a non existent file in a valid workflow results in error.
8589
"""
8690
parser = cat_log_gop()
8791
with pytest.raises(SystemExit, match='1'):
88-
cat_log(
92+
await cat_log(
8993
parser,
9094
Options(parser)(filename='j'),
9195
BAD_NAME
@@ -96,12 +100,13 @@ def test_bad_workflow2(run_dir, brokendir, capsys):
96100
assert capsys.readouterr().err == msg
97101

98102

99-
def test_bad_task_dir(run_dir, brokendir, capsys):
103+
@pytest.mark.asyncio
104+
async def test_bad_task_dir(run_dir, brokendir, capsys):
100105
"""Check a non-existent job log dir in a valid workflow results in error.
101106
"""
102107
parser = cat_log_gop()
103108
with pytest.raises(SystemExit, match='1'):
104-
cat_log(
109+
await cat_log(
105110
parser,
106111
Options(parser)(mode='list-dir'),
107112
BAD_NAME + "//1/foo"

0 commit comments

Comments
 (0)