Skip to content

Commit c52e752

Browse files
authored
Merge pull request #7085 from cylc/8.6.x-sync
🤖 Merge 8.6.x-sync into master
2 parents 3202105 + f876eb9 commit c52e752

File tree

19 files changed

+290
-169
lines changed

19 files changed

+290
-169
lines changed

changes.d/7035.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed platform subshell expression evaluating more than once for tasks triggered in paused workflow.

cylc/flow/exceptions.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,13 +184,13 @@ class PlatformError(CylcError):
184184
def __init__(
185185
self,
186186
message: str,
187-
platform_name: str,
187+
platform_name: str | None,
188188
*,
189-
ctx: 'Optional[SubFuncContext]' = None,
190-
cmd: Union[str, Sequence[str], None] = None,
191-
ret_code: Optional[int] = None,
192-
out: Optional[str] = None,
193-
err: Optional[str] = None
189+
ctx: 'SubFuncContext | None' = None,
190+
cmd: str | Sequence[str] | None = None,
191+
ret_code: int | None = None,
192+
out: str | None = None,
193+
err: str | None = None
194194
) -> None:
195195
self.msg = message
196196
self.platform_name = platform_name

cylc/flow/platforms.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def get_platform(
128128
129129
Returns:
130130
platform: A platform definition dictionary. Uses either
131-
get_platform() or platform_name_from_job_info(), but to the
131+
get_platform() or _platform_name_from_job_info(), but to the
132132
user these look the same. This will be None if the platform
133133
definition uses a subshell.
134134
@@ -169,7 +169,7 @@ def get_platform(
169169
task_job_section = task_conf['job'] if 'job' in task_conf else {}
170170
task_remote_section = task_conf['remote'] if 'remote' in task_conf else {}
171171
return platform_from_name(
172-
platform_name_from_job_info(
172+
_platform_name_from_job_info(
173173
glbl_cfg().get(['platforms']),
174174
task_job_section,
175175
task_remote_section,
@@ -329,7 +329,7 @@ def get_platform_from_group(
329329
return HOST_SELECTION_METHODS[method](platform_names)
330330

331331

332-
def platform_name_from_job_info(
332+
def _platform_name_from_job_info(
333333
platforms: Union[dict, 'OrderedDictWithDefaults'],
334334
job: Dict[str, Any],
335335
remote: Dict[str, Any],
@@ -407,14 +407,14 @@ def platform_name_from_job_info(
407407
... }
408408
>>> job = {'batch system': 'slurm'}
409409
>>> remote = {'host': 'localhost'}
410-
>>> platform_name_from_job_info(platforms, job, remote)
410+
>>> _platform_name_from_job_info(platforms, job, remote)
411411
'sugar'
412412
>>> remote = {}
413-
>>> platform_name_from_job_info(platforms, job, remote)
413+
>>> _platform_name_from_job_info(platforms, job, remote)
414414
'sugar'
415415
>>> remote ={'host': 'desktop92'}
416416
>>> job = {}
417-
>>> platform_name_from_job_info(platforms, job, remote)
417+
>>> _platform_name_from_job_info(platforms, job, remote)
418418
'desktop92'
419419
"""
420420

@@ -581,9 +581,9 @@ def fail_if_platform_and_host_conflict(
581581
if 'platform' in task_conf and task_conf['platform']:
582582
fail_items = [
583583
f'\n * [{section}]{key}'
584-
for section, keys in FORBIDDEN_WITH_PLATFORM.items()
584+
for section, settings in FORBIDDEN_WITH_PLATFORM.items()
585585
if section in task_conf
586-
for key, _ in keys.items()
586+
for key in settings.keys()
587587
if (
588588
key in task_conf[section] and
589589
task_conf[section][key] is not None

cylc/flow/scheduler.py

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,7 +1341,7 @@ def release_tasks_to_run(self) -> bool:
13411341
"""
13421342
pre_prep_tasks: Set['TaskProxy'] = set()
13431343
if (
1344-
self.stop_mode is None
1344+
not self.stop_mode
13451345
and self.auto_restart_time is None
13461346
and self.reload_pending is False
13471347
):
@@ -1350,36 +1350,29 @@ def release_tasks_to_run(self) -> bool:
13501350
pre_prep_tasks.update(self.pool.tasks_to_trigger_now)
13511351
self.pool.tasks_to_trigger_now = set()
13521352

1353-
if self.is_paused:
1354-
# finish processing preparing tasks
1355-
pre_prep_tasks.update({
1356-
itask for itask in self.pool.get_tasks()
1357-
if itask.waiting_on_job_prep
1358-
})
1359-
else:
1353+
if not self.is_paused:
13601354
# release queued tasks
13611355
pre_prep_tasks.update(self.pool.release_queued_tasks())
13621356

1363-
elif (
1364-
(
1365-
# Need to get preparing tasks to submit before auto restart
1366-
self.should_auto_restart_now()
1367-
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
1368-
) or (
1369-
# Need to get preparing tasks to submit before reload
1370-
self.reload_pending
1371-
)
1357+
if (
1358+
# Manually triggered tasks will be preparing and should
1359+
# be submitted even if paused (unless workflow is stopping).
1360+
self.is_paused and not self.stop_mode
1361+
) or (
1362+
# Need to get preparing tasks to submit before auto restart
1363+
self.should_auto_restart_now()
1364+
and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
1365+
) or (
1366+
# Need to get preparing tasks to submit before reload
1367+
self.reload_pending
13721368
):
1373-
# finish processing preparing tasks first
1374-
pre_prep_tasks = {
1375-
itask for itask in self.pool.get_tasks()
1376-
if itask.state(TASK_STATUS_PREPARING)
1377-
}
1369+
pre_prep_tasks.update({
1370+
itask
1371+
for itask in self.pool.get_tasks()
1372+
if itask.waiting_on_job_prep
1373+
})
13781374

13791375
# Return, if no tasks to submit.
1380-
else:
1381-
return False
1382-
13831376
if not pre_prep_tasks:
13841377
return False
13851378

cylc/flow/task_job_mgr.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
NoPlatformsError,
5757
PlatformError,
5858
PlatformLookupError,
59-
WorkflowConfigError,
6059
)
6160
from cylc.flow.hostuserutil import (
6261
get_host,
@@ -70,6 +69,7 @@
7069
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
7170
from cylc.flow.platforms import (
7271
FORBIDDEN_WITH_PLATFORM,
72+
fail_if_platform_and_host_conflict,
7373
get_host_from_platform,
7474
get_install_target_from_platform,
7575
get_localhost_install_target,
@@ -1168,15 +1168,7 @@ def _prep_submit_task_job(
11681168
# - host exists - eval host_n
11691169
# remove at:
11701170
# Cylc8.x
1171-
if (
1172-
rtconfig['platform'] is not None and
1173-
rtconfig['remote']['host'] is not None
1174-
):
1175-
raise WorkflowConfigError(
1176-
"A mixture of Cylc 7 (host) and Cylc 8 (platform) "
1177-
"logic should not be used. In this case for the task "
1178-
f"\"{itask.identity}\" the following are not compatible:\n"
1179-
)
1171+
fail_if_platform_and_host_conflict(rtconfig, itask.tdef.name)
11801172

11811173
host_name, platform_name = None, None
11821174
try:
@@ -1196,37 +1188,41 @@ def _prep_submit_task_job(
11961188
)
11971189

11981190
except PlatformError as exc:
1199-
itask.waiting_on_job_prep = False
1200-
itask.summary['platforms_used'][itask.submit_num] = ''
1201-
# Retry delays, needed for the try_num
1202-
self._create_job_log_path(itask)
1203-
self._set_retry_timers(itask, rtconfig)
1204-
self._prep_submit_task_job_error(
1205-
itask, '(remote host select)', exc
1206-
)
1191+
self._prep_submit_task_job_platform_error(itask, rtconfig, exc)
12071192
return False
12081193
else:
1209-
# host/platform select not ready
12101194
if host_name is None and platform_name is None:
1195+
# host/platform select not ready
12111196
return None
12121197
elif (
12131198
host_name is None
12141199
and rtconfig['platform']
12151200
and rtconfig['platform'] != platform_name
12161201
):
1217-
LOG.debug(
1202+
msg = (
12181203
f"for task {itask.identity}: platform = "
1219-
f"{rtconfig['platform']} evaluated as {platform_name}"
1204+
f"{rtconfig['platform']} evaluated as '{platform_name}'"
12201205
)
1221-
1206+
if not platform_name:
1207+
self._prep_submit_task_job_platform_error(
1208+
itask, rtconfig, msg
1209+
)
1210+
return False
1211+
LOG.debug(msg)
12221212
elif (
12231213
platform_name is None
12241214
and rtconfig['remote']['host'] != host_name
12251215
):
1226-
LOG.debug(
1216+
msg = (
12271217
f"[{itask}] host = "
1228-
f"{rtconfig['remote']['host']} evaluated as {host_name}"
1218+
f"{rtconfig['remote']['host']} evaluated as '{host_name}'"
12291219
)
1220+
if not host_name:
1221+
self._prep_submit_task_job_platform_error(
1222+
itask, rtconfig, msg
1223+
)
1224+
return False
1225+
LOG.debug(msg)
12301226

12311227
try:
12321228
platform = cast(
@@ -1293,6 +1289,20 @@ def _prep_submit_task_job(
12931289
itask.local_job_file_path = local_job_file_path
12941290
return itask
12951291

1292+
def _prep_submit_task_job_platform_error(
1293+
self, itask: 'TaskProxy', rtconfig: dict, exc: Exception | str
1294+
):
1295+
"""Helper for self._prep_submit_task_job. On platform selection error.
1296+
"""
1297+
itask.waiting_on_job_prep = False
1298+
itask.summary['platforms_used'][itask.submit_num] = ''
1299+
# Retry delays, needed for the try_num
1300+
self._create_job_log_path(itask)
1301+
self._set_retry_timers(itask, rtconfig)
1302+
self._prep_submit_task_job_error(
1303+
itask, '(remote host select)', exc
1304+
)
1305+
12961306
def _prep_submit_task_job_error(
12971307
self,
12981308
itask: 'TaskProxy',

cylc/flow/task_pool.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -900,8 +900,7 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None:
900900
except KeyError:
901901
pass
902902
else:
903-
with suppress(KeyError):
904-
self.tasks_to_trigger_now.remove(itask)
903+
self.tasks_to_trigger_now.discard(itask)
905904
self.tasks_removed = True
906905
self.active_tasks_changed = True
907906
if not self.active_tasks[itask.point]:
@@ -1042,7 +1041,7 @@ def count_active_tasks(self):
10421041

10431042
return active_task_counter, pre_prep_tasks
10441043

1045-
def release_queued_tasks(self):
1044+
def release_queued_tasks(self) -> set['TaskProxy']:
10461045
"""Return list of queue-released tasks awaiting job prep.
10471046
10481047
Note:
@@ -1071,7 +1070,7 @@ def release_queued_tasks(self):
10711070
self.spawn_on_all_outputs(itask)
10721071

10731072
# Note: released and pre_prep_tasks can overlap
1074-
return list(set(released + pre_prep_tasks))
1073+
return set(released + pre_prep_tasks)
10751074

10761075
def get_min_point(self):
10771076
"""Return the minimum cycle point currently in the pool."""

cylc/flow/task_remote_mgr.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
Dict,
3838
List,
3939
NamedTuple,
40-
Optional,
4140
Set,
4241
TYPE_CHECKING,
4342
Tuple,
@@ -120,8 +119,8 @@ def __init__(self, workflow, proc_pool, bad_hosts, db_mgr, server):
120119
self.server: WorkflowRuntimeServer = server
121120

122121
def _subshell_eval(
123-
self, eval_str: str, command_pattern: re.Pattern
124-
) -> Optional[str]:
122+
self, eval_str: str | None, command_pattern: re.Pattern
123+
) -> str | None:
125124
"""Evaluate a platform or host from a possible subshell string.
126125
127126
Arguments:
@@ -175,7 +174,7 @@ def _subshell_eval(
175174
# BACK COMPAT: references to "host"
176175
# remove at:
177176
# Cylc8.x
178-
def eval_host(self, host_str: str) -> Optional[str]:
177+
def eval_host(self, host_str: str | None) -> str | None:
179178
"""Evaluate a host from a possible subshell string.
180179
181180
Args:
@@ -191,7 +190,7 @@ def eval_host(self, host_str: str) -> Optional[str]:
191190
return 'localhost'
192191
return host
193192

194-
def eval_platform(self, platform_str: str) -> Optional[str]:
193+
def eval_platform(self, platform_str: str | None) -> str | None:
195194
"""Evaluate a platform from a possible subshell string.
196195
197196
Args:

tests/functional/job-submission/19-platform_select.t

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#-------------------------------------------------------------------------------
1818
# Test recovery of a failed host select command for a group of tasks.
1919
. "$(dirname "$0")/test_header"
20-
set_test_number 7
20+
set_test_number 8
2121

2222
install_workflow "${TEST_NAME_BASE}"
2323

@@ -29,24 +29,28 @@ logfile="${WORKFLOW_RUN_DIR}/log/scheduler/log"
2929

3030
# Check that host = $(cmd) is correctly evaluated
3131
grep_ok \
32-
"1/host_subshell/01:.* evaluated as improbable host name$" \
32+
"1/host_subshell/01:.* evaluated as 'improbable host name'" \
3333
"${logfile}"
3434
grep_ok \
35-
"1/localhost_subshell/01:.* evaluated as localhost$" \
35+
"1/localhost_subshell/01:.* evaluated as 'localhost'" \
3636
"${logfile}"
3737

3838
# Check that host = `cmd` is correctly evaluated
3939
grep_ok \
40-
"1/host_subshell_backticks/01:.* evaluated as improbable host name$" \
40+
"1/host_subshell_backticks/01:.* evaluated as 'improbable host name'" \
4141
"${logfile}"
4242

4343
# Check that platform = $(cmd) correctly evaluated
4444
grep_ok \
45-
"1/platform_subshell:.* evaluated as improbable platform name$" \
45+
"1/platform_subshell:.* evaluated as 'improbable platform name'" \
4646
"${logfile}"
4747

4848
grep_ok \
49-
"1/platform_subshell_suffix:.* evaluated as prefix-middle-suffix$" \
49+
"1/platform_subshell_empty:.* evaluated as ''" \
5050
"${logfile}"
5151

52-
purge
52+
grep_ok \
53+
"1/platform_subshell_suffix:.* evaluated as 'prefix-middle-suffix'" \
54+
"${logfile}"
55+
56+
# purge

tests/functional/job-submission/19-platform_select/flow.cylc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ purpose = """
1717
localhost_subshell
1818
platform_subshell:submit-fail?
1919
platform_no_subshell:submit-fail?
20+
platform_subshell_empty:submit-fail?
2021
platform_subshell_suffix:submit-fail?
2122
host_subshell:submit-fail?
2223
host_subshell_backticks:submit-fail?
@@ -36,6 +37,9 @@ purpose = """
3637
[[platform_subshell]]
3738
platform = $(echo "improbable platform name")
3839

40+
[[platform_subshell_empty]]
41+
platform = $(echo "")
42+
3943
[[platform_subshell_suffix]]
4044
platform = prefix-$( echo middle )-suffix
4145

tests/functional/job-submission/19-platform_select/reference.log

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
1/host_subshell_backticks -triggered off [] in flow 1
66
1/localhost_subshell -triggered off [] in flow 1
77
1/platform_subshell_suffix -triggered off [] in flow 1
8+
1/platform_subshell_empty -triggered off [] in flow 1

0 commit comments

Comments
 (0)