Skip to content

Commit f49e2ba

Browse files
authored
Merge branch '8.5.x' into group-trigger-followup
2 parents c22dbc7 + 6578e3e commit f49e2ba

File tree

24 files changed

+508
-119
lines changed

24 files changed

+508
-119
lines changed

changes.d/6836.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a bug causing the results of `platform = $(subshell commands)` to be cached, and preventing re-evaluation for each task with the same config.

changes.d/6911.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix an issue where trigger could fail to run a task, removing it instead.

changes.d/6914.fix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fixed bug where jobs in the UI could regress to an earlier state.

changes.d/6924.feat.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Include `log/version/` files in `cylc cat-log` and the UI log view.

cylc/flow/cfgspec/workflow.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,14 +1158,60 @@ def get_script_common_text(this: str, example: Optional[str] = None):
11581158
.. versionadded:: 8.3.0
11591159
''')
11601160
Conf('platform', VDR.V_STRING, desc='''
1161-
The name of a compute resource defined in
1162-
:cylc:conf:`global.cylc[platforms]` or
1163-
:cylc:conf:`global.cylc[platform groups]`.
1161+
The platform to submit jobs to.
11641162
1165-
The platform specifies the host(s) that the tasks' jobs
1163+
The
1164+
:cylc:conf:`platform <global.cylc[platforms][<platform name>]>`
1165+
specifies the host(s) that the tasks' jobs
11661166
will run on and where (if necessary) files need to be
11671167
installed, and what job runner will be used.
11681168
1169+
This can be:
1170+
1171+
* A :cylc:conf:`platform <global.cylc[platforms]>`,
1172+
* a :cylc:conf:`platform group <global.cylc[platform groups]>`,
1173+
* or a command which returns a platform or platform group.
1174+
1175+
To see what platforms have been configured at your site, run
1176+
``cylc config --platform-names``.
1177+
1178+
.. rubric:: Commands:
1179+
1180+
The ``platform`` can be set to a command which returns the name
1181+
of the platform to submit jobs to using the ``$()`` syntax,
1182+
i.e:
1183+
1184+
.. code-block:: cylc
1185+
1186+
platform = $(command)
1187+
1188+
The configured command will be evaluated for each job
1189+
submission (i.e, different submissions of the same task may
1190+
submit on different platforms).
1191+
1192+
Cylc batches job submissions, so when multiple jobs are
1193+
submitted at the same time, using a platform defined by the
1194+
same command, the command will be run once, and all jobs in the
1195+
batch will submit to the same platform.
1196+
1197+
Note: do not use a command to configure a list of login nodes.
1198+
Instead, define a platform and configure the login nodes it
1199+
can use; see
1200+
:ref:`config.platforms.cluster_with_multiple_login_nodes`.
1201+
1202+
.. rubric:: Examples:
1203+
1204+
.. code-block:: cylc
1205+
1206+
# run the job on the same host the Cylc scheduler runs on
1207+
platform = localhost
1208+
1209+
# run the job on a platform (or platform group) called hpc
1210+
platform = hpc
1211+
1212+
# run a command to select the platform (or platform group):
1213+
platform = $(select-platform)
1214+
11691215
.. versionadded:: 8.0.0
11701216
''')
11711217
Conf('inherit', VDR.V_STRING_LIST, desc='''

cylc/flow/commands.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292
from cylc.flow.task_id import TaskID
9393
from cylc.flow.task_state import (
9494
TASK_STATUS_PREPARING,
95-
TASK_STATUS_WAITING,
9695
TASK_STATUSES_ACTIVE,
9796
)
9897
from cylc.flow.taskdef import generate_graph_children
@@ -751,7 +750,7 @@ def _force_trigger_tasks(
751750
# Remove non group start and final-status group start tasks, and
752751
# trigger them from scratch (so only the TaskDef matters).
753752

754-
# Waiting group start tasks are not removed, but a reload would
753+
# Group start tasks are not removed, but a reload would
755754
# replace them, so using the TaskDef is fine.
756755

757756
if not any(
@@ -774,12 +773,13 @@ def _force_trigger_tasks(
774773
(str(itask.point), itask.tdef.name)] = (label, msg)
775774

776775
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
776+
# This is a live active group start task
777777
warnings_has_job.append(str(itask))
778778
# Just merge the flows.
779779
schd.pool.merge_flows(itask, flow_nums)
780780

781-
elif itask.state(TASK_STATUS_WAITING):
782-
# This is a waiting active group start task...
781+
else:
782+
# This is a non-live active group start task...
783783
# ... satisfy off-group (i.e. all) prerequisites
784784
itask.state.set_all_task_prerequisites_satisfied()
785785
# ... and satisfy all xtrigger prerequisites.
@@ -791,8 +791,7 @@ def _force_trigger_tasks(
791791

792792
# Trigger group start task.
793793
schd.pool.queue_or_trigger(itask, on_resume)
794-
else:
795-
active_to_remove.append(itask)
794+
796795
else:
797796
active_to_remove.append(itask)
798797

cylc/flow/data_store_mgr.py

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
Set,
6868
TYPE_CHECKING,
6969
Tuple,
70+
Union,
7071
)
7172
import zlib
7273

@@ -1609,13 +1610,20 @@ def _apply_broadcasts_to_runtime(self, tokens, rtconfig):
16091610
poverride(rtconfig, overrides, prepend=True)
16101611
return rtconfig
16111612

1612-
def insert_job(self, name, cycle_point, status, job_conf):
1613+
def insert_job(
1614+
self,
1615+
name: str,
1616+
cycle_point: Union['PointBase', str],
1617+
status: str,
1618+
job_conf: dict,
1619+
):
16131620
"""Insert job into data-store.
16141621
16151622
Args:
1616-
name (str): Corresponding task name.
1617-
cycle_point (str|PointBase): Cycle point string
1618-
job_conf (dic):
1623+
name: Corresponding task name.
1624+
cycle_point: Cycle point string
1625+
status: The task's state.
1626+
job_conf:
16191627
Dictionary of job configuration used to generate
16201628
the job script.
16211629
(see TaskJobManager._prep_submit_task_job_impl)
@@ -1625,6 +1633,11 @@ def insert_job(self, name, cycle_point, status, job_conf):
16251633
None
16261634
16271635
"""
1636+
if status not in JOB_STATUS_SET:
1637+
# Ignore task-only states e.g. preparing
1638+
# https://github.com/cylc/cylc-flow/issues/4994
1639+
return
1640+
16281641
sub_num = job_conf['submit_num']
16291642
tp_tokens = self.id_.duplicate(
16301643
cycle=str(cycle_point),
@@ -1641,9 +1654,6 @@ def insert_job(self, name, cycle_point, status, job_conf):
16411654
# Job already exists (i.e. post-submission submit failure)
16421655
return
16431656

1644-
if status not in JOB_STATUS_SET:
1645-
return
1646-
16471657
j_buf = PbJob(
16481658
stamp=f'{j_id}@{update_time}',
16491659
id=j_id,
@@ -2761,12 +2771,12 @@ def delta_job_msg(self, tokens: Tokens, msg: str) -> None:
27612771

27622772
def delta_job_attr(
27632773
self,
2764-
tokens: Tokens,
2774+
itask: 'TaskProxy',
27652775
attr_key: str,
27662776
attr_val: Any,
27672777
) -> None:
27682778
"""Set job attribute."""
2769-
j_id, job = self.store_node_fetcher(tokens)
2779+
j_id, job = self.store_node_fetcher(itask.job_tokens)
27702780
if not job:
27712781
return
27722782
j_delta = PbJob(stamp=f'{j_id}@{time()}')
@@ -2779,12 +2789,19 @@ def delta_job_attr(
27792789

27802790
def delta_job_state(
27812791
self,
2782-
tokens: Tokens,
2792+
itask: 'TaskProxy',
27832793
status: str,
27842794
) -> None:
27852795
"""Set job state."""
2786-
j_id, job = self.store_node_fetcher(tokens)
2787-
if not job or status not in JOB_STATUS_SET:
2796+
if status not in JOB_STATUS_SET:
2797+
# Ignore task-only states e.g. preparing
2798+
return
2799+
j_id, job = self.store_node_fetcher(itask.job_tokens)
2800+
if not job or (
2801+
# Don't cause backwards state change:
2802+
JOB_STATUSES_ALL.index(status) <= JOB_STATUSES_ALL.index(job.state)
2803+
and not itask.job_vacated
2804+
):
27882805
return
27892806
j_delta = PbJob(
27902807
stamp=f'{j_id}@{time()}',
@@ -2798,15 +2815,15 @@ def delta_job_state(
27982815

27992816
def delta_job_time(
28002817
self,
2801-
tokens: Tokens,
2818+
itask: 'TaskProxy',
28022819
event_key: str,
28032820
time_str: Optional[str] = None,
28042821
) -> None:
28052822
"""Set an event time in job pool object.
28062823
28072824
Set values of both event_key + '_time' and event_key + '_time_string'.
28082825
"""
2809-
j_id, job = self.store_node_fetcher(tokens)
2826+
j_id, job = self.store_node_fetcher(itask.job_tokens)
28102827
if not job:
28112828
return
28122829
j_delta = PbJob(stamp=f'{j_id}@{time()}')

cylc/flow/install_plugins/log_vc_info.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,8 +285,8 @@ def write_vc_info(
285285
)
286286
info_file.parent.mkdir(exist_ok=True, parents=True)
287287
with open(info_file, 'w') as f:
288-
f.write(
289-
json.dumps(info, indent=JSON_INDENT)
288+
print(
289+
json.dumps(info, indent=JSON_INDENT), file=f
290290
)
291291

292292

@@ -356,7 +356,7 @@ def write_diff(
356356
try:
357357
_run_cmd(vcs, args, repo_path, stdout=f)
358358
except VCSMissingBaseError as exc:
359-
f.write(f"# No diff - {exc}")
359+
print(f"# No diff - {exc}", file=f)
360360
return diff_file
361361

362362

cylc/flow/platforms.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ def log_platform_event(
8383
def get_platform(
8484
task_conf: Optional[str] = None,
8585
task_name: str = UNKNOWN_TASK,
86-
bad_hosts: Optional[Set[str]] = None
86+
bad_hosts: Optional[Set[str]] = None,
87+
evaluated_host: Optional[str] = None,
8788
) -> Dict[str, Any]:
8889
...
8990

@@ -92,7 +93,8 @@ def get_platform(
9293
def get_platform(
9394
task_conf: Union[dict, 'OrderedDictWithDefaults'],
9495
task_name: str = UNKNOWN_TASK,
95-
bad_hosts: Optional[Set[str]] = None
96+
bad_hosts: Optional[Set[str]] = None,
97+
evaluated_host: Optional[str] = None,
9698
) -> Optional[Dict[str, Any]]:
9799
...
98100

@@ -108,7 +110,8 @@ def get_platform(
108110
def get_platform(
109111
task_conf: Union[str, dict, 'OrderedDictWithDefaults', None] = None,
110112
task_name: str = UNKNOWN_TASK,
111-
bad_hosts: Optional[Set[str]] = None
113+
bad_hosts: Optional[Set[str]] = None,
114+
evaluated_host: Optional[str] = None,
112115
) -> Optional[Dict[str, Any]]:
113116
"""Get a platform.
114117
@@ -121,6 +124,7 @@ def get_platform(
121124
task_name: Help produce more helpful error messages.
122125
bad_hosts: A set of hosts known to be unreachable (had an ssh 255
123126
error)
127+
evaluated_host: Host name evaluated from platform subshell.
124128
125129
Returns:
126130
platform: A platform definition dictionary. Uses either
@@ -169,6 +173,7 @@ def get_platform(
169173
glbl_cfg().get(['platforms']),
170174
task_job_section,
171175
task_remote_section,
176+
evaluated_host,
172177
),
173178
bad_hosts=bad_hosts,
174179
)
@@ -330,7 +335,8 @@ def get_platform_from_group(
330335
def platform_name_from_job_info(
331336
platforms: Union[dict, 'OrderedDictWithDefaults'],
332337
job: Dict[str, Any],
333-
remote: Dict[str, Any]
338+
remote: Dict[str, Any],
339+
evaluated_host: Optional[str] = None,
334340
) -> str:
335341
"""
336342
Find out which job platform to use given a list of possible platforms
@@ -385,6 +391,7 @@ def platform_name_from_job_info(
385391
job: Workflow config [runtime][TASK][job] section.
386392
remote: Workflow config [runtime][TASK][remote] section.
387393
platforms: Dictionary containing platform definitions.
394+
evaluated_host: Host is the result of evaluating a subshell.
388395
389396
Returns:
390397
platform: string representing a platform from the global config.
@@ -422,7 +429,9 @@ def platform_name_from_job_info(
422429

423430
# NOTE: Do NOT use .get() on OrderedDictWithDefaults -
424431
# https://github.com/cylc/cylc-flow/pull/4975
425-
if 'host' in remote and remote['host']:
432+
if evaluated_host:
433+
task_host = evaluated_host
434+
elif 'host' in remote and remote['host']:
426435
task_host = remote['host']
427436
else:
428437
task_host = 'localhost'

cylc/flow/scripts/cat_log.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
),
107107
'i': ('install log', r'install/*-*install.log'),
108108
's': ('scheduler log', r'scheduler/*-*start*.log'),
109+
'v': ('version control info (JSON)', r'version/*'),
109110
}
110111

111112

0 commit comments

Comments
 (0)