forked from cylc/cylc-flow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommands.py
More file actions
478 lines (408 loc) · 15.7 KB
/
commands.py
File metadata and controls
478 lines (408 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Cylc scheduler commands.
These are the scripts which are actioned on the Scheduler instance when you
call a mutation.
Each is an async generator providing functionalities for:
* Validation:
* The generator is executed up to the first "yield" before the command is
queued.
* Validation and argument parsing can be performed at this stage.
* If the generator raises an Exception then the error message will be
communicated back to the user and the command will not be queued.
* If the execution string is not obvious to a user, catch the exception and
re-raise it as an InputError with a more obvious string.
* Any other exceptions will be treated as genuine errors.
* Execution:
* The generator is executed up to the second "yield" when the command is run
by the Scheduler's main-loop:
* The execution may also stop at a return or the end of the generator code.
* If the generator raises a CommandFailedError at this stage, the error will
be caught and logged.
* Any other exceptions will be treated as genuine errors.
In the future we may change this interface to allow generators to "yield" any
arbitrary number of strings to serve the function of communicating command
progress back to the user. For example, the generator might yield the messages:
* Command queued.
* Done 1/3 things
* Done 2/3 things
* Done 3/3 things
* Done
For more info see: https://github.com/cylc/cylc-flow/issues/3329
"""
from contextlib import suppress
from time import (
sleep,
time,
)
from typing import (
TYPE_CHECKING,
AsyncGenerator,
Callable,
Dict,
Iterable,
List,
Optional,
TypeVar,
)
from metomi.isodatetime.parsers import TimePointParser
from cylc.flow import LOG
import cylc.flow.command_validation as validate
from cylc.flow.exceptions import (
CommandFailedError,
CyclingError,
CylcConfigError,
)
import cylc.flow.flags
from cylc.flow.flow_mgr import get_flow_nums_set
from cylc.flow.log_level import log_level_to_verbosity
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.run_modes import RunMode
from cylc.flow.task_id import TaskID
from cylc.flow.workflow_status import StopMode
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
if TYPE_CHECKING:
from enum import Enum
from cylc.flow.scheduler import Scheduler
# define a type for command implementations
Command = Callable[..., AsyncGenerator]
# define a generic type needed for the @_command decorator
_TCommand = TypeVar('_TCommand', bound=Command)
# a directory of registered commands (populated on module import)
COMMANDS: 'Dict[str, Command]' = {}
def _command(name: str):
    """Return a decorator that registers a command under ``name``.

    The decorated function is stored in the module-level ``COMMANDS``
    registry and tagged with a ``command_name`` attribute so the
    registered name can be recovered from the function object later.

    Fix: the inner function previously shadowed the outer factory's
    name (both were called ``_command``), which was confusing in
    tracebacks and when reading the code; it is now ``_register``.
    """
    def _register(fcn: '_TCommand') -> '_TCommand':
        # Record the command and return the function unchanged.
        COMMANDS[name] = fcn
        fcn.command_name = name  # type: ignore[attr-defined]
        return fcn
    return _register
async def run_cmd(bound_fcn: AsyncGenerator):
    """Run a command outside of the scheduler's main loop.

    Normally commands are run via the Scheduler's command_queue (which is
    correct), however, there are some use cases for running commands outside
    of the loop:

    * Running synchronous commands within the scheduler itself (e.g. on
      shutdown).
    * Integration tests (where you may want to cut out the main loop and
      command queueing mechanism for simplicity).

    For these purposes use "run_cmd", otherwise, queue commands via the
    scheduler as normal.

    Returns:
        Whatever the command's second "yield" produces, or None if the
        generator finishes after the validation step.
    """
    # First step: run the validation stage (up to the first yield).
    await bound_fcn.__anext__()
    try:
        # Second step: execute the command (up to the second yield).
        return await bound_fcn.__anext__()
    except StopAsyncIteration:
        # The generator returned/ended instead of yielding a result.
        return None
@_command('set')
async def set_prereqs_and_outputs(
    schd: 'Scheduler',
    tasks: List[str],
    flow: List[str],
    outputs: Optional[List[str]] = None,
    prerequisites: Optional[List[str]] = None,
    flow_wait: bool = False,
    flow_descr: Optional[str] = None,
) -> AsyncGenerator:
    """Force spawn task successors.

    The "outputs" and "prerequisites" arguments might not be populated
    in the mutation arguments, hence the ``None`` defaults here.
    """
    # -- validation stage --
    validate.consistency(outputs, prerequisites)
    outputs = validate.outputs(outputs)
    prerequisites = validate.prereqs(prerequisites)
    validate.flow_opts(flow, flow_wait)
    validate.is_tasks(tasks)
    yield
    # -- execution stage --
    # Substitute empty lists for absent outputs/prerequisites.
    yield schd.pool.set_prereqs_and_outputs(
        tasks,
        [] if outputs is None else outputs,
        [] if prerequisites is None else prerequisites,
        flow,
        flow_wait,
        flow_descr,
    )
@_command('stop')
async def stop(
    schd: 'Scheduler',
    mode: 'Optional[Enum]',
    cycle_point: Optional[str] = None,
    # NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed
    clock_time: Optional[str] = None,
    task: Optional[str] = None,
    flow_num: Optional[int] = None,
):
    """Stop the workflow, stop a flow, or schedule a future stop.

    Args:
        schd: The scheduler to operate on.
        mode: Stop mode; only used for immediate shutdown (i.e. when no
            other argument below is provided).
        cycle_point: Schedule shutdown after tasks pass this cycle point.
        clock_time: Schedule shutdown after wallclock time passes this
            time point.
        task: Schedule shutdown after this task succeeds.
        flow_num: Stop this flow (no scheduler shutdown).

    Only one of the optional arguments takes effect, checked in the
    order: flow_num, cycle_point, clock_time, task; with none given the
    scheduler shuts down immediately using ``mode``.
    """
    if task:
        # only the task argument needs up-front validation
        validate.is_tasks([task])
    yield
    if flow_num:
        # stop a specific flow; the scheduler itself keeps running
        schd.pool.stop_flow(flow_num)
        return
    if cycle_point is not None:
        # schedule shutdown after tasks pass provided cycle point
        point = TaskID.get_standardised_point(cycle_point)
        if point is not None and schd.pool.set_stop_point(point):
            # record the stop point in options, config and the DB
            schd.options.stopcp = str(point)
            schd.config.stop_point = point
            schd.workflow_db_mgr.put_workflow_stop_cycle_point(
                schd.options.stopcp
            )
        schd._update_workflow_state()
    elif clock_time is not None:
        # schedule shutdown after wallclock time passes provided time
        parser = TimePointParser()
        schd.set_stop_clock(
            int(parser.parse(clock_time).seconds_since_unix_epoch)
        )
        schd._update_workflow_state()
    elif task is not None:
        # schedule shutdown after task succeeds
        task_id = TaskID.get_standardised_taskid(task)
        schd.pool.set_stop_task(task_id)
        schd._update_workflow_state()
    else:
        # immediate shutdown
        try:
            # BACK COMPAT: mode=None
            # the mode can be `None` for commands issued from older Cylc
            # versions
            # From: 8.4
            # To: 8.5
            # Remove at: 8.x
            mode = StopMode(mode.value) if mode else StopMode.REQUEST_CLEAN
        except ValueError:
            raise CommandFailedError(f"Invalid stop mode: '{mode}'") from None
        schd._set_stop(mode)
        if mode is StopMode.REQUEST_KILL:
            # trigger the kill pass immediately
            schd.time_next_kill = time()
@_command('release')
async def release(schd: 'Scheduler', tasks: Iterable[str]):
    """Release tasks that are currently held."""
    # validation stage
    validate.is_tasks(tasks)
    yield
    # execution stage: delegate to the task pool
    released = schd.pool.release_held_tasks(tasks)
    yield released
@_command('release_hold_point')
async def release_hold_point(schd: 'Scheduler'):
    """Release all held tasks and unset workflow hold after cycle point,
    if set."""
    # no validation stage required
    yield
    LOG.info("Releasing all tasks and removing hold cycle point.")
    schd.pool.release_hold_point()
    # publish the updated workflow state
    schd._update_workflow_state()
@_command('resume')
async def resume(schd: 'Scheduler'):
    """Resume paused workflow."""
    # no validation stage required
    yield
    schd.resume_workflow()
@_command('poll_tasks')
async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
    """Poll pollable tasks or a task or family if options are provided."""
    # validation stage
    validate.is_tasks(tasks)
    yield
    if schd.get_run_mode() == RunMode.SIMULATION:
        # nothing to poll in simulation mode
        yield 0
    # execution stage
    matched, _, unmatched = schd.pool.filter_task_proxies(tasks)
    schd.task_job_mgr.poll_task_jobs(matched)
    yield len(unmatched)
@_command('kill_tasks')
async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
    """Kill tasks.

    Args:
        tasks: Tasks/families/globs to kill.
    """
    # validation stage
    validate.is_tasks(tasks)
    yield
    # execution stage: kill the matched active tasks, report the rest
    matched, _, unmatched = schd.pool.filter_task_proxies(tasks)
    n_unkillable = schd.kill_tasks(matched)
    yield len(unmatched) + n_unkillable
@_command('hold')
async def hold(schd: 'Scheduler', tasks: Iterable[str]):
    """Hold specified tasks."""
    # validation stage
    validate.is_tasks(tasks)
    yield
    # execution stage: delegate to the task pool
    held = schd.pool.hold_tasks(tasks)
    yield held
@_command('set_hold_point')
async def set_hold_point(schd: 'Scheduler', point: str):
    """Hold all tasks after the specified cycle point."""
    # validation stage: standardise and reject unparsable points
    std_point = TaskID.get_standardised_point(point)
    if std_point is None:
        raise CyclingError("Cannot set hold point to None")
    yield
    # execution stage
    LOG.info(
        f"Setting hold cycle point: {std_point}\n"
        "All tasks after this point will be held."
    )
    schd.pool.set_hold_point(std_point)
    schd._update_workflow_state()
@_command('pause')
async def pause(schd: 'Scheduler'):
    """Pause the workflow."""
    # no validation stage required
    yield
    schd.pause_workflow()
@_command('set_verbosity')
async def set_verbosity(schd: 'Scheduler', level: 'Enum'):
    """Set workflow verbosity."""
    # NOTE: the change is applied before the yield, i.e. during the
    # validation stage, so a bad level is reported straight back to the
    # user as a CommandFailedError.
    new_level = level.value
    try:
        LOG.setLevel(new_level)
    except (TypeError, ValueError) as exc:
        raise CommandFailedError(exc) from None
    cylc.flow.flags.verbosity = log_level_to_verbosity(new_level)
    yield
@_command('remove_tasks')
async def remove_tasks(
    schd: 'Scheduler', tasks: Iterable[str], flow: List[str]
):
    """Remove the specified tasks."""
    # validation stage
    validate.is_tasks(tasks)
    validate.flow_opts(flow, flow_wait=False, allow_new_or_none=False)
    yield
    # execution stage: resolve flow options into flow numbers and remove
    schd.remove_tasks(tasks, get_flow_nums_set(flow))
@_command('reload_workflow')
async def reload_workflow(schd: 'Scheduler', reload_global: bool = False):
    """Reload workflow configuration.

    Args:
        schd: The scheduler to reload.
        reload_global: Also reload the global configuration.

    The workflow is paused for the duration of the reload and resumed
    afterwards (only if it was not already paused beforehand). If the
    new configuration fails to parse, the pre-reload configuration (and
    the backed-up global configuration) is kept and the workflow is
    left paused for the user to decide what to do next.
    """
    yield
    # pause the workflow if not already
    was_paused_before_reload = schd.is_paused
    if not was_paused_before_reload:
        schd.pause_workflow('Reloading workflow')
        schd.process_workflow_db_queue()  # see #5593

    # flush out preparing tasks before attempting reload
    schd.reload_pending = 'waiting for pending tasks to submit'
    while schd.release_tasks_to_run():
        # Run the subset of main-loop functionality required to push
        # preparing through the submission pipeline and keep the workflow
        # responsive (e.g. to the `cylc stop` command).

        # NOTE: this reload method was called by process_command_queue
        # which is called synchronously in the main loop so this call is
        # blocking to other main loop functions

        # subproc pool - for issuing/tracking remote-init commands
        schd.proc_pool.process()
        # task messages - for tracking task status changes
        schd.process_queued_task_messages()
        # command queue - keeps the scheduler responsive
        await schd.process_command_queue()
        # allows the scheduler to shutdown --now
        await schd.workflow_shutdown()
        # keep the data store up to date with what's going on
        await schd.update_data_structure()
        schd.update_data_store()
        # give commands time to complete
        sleep(1)  # give any remove-init's time to complete

    try:
        # Back up the current config in case workflow reload errors
        global_cfg_old = glbl_cfg()
        if reload_global:
            # Reload global config if requested
            schd.reload_pending = 'reloading the global configuration'
            schd.update_data_store()  # update workflow status msg
            await schd.update_data_structure()
            LOG.info("Reloading the global configuration.")
            glbl_cfg(reload=True)
        # reload the workflow definition
        schd.reload_pending = 'loading the workflow definition'
        schd.update_data_store()  # update workflow status msg
        schd._update_workflow_state()
        LOG.info("Reloading the workflow definition.")
        config = schd.load_flow_file(is_reload=True)
    except (ParsecError, CylcConfigError) as exc:
        if cylc.flow.flags.verbosity > 1:
            # log full traceback in debug mode
            LOG.exception(exc)
        LOG.critical(
            f'Reload failed - {exc.__class__.__name__}: {exc}'
            '\nThis is probably due to an issue with the new'
            ' configuration.'
            '\nTo continue with the pre-reload config, un-pause the'
            ' workflow.'
            '\nOtherwise, fix the configuration and attempt to reload'
            ' again.'
        )
        # Rollback global config
        glbl_cfg().set_cache(global_cfg_old)
    else:
        # the new config parsed cleanly - apply it
        schd.reload_pending = 'applying the new config'
        old_tasks = set(schd.config.get_task_name_list())
        # Things that can't change on workflow reload:
        schd._set_workflow_params(
            schd.workflow_db_mgr.pri_dao.select_workflow_params()
        )
        schd.apply_new_config(config, is_reload=True)
        schd.broadcast_mgr.linearized_ancestors = (
            schd.config.get_linearized_ancestors()
        )
        schd.task_events_mgr.mail_interval = schd.cylc_config['mail'][
            'task event batch interval'
        ]
        schd.task_events_mgr.mail_smtp = schd._get_events_conf("smtp")
        schd.task_events_mgr.mail_footer = schd._get_events_conf("footer")
        # Log tasks that have been added by the reload, removed tasks are
        # logged by the TaskPool.
        add = set(schd.config.get_task_name_list()) - old_tasks
        for task in add:
            LOG.warning(f"Added task: '{task}'")
        # persist the new config / template vars to the DB
        schd.workflow_db_mgr.put_workflow_template_vars(schd.template_vars)
        schd.workflow_db_mgr.put_runtime_inheritance(schd.config)
        schd.workflow_db_mgr.put_workflow_params(schd)
        schd.process_workflow_db_queue()  # see #5593
        schd.is_updated = True
        schd.is_reloaded = True
        schd._update_workflow_state()

        # Re-initialise data model on reload
        schd.data_store_mgr.initiate_data_model(schd.is_reloaded)

        # Reset the remote init map to trigger fresh file installation
        schd.task_job_mgr.task_remote_mgr.remote_init_map.clear()
        schd.task_job_mgr.task_remote_mgr.is_reload = True
        schd.pool.reload(config)
        # Load jobs from DB
        schd.workflow_db_mgr.pri_dao.select_jobs_for_restart(
            schd.data_store_mgr.insert_db_job
        )
        if schd.pool.compute_runahead(force=True):
            schd.pool.release_runahead_tasks()
        schd.is_reloaded = True
        schd.is_updated = True
        LOG.info("Reload completed.")

    # resume the workflow if previously paused
    schd.reload_pending = False
    schd.update_data_store()  # update workflow status msg
    schd._update_workflow_state()
    if not was_paused_before_reload:
        schd.resume_workflow()
        schd.process_workflow_db_queue()  # see #5593
@_command('force_trigger_tasks')
async def force_trigger_tasks(
    schd: 'Scheduler',
    tasks: Iterable[str],
    flow: List[str],
    flow_wait: bool = False,
    flow_descr: Optional[str] = None,
    on_resume: bool = False
):
    """Manual task trigger."""
    # validation stage
    validate.is_tasks(tasks)
    validate.flow_opts(flow, flow_wait)
    if on_resume:
        LOG.warning(
            "The --on-resume option is deprecated and will be removed "
            "at Cylc 8.5."
        )
    yield
    # execution stage: delegate to the task pool
    result = schd.pool.force_trigger_tasks(
        tasks, flow, flow_wait, flow_descr, on_resume
    )
    yield result