Skip to content

Commit f6c3d79

Browse files
authored
Merge pull request #6608 from cylc/8.4.x-sync
🤖 Merge 8.4.x-sync into master
2 parents 0e738b4 + 637b5d8 commit f6c3d79

30 files changed

+656
-225
lines changed

changes.d/6480.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
`cat-log`: List log files which are available via a configured tailer/viewer command.

changes.d/6535.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ensure tasks can be killed while in the preparing state.

cylc/flow/network/server.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ class WorkflowRuntimeServer:
123123
124124
"""
125125
endpoints: Dict[str, object]
126+
curve_auth: ThreadAuthenticator
127+
"""The ZMQ authenticator."""
128+
client_pub_key_dir: str
129+
"""Client public key directory, used by the ZMQ authenticator."""
126130

127131
OPERATE_SLEEP_INTERVAL = 0.2
128132
STOP_SLEEP_INTERVAL = 0.2
@@ -136,8 +140,6 @@ def __init__(self, schd):
136140
self.publisher = None
137141
self.loop = None
138142
self.thread = None
139-
self.curve_auth = None
140-
self.client_pub_key_dir = None
141143

142144
self.schd: 'Scheduler' = schd
143145
self.resolvers = Resolvers(
@@ -184,10 +186,7 @@ def start(self, barrier):
184186
self.client_pub_key_dir = client_pub_keyinfo.key_path
185187

186188
# Initial load for the localhost key.
187-
self.curve_auth.configure_curve(
188-
domain='*',
189-
location=(self.client_pub_key_dir)
190-
)
189+
self.configure_curve()
191190

192191
min_, max_ = glbl_cfg().get(['scheduler', 'run hosts', 'ports'])
193192
self.replier = WorkflowReplier(self, context=self.zmq_context)
@@ -207,6 +206,11 @@ def start(self, barrier):
207206

208207
self.operate()
209208

209+
def configure_curve(self) -> None:
210+
self.curve_auth.configure_curve(
211+
domain='*', location=self.client_pub_key_dir
212+
)
213+
210214
async def stop(self, reason: Union[BaseException, str]) -> None:
211215
"""Stop the TCP servers, and clean up authentication.
212216

cylc/flow/option_parsers.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -242,19 +242,20 @@ class CylcOptionParser(OptionParser):
242242

243243
MULTITASK_USAGE = dedent('''
244244
This command can operate on multiple tasks. Globs and selectors may
245-
be used to match active tasks:
245+
be used to match tasks in the n=0 active window (except in the
246+
`cylc show` command, where globs match in the wider n-window):
246247
Multiple Tasks:
247248
# Operate on two tasks
248249
workflow //cycle-1/task-1 //cycle-2/task-2
249250
250-
Globs (note: globs should be quoted and only match active tasks):
251-
# Match any active task "foo" in all cycles
251+
Globs (note: quote globs; they only match in the active-window):
252+
# Match any active-window task "foo" in all cycles
252253
'//*/foo'
253254
254255
# Match the tasks "foo-1" and "foo-2"
255256
'//*/foo-[12]'
256257
257-
Selectors (note: selectors only match active tasks):
258+
Selectors (note: selectors only match in the active window too):
258259
# match all failed tasks in cycle "1"
259260
//1:failed
260261

cylc/flow/scheduler.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from collections import deque
2020
from contextlib import suppress
2121
import itertools
22+
import logging
2223
import os
2324
from pathlib import Path
2425
from queue import (
@@ -82,6 +83,7 @@
8283
FLOW_NONE,
8384
FlowMgr,
8485
repr_flow_nums,
86+
stringify_flow_nums,
8587
)
8688
from cylc.flow.host_select import (
8789
HostSelectException,
@@ -440,7 +442,8 @@ async def initialise(self):
440442
self.workflow_db_mgr,
441443
self.task_events_mgr,
442444
self.data_store_mgr,
443-
self.bad_hosts
445+
self.bad_hosts,
446+
self.server,
444447
)
445448

446449
self.profiler = Profiler(self, self.options.profile_mode)
@@ -910,9 +913,7 @@ def restart_remote_init(self):
910913
if install_target == get_localhost_install_target():
911914
continue
912915
# set off remote init
913-
self.task_job_mgr.task_remote_mgr.remote_init(
914-
platform, self.server.curve_auth,
915-
self.server.client_pub_key_dir)
916+
self.task_job_mgr.task_remote_mgr.remote_init(platform)
916917
# Remote init/file-install is done via process pool
917918
self.proc_pool.process()
918919
# add platform to map (to be picked up on main loop)
@@ -1078,18 +1079,21 @@ def kill_tasks(
10781079
to_kill: List[TaskProxy] = []
10791080
unkillable: List[TaskProxy] = []
10801081
for itask in itasks:
1081-
if itask.state(*TASK_STATUSES_ACTIVE):
1082-
if itask.state_reset(is_held=True):
1083-
self.data_store_mgr.delta_task_state(itask)
1082+
if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
1083+
unkillable.append(itask)
1084+
continue
1085+
if itask.state_reset(is_held=True):
1086+
self.data_store_mgr.delta_task_state(itask)
1087+
if itask.state(TASK_STATUS_PREPARING):
1088+
self.task_job_mgr.kill_prep_task(itask)
1089+
else:
10841090
to_kill.append(itask)
10851091
if jobless:
10861092
# Directly set failed in sim mode:
10871093
self.task_events_mgr.process_message(
10881094
itask, 'CRITICAL', TASK_STATUS_FAILED,
10891095
flag=self.task_events_mgr.FLAG_RECEIVED
10901096
)
1091-
else:
1092-
unkillable.append(itask)
10931097
if warn and unkillable:
10941098
LOG.warning(
10951099
"Tasks not killable: "
@@ -1250,6 +1254,7 @@ def get_contact_data(self) -> Dict[str, str]:
12501254
"""
12511255
fields = workflow_files.ContactFileFields
12521256
proc = psutil.Process()
1257+
platform = get_platform()
12531258
# fmt: off
12541259
return {
12551260
fields.API:
@@ -1275,11 +1280,11 @@ def get_contact_data(self) -> Dict[str, str]:
12751280
fields.VERSION:
12761281
CYLC_VERSION,
12771282
fields.SCHEDULER_SSH_COMMAND:
1278-
str(get_platform()['ssh command']),
1283+
str(platform['ssh command']),
12791284
fields.SCHEDULER_CYLC_PATH:
1280-
str(get_platform()['cylc path']),
1285+
str(platform['cylc path']),
12811286
fields.SCHEDULER_USE_LOGIN_SHELL:
1282-
str(get_platform()['use login shell'])
1287+
str(platform['use login shell'])
12831288
}
12841289
# fmt: on
12851290

@@ -1531,28 +1536,32 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
15311536
self.task_job_mgr.task_remote_mgr.rsync_includes = (
15321537
self.config.get_validated_rsync_includes())
15331538

1534-
log = LOG.debug
1539+
submitted = self.submit_task_jobs(itasks)
1540+
if not submitted:
1541+
return False
1542+
1543+
log_lvl = logging.DEBUG
15351544
if self.options.reftest or self.options.genref:
1536-
log = LOG.info
1545+
log_lvl = logging.INFO
15371546

1538-
for itask in self.task_job_mgr.submit_task_jobs(
1539-
itasks,
1540-
self.server.curve_auth,
1541-
self.server.client_pub_key_dir,
1542-
run_mode=self.get_run_mode()
1543-
):
1544-
if itask.flow_nums:
1545-
flow = ','.join(str(i) for i in itask.flow_nums)
1546-
else:
1547-
flow = FLOW_NONE
1548-
log(
1547+
for itask in submitted:
1548+
flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE
1549+
LOG.log(
1550+
log_lvl,
15491551
f"{itask.identity} -triggered off "
15501552
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
15511553
)
15521554

15531555
# one or more tasks were passed through the submission pipeline
15541556
return True
15551557

1558+
def submit_task_jobs(
1559+
self, itasks: 'Iterable[TaskProxy]'
1560+
) -> 'List[TaskProxy]':
1561+
"""Submit task jobs, return tasks that attempted submission."""
1562+
# Note: keep this as simple wrapper for task job mgr's method
1563+
return self.task_job_mgr.submit_task_jobs(itasks, self.get_run_mode())
1564+
15561565
def process_workflow_db_queue(self):
15571566
"""Update workflow DB."""
15581567
self.workflow_db_mgr.process_queued_ops()

cylc/flow/scripts/cat_log.py

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -596,28 +596,51 @@ def _main(
596596
cmd.append('--prepend-path')
597597
cmd.append(workflow_id)
598598
# TODO: Add Intelligent Host selection to this
599+
proc = None
599600
with suppress(KeyboardInterrupt):
600601
# (Ctrl-C while tailing)
601602
# NOTE: This will raise NoHostsError if the platform is not
602603
# contactable
603-
remote_cylc_cmd(
604+
proc = remote_cylc_cmd(
604605
cmd,
605606
platform,
606-
capture_process=False,
607+
capture_process=(mode == 'list-dir'),
607608
manage=(mode == 'tail'),
608-
text=False
609+
text=(mode == 'list-dir'),
609610
)
610-
if (
611-
mode == 'list-dir'
612-
and os.path.exists(
613-
os.path.join(
614-
local_log_dir,
615-
'job-activity.log'
616-
)
617-
)
618-
):
619-
# add the local-only job-activity.log file to the remote-list
620-
print('job-activity.log')
611+
612+
# add and missing items to file listing results
613+
if isinstance(proc, Popen):
614+
# i.e: if mode=='list-dir' and ctrl+c not pressed
615+
out, err = proc.communicate()
616+
files = out.splitlines()
617+
618+
# add files which can be accessed via a tailer
619+
if live_job_id is not None:
620+
if (
621+
# NOTE: only list the file if it can be viewed in
622+
# both modes
623+
(platform['out tailer'] and platform['out viewer'])
624+
and 'job.out' not in files
625+
):
626+
files.append('job.out')
627+
if (
628+
(platform['err tailer'] and platform['err viewer'])
629+
and 'job.err' not in files
630+
):
631+
files.append('job.err')
632+
633+
# add the job-activity.log file which is always local
634+
if os.path.exists(
635+
os.path.join(local_log_dir, 'job-activity.log')
636+
):
637+
files.append('job-activity.log')
638+
639+
files.sort()
640+
print('\n'.join(files))
641+
print(err, file=sys.stderr)
642+
sys.exit(proc.returncode)
643+
621644
else:
622645
# Local task job or local job log.
623646
logpath = os.path.join(local_log_dir, options.filename)

cylc/flow/scripts/cylc.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,7 @@ def get_version(long=False):
146146
$ cylc pause foo/run1
147147
$ cylc stop foo/run1
148148
149-
In the case of numbered runs (e.g. "run1", "run2", ...) you can omit
150-
the run number, Cylc will infer latest run.
149+
If you omit run number ("run1", "run2", ...) Cylc will infer latest run.
151150
$ cylc play foo
152151
$ cylc pause foo
153152
$ cylc stop foo
@@ -164,8 +163,7 @@ def get_version(long=False):
164163
You can omit the user name when working on your own workflows.
165164
166165
Cycle / Family / Task / Job IDs:
167-
Just as workflows have IDs, the things within workflows have IDs too.
168-
These IDs take the format:
166+
Just as workflows have IDs, so do objects within workflows:
169167
cycle/task_or_family/job
170168
171169
Examples:
@@ -174,8 +172,7 @@ def get_version(long=False):
174172
1/a/1 # The first job of the task "a" in the cycle point "1".
175173
176174
Full ID
177-
We join the workflow and cycle/task/job IDs together using //:
178-
workflow//cycle/task/job
175+
Join workflow and cycle/task/job IDs with //: workflow//cycle/task/job
179176
180177
Examples:
181178
w// # The workflow "w"
@@ -201,8 +198,9 @@ def get_version(long=False):
201198
workflow//cycle/task/* # All jobs in workflow//cycle/job
202199
203200
Warning:
204-
Remember to write IDs inside single quotes when using them on the
205-
command line otherwise your shell may expand them.
201+
Quote IDs on the command line to protect them from shell expansion.
202+
Patterns only match tasks in the n=0 active window (except for the
203+
`cylc show` command where they match in the wider n-window).
206204
207205
Filters
208206
Filters allow you to filter for specific states.

cylc/flow/subprocctx.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from shlex import quote
2424
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
2525

26+
2627
from cylc.flow.wallclock import get_current_time_string
2728

2829
if TYPE_CHECKING:
@@ -137,6 +138,9 @@ def __str__(self):
137138
'mesg': mesg}
138139
return ret.rstrip()
139140

141+
def __repr__(self) -> str:
142+
return f"<{type(self).__name__} {self.cmd_key}>"
143+
140144

141145
class SubFuncContext(SubProcContext):
142146
"""Represent the context of a Python function to run as a subprocess.

0 commit comments

Comments
 (0)