forked from cylc/cylc-flow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclean.py
More file actions
263 lines (210 loc) · 7.67 KB
/
clean.py
File metadata and controls
263 lines (210 loc) · 7.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
#!/usr/bin/env python3
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""cylc clean [OPTIONS] ARGS
Delete a stopped workflow.
Remove workflow files from the local scheduler filesystem and any remote hosts
the workflow was installed on.
NOTE: this command is intended for workflows installed with `cylc install`. If
this is run for a workflow that was instead written directly in ~/cylc-run and
not backed up elsewhere, it will be lost.
It will also remove any symlink directory targets.
Workflow IDs can be hierarchical, corresponding to the path under ~/cylc-run.
Examples:
# Remove the workflow at ~/cylc-run/foo/bar
$ cylc clean foo/bar
# Remove multiple workflows
$ cylc clean one two three
# Remove the workflow's log directory
$ cylc clean foo/bar --rm log
# Remove the log and work directories
$ cylc clean foo/bar --rm log:work
# or
$ cylc clean foo/bar --rm log --rm work
# Remove all job log files from the 2020 cycle points
$ cylc clean foo/bar --rm 'log/job/2020*'
# Remove all .csv files
$ cylc clean foo/bar --rm '**/*.csv'
# Only remove the workflow on the local filesystem
$ cylc clean foo/bar --local-only
# Only remove the workflow on remote install targets
$ cylc clean foo/bar --remote-only
"""
import asyncio
from optparse import SUPPRESS_HELP
import sys
from typing import TYPE_CHECKING, Iterable, List, Tuple
from metomi.isodatetime.exceptions import ISO8601SyntaxError
from metomi.isodatetime.parsers import DurationParser
from cylc.flow import LOG
from cylc.flow.clean import init_clean, get_contained_workflows
from cylc.flow.exceptions import CylcError, InputError
import cylc.flow.flags
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.loggingutil import set_timestamps
from cylc.flow.option_parsers import (
WORKFLOW_ID_MULTI_ARG_DOC,
CylcOptionParser as COP,
Options,
)
from cylc.flow.terminal import cli_function, is_terminal
if TYPE_CHECKING:
from optparse import Values
def get_option_parser():
    """Build the command-line option parser for ``cylc clean``.

    The module docstring is used as the usage text; options are registered
    from a table of (option strings, keyword arguments) pairs.
    """
    parser = COP(
        __doc__,
        multiworkflow=True,
        argdoc=[WORKFLOW_ID_MULTI_ARG_DOC],
        segregated_log=True,
    )

    # Each entry: (positional option strings, add_option keyword arguments).
    # Registration order matches the order options appear in --help.
    option_specs = [
        (
            ('--rm',),
            {
                'metavar': 'DIR[:DIR:...]',
                'help': (
                    "Only clean the specified subdirectories (or files) in the "
                    "run directory, rather than the whole run directory. "
                    "Accepts quoted globs."
                ),
                'action': 'append',
                'dest': 'rm_dirs',
                'default': [],
            },
        ),
        (
            ('--local-only', '--local'),
            {
                'help': "Only clean on the local filesystem (not remote hosts).",
                'action': 'store_true',
                'dest': 'local_only',
            },
        ),
        (
            ('--remote-only', '--remote'),
            {
                'help': "Only clean on remote hosts (not the local filesystem).",
                'action': 'store_true',
                'dest': 'remote_only',
            },
        ),
        (
            ('--yes', '-y'),
            {
                'help': (
                    "Skip interactive prompt if trying to clean multiple "
                    "run directories at once."
                ),
                'action': 'store_true',
                'dest': 'skip_interactive',
            },
        ),
        (
            ('--timeout',),
            {
                'help': (
                    "The length of time to wait for cleaning to take place on "
                    r"remote hosts before cancelling. Default: %default."
                ),
                'action': 'store',
                'default': 'PT5M',
                'dest': 'remote_timeout',
            },
        ),
        (
            # Hidden option used on remote re-invocation: do not scan for
            # workflows, just clean exactly what you were told to clean.
            ('--no-scan',),
            {
                'help': SUPPRESS_HELP,
                'action': 'store_true',
                'dest': 'no_scan',
            },
        ),
    ]

    for option_strings, option_kwargs in option_specs:
        parser.add_option(*option_strings, **option_kwargs)
    return parser
# Options helper built from this command's option parser.
# NOTE(review): presumably lets other code construct `cylc clean` option
# objects programmatically (without parsing a command line) — confirm.
CleanOptions = Options(get_option_parser())
def parse_timeout(opts: 'Values') -> None:
    """Normalise ``opts.remote_timeout`` to a whole number of seconds.

    The value may be an ISO 8601 duration (e.g. ``PT5M``) or a plain integer
    number of seconds. On success it is replaced in place by its integer
    seconds value, as a string. A falsy value is left untouched.

    Raises:
        InputError: if the value is neither an ISO 8601 duration nor an
            integer.
    """
    raw_value = opts.remote_timeout
    if not raw_value:
        return

    try:
        seconds = int(DurationParser().parse(raw_value).get_seconds())
    except ISO8601SyntaxError:
        # Not an ISO 8601 duration; fall back to a bare number of seconds.
        try:
            seconds = int(raw_value)
        except ValueError:
            raise InputError(
                f"Invalid timeout: {opts.remote_timeout}. Must be "
                "an ISO 8601 duration or number of seconds."
            ) from None

    opts.remote_timeout = str(seconds)
def prompt(workflows: Iterable[str]) -> None:
    """Ask user if they want to clean the given set of workflows.

    Prints the workflows, then, if ``is_terminal()`` reports an interactive
    session, repeatedly asks for a y/n answer until one is given.

    Returns normally if the user answers "y" (case-insensitive, surrounding
    whitespace ignored). Exits with status 1 if:

    * the user answers "n";
    * input ends (EOF, e.g. Ctrl-D) before an answer is given;
    * the session is not interactive, in which case ``--yes`` must be used
      to remove multiple workflows.
    """
    print("Would clean the following workflows:")
    for workflow in workflows:
        print(f' {workflow}')

    if not is_terminal():
        print(
            "Use --yes to remove multiple workflows in non-interactive mode.",
            file=sys.stderr
        )
        sys.exit(1)

    while True:
        try:
            answer = input('Remove these workflows (y/n): ')
        except EOFError:
            # No more input (Ctrl-D / closed stdin): treat as a refusal
            # rather than letting EOFError escape as a traceback. The bare
            # print() ends the unfinished prompt line.
            print()
            sys.exit(1)
        answer = answer.strip().lower()
        if answer == 'y':
            return
        if answer == 'n':
            sys.exit(1)
        # Any other answer: ask again.
async def scan(
    workflows: Iterable[str], multi_mode: bool
) -> Tuple[List[str], bool]:
    """Expand truncated workflow IDs.

    For example "one" might expand to "one/run1" & "one/run2"
    or "one/two/run1".

    Each ID for which ``get_contained_workflows`` returns a non-empty result
    is replaced by those workflows; any other ID is kept as-is. The relative
    order of the input IDs is preserved.

    Args:
        workflows: Workflow IDs, possibly partial.
        multi_mode: Whether multiple workflows are already selected. Set to
            True in the result if any ID expanded.

    Returns:
        (workflows, multi_mode)
    """
    expanded: List[str] = []
    for workflow in workflows:
        contained_flows = await get_contained_workflows(workflow)
        if contained_flows:
            expanded.extend(contained_flows)
            # A partial ID matched something, so more than one workflow
            # may now be selected.
            multi_mode = True
        else:
            expanded.append(workflow)
    return expanded, multi_mode
async def run(*ids: str, opts: 'Values') -> None:
    """Resolve the workflows to clean from ``ids``, then clean each one.

    If ``opts.no_scan`` is set, ``ids`` are cleaned exactly as given.
    Otherwise:

    * the IDs are parsed and partial IDs expanded via ``scan``;
    * a warning is logged, and nothing is done, if nothing matched;
    * the result is sorted;
    * in multi-workflow mode the user is prompted (unless
      ``opts.skip_interactive``).

    Every workflow is attempted even if an earlier one fails. Each failure
    is logged, with a traceback when verbosity is above zero.

    Raises:
        CylcError: if cleaning any workflow raised an exception.
    """
    if opts.no_scan:
        workflows: Iterable[str] = ids
    else:
        matched, multi_mode = await parse_ids_async(
            *ids,
            constraint='workflows',
            match_workflows=True,
            match_active=False,
            # Unlike other commands, do not infer the latest run.
            infer_latest_runs=False,
        )
        # Partial IDs (including run names) may cover several workflows.
        expanded, multi_mode = await scan(matched, multi_mode)

        if not expanded:
            LOG.warning(f"No stopped workflows matching {', '.join(ids)}")
            return

        expanded.sort()
        if multi_mode and not opts.skip_interactive:
            # Returns on approval, otherwise exits the process.
            prompt(expanded)
        workflows = expanded

    any_failed = False
    for workflow in workflows:
        try:
            init_clean(workflow, opts)
        except Exception as exc:
            any_failed = True
            LOG.error(f"Failed to clean {workflow}\nError: {exc}")
            if cylc.flow.flags.verbosity > 0:
                LOG.exception(exc)

    if any_failed:
        raise CylcError("Clean failed")
@cli_function(get_option_parser)
def main(_parser, opts: 'Values', *ids: str):
    """Command-line entry point for ``cylc clean``; delegates to ``_main``.

    ``_parser`` is accepted but unused here (presumably it is supplied by
    the ``cli_function`` decorator — confirm).
    """
    _main(opts, *ids)
def _main(opts: 'Values', *ids: str):
    """Validate options and run the clean command for ``ids``.

    Kept separate from ``main`` so it can be called directly in tests.
    Steps, in order:

    * disable log timestamps unless verbosity is at least 2;
    * reject ``--local`` combined with ``--remote``;
    * normalise the remote timeout;
    * run the async ``run`` coroutine to completion.

    Raises:
        InputError: if both ``--local`` and ``--remote`` were given, or the
            timeout is invalid.
    """
    timestamps_wanted = cylc.flow.flags.verbosity >= 2
    if not timestamps_wanted:
        set_timestamps(LOG, False)

    conflicting_locations = opts.local_only and opts.remote_only
    if conflicting_locations:
        raise InputError(
            "--local and --remote options are mutually exclusive"
        )

    parse_timeout(opts)
    asyncio.run(run(*ids, opts=opts))