Skip to content

Commit 9fe7f51

Browse files
authored
Merge pull request #6535 from MetRonnie/kill-prep
Kill tasks during job prep
2 parents 1593ef3 + c28a565 commit 9fe7f51

File tree

24 files changed: +507 additions, −215 deletions

24 files changed: +507 additions, −215 deletions

changes.d/6535.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ensure tasks can be killed while in the preparing state.

cylc/flow/network/server.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ class WorkflowRuntimeServer:
123123
124124
"""
125125
endpoints: Dict[str, object]
126+
curve_auth: ThreadAuthenticator
127+
"""The ZMQ authenticator."""
128+
client_pub_key_dir: str
129+
"""Client public key directory, used by the ZMQ authenticator."""
126130

127131
OPERATE_SLEEP_INTERVAL = 0.2
128132
STOP_SLEEP_INTERVAL = 0.2
@@ -136,8 +140,6 @@ def __init__(self, schd):
136140
self.publisher = None
137141
self.loop = None
138142
self.thread = None
139-
self.curve_auth = None
140-
self.client_pub_key_dir = None
141143

142144
self.schd: 'Scheduler' = schd
143145
self.resolvers = Resolvers(
@@ -184,10 +186,7 @@ def start(self, barrier):
184186
self.client_pub_key_dir = client_pub_keyinfo.key_path
185187

186188
# Initial load for the localhost key.
187-
self.curve_auth.configure_curve(
188-
domain='*',
189-
location=(self.client_pub_key_dir)
190-
)
189+
self.configure_curve()
191190

192191
min_, max_ = glbl_cfg().get(['scheduler', 'run hosts', 'ports'])
193192
self.replier = WorkflowReplier(self, context=self.zmq_context)
@@ -207,6 +206,11 @@ def start(self, barrier):
207206

208207
self.operate()
209208

209+
def configure_curve(self) -> None:
210+
self.curve_auth.configure_curve(
211+
domain='*', location=self.client_pub_key_dir
212+
)
213+
210214
async def stop(self, reason: Union[BaseException, str]) -> None:
211215
"""Stop the TCP servers, and clean up authentication.
212216

cylc/flow/scheduler.py

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from collections import deque
2020
from contextlib import suppress
2121
import itertools
22+
import logging
2223
import os
2324
from pathlib import Path
2425
from queue import (
@@ -82,6 +83,7 @@
8283
FLOW_NONE,
8384
FlowMgr,
8485
repr_flow_nums,
86+
stringify_flow_nums,
8587
)
8688
from cylc.flow.host_select import (
8789
HostSelectException,
@@ -440,7 +442,8 @@ async def initialise(self):
440442
self.workflow_db_mgr,
441443
self.task_events_mgr,
442444
self.data_store_mgr,
443-
self.bad_hosts
445+
self.bad_hosts,
446+
self.server,
444447
)
445448

446449
self.profiler = Profiler(self, self.options.profile_mode)
@@ -910,9 +913,7 @@ def restart_remote_init(self):
910913
if install_target == get_localhost_install_target():
911914
continue
912915
# set off remote init
913-
self.task_job_mgr.task_remote_mgr.remote_init(
914-
platform, self.server.curve_auth,
915-
self.server.client_pub_key_dir)
916+
self.task_job_mgr.task_remote_mgr.remote_init(platform)
916917
# Remote init/file-install is done via process pool
917918
self.proc_pool.process()
918919
# add platform to map (to be picked up on main loop)
@@ -1078,18 +1079,21 @@ def kill_tasks(
10781079
to_kill: List[TaskProxy] = []
10791080
unkillable: List[TaskProxy] = []
10801081
for itask in itasks:
1081-
if itask.state(*TASK_STATUSES_ACTIVE):
1082-
if itask.state_reset(is_held=True):
1083-
self.data_store_mgr.delta_task_state(itask)
1082+
if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
1083+
unkillable.append(itask)
1084+
continue
1085+
if itask.state_reset(is_held=True):
1086+
self.data_store_mgr.delta_task_state(itask)
1087+
if itask.state(TASK_STATUS_PREPARING):
1088+
self.task_job_mgr.kill_prep_task(itask)
1089+
else:
10841090
to_kill.append(itask)
10851091
if jobless:
10861092
# Directly set failed in sim mode:
10871093
self.task_events_mgr.process_message(
10881094
itask, 'CRITICAL', TASK_STATUS_FAILED,
10891095
flag=self.task_events_mgr.FLAG_RECEIVED
10901096
)
1091-
else:
1092-
unkillable.append(itask)
10931097
if warn and unkillable:
10941098
LOG.warning(
10951099
"Tasks not killable: "
@@ -1250,6 +1254,7 @@ def get_contact_data(self) -> Dict[str, str]:
12501254
"""
12511255
fields = workflow_files.ContactFileFields
12521256
proc = psutil.Process()
1257+
platform = get_platform()
12531258
# fmt: off
12541259
return {
12551260
fields.API:
@@ -1275,11 +1280,11 @@ def get_contact_data(self) -> Dict[str, str]:
12751280
fields.VERSION:
12761281
CYLC_VERSION,
12771282
fields.SCHEDULER_SSH_COMMAND:
1278-
str(get_platform()['ssh command']),
1283+
str(platform['ssh command']),
12791284
fields.SCHEDULER_CYLC_PATH:
1280-
str(get_platform()['cylc path']),
1285+
str(platform['cylc path']),
12811286
fields.SCHEDULER_USE_LOGIN_SHELL:
1282-
str(get_platform()['use login shell'])
1287+
str(platform['use login shell'])
12831288
}
12841289
# fmt: on
12851290

@@ -1531,29 +1536,32 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
15311536
self.task_job_mgr.task_remote_mgr.rsync_includes = (
15321537
self.config.get_validated_rsync_includes())
15331538

1534-
log = LOG.debug
1539+
submitted = self.submit_task_jobs(itasks)
1540+
if not submitted:
1541+
return False
1542+
1543+
log_lvl = logging.DEBUG
15351544
if self.options.reftest or self.options.genref:
1536-
log = LOG.info
1545+
log_lvl = logging.INFO
15371546

1538-
for itask in self.task_job_mgr.submit_task_jobs(
1539-
self.workflow,
1540-
itasks,
1541-
self.server.curve_auth,
1542-
self.server.client_pub_key_dir,
1543-
run_mode=self.get_run_mode()
1544-
):
1545-
if itask.flow_nums:
1546-
flow = ','.join(str(i) for i in itask.flow_nums)
1547-
else:
1548-
flow = FLOW_NONE
1549-
log(
1547+
for itask in submitted:
1548+
flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE
1549+
LOG.log(
1550+
log_lvl,
15501551
f"{itask.identity} -triggered off "
15511552
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
15521553
)
15531554

15541555
# one or more tasks were passed through the submission pipeline
15551556
return True
15561557

1558+
def submit_task_jobs(
1559+
self, itasks: 'Iterable[TaskProxy]'
1560+
) -> 'List[TaskProxy]':
1561+
"""Submit task jobs, return tasks that attempted submission."""
1562+
# Note: keep this as simple wrapper for task job mgr's method
1563+
return self.task_job_mgr.submit_task_jobs(itasks, self.get_run_mode())
1564+
15571565
def process_workflow_db_queue(self):
15581566
"""Update workflow DB."""
15591567
self.workflow_db_mgr.process_queued_ops()

cylc/flow/subprocctx.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from shlex import quote
2424
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
2525

26+
2627
from cylc.flow.wallclock import get_current_time_string
2728

2829
if TYPE_CHECKING:
@@ -137,6 +138,9 @@ def __str__(self):
137138
'mesg': mesg}
138139
return ret.rstrip()
139140

141+
def __repr__(self) -> str:
142+
return f"<{type(self).__name__} {self.cmd_key}>"
143+
140144

141145
class SubFuncContext(SubProcContext):
142146
"""Represent the context of a Python function to run as a subprocess.

0 commit comments

Comments (0)