Skip to content

Commit 07dabc2

Browse files
committed
Fix flaw in logic for selecting new platform from group
Tasks using a platform group could be recorded as submitting to an unavailable platform when, in fact, a new platform had been selected.
1 parent 127414a commit 07dabc2

1 file changed

Lines changed: 56 additions & 63 deletions

File tree

cylc/flow/task_job_mgr.py

Lines changed: 56 additions & 63 deletions
Original file line number | Diff line number | Diff line change
@@ -334,9 +334,19 @@ def submit_livelike_task_jobs(
334334
# Non-prepared tasks can be considered done for now:
335335
done_tasks = bad_tasks
336336

337-
for _, itasks in sorted(platform_itasks.items()):
338-
platform = self._get_platform_with_good_host(itasks, done_tasks)
339-
if not platform:
337+
for _platform_name, itasks in sorted(platform_itasks.items()):
338+
# All tasks in this iteration have the same platform
339+
platform = itasks[0].platform
340+
341+
if self.bad_hosts.issuperset(platform['hosts']):
342+
# Out of hosts for this platform.
343+
for itask in itasks:
344+
# Get another platform, if task config platform is a group
345+
# (Note there may be tasks with different but intersecting
346+
# platform groups)
347+
if not self._select_new_platform(itask):
348+
# No alternative platform could be selected: set task to submit-failed.
349+
self._platform_submit_failure(itask, done_tasks)
340350
continue
341351

342352
install_target = get_install_target_from_platform(platform)
@@ -563,69 +573,52 @@ def submit_livelike_task_jobs(
563573
)
564574
return done_tasks
565575

566-
def _get_platform_with_good_host(
567-
self, itasks: 'Iterable[TaskProxy]', done_tasks: 'List[TaskProxy]'
568-
) -> Optional[dict]:
569-
"""Find the first platform with at least one host that has not been
570-
tried and found to be unreachable.
571-
572-
If there are no good hosts for the tasks then we set them to
573-
submit-failed.
576+
def _select_new_platform(self, itask: 'TaskProxy') -> bool:
577+
"""Try to select a new platform for a task if it is using a
578+
platform group and the current platform is not available.
574579
575-
Args:
576-
itasks: tasks that share the same platform/platform group.
577-
done_tasks: list in which to put tasks that we could not find a
578-
platform for.
579-
580-
Returns:
581-
The platform with a good host, or None if no such platform is found
580+
Return True if a new platform was selected.
582581
"""
583-
out_of_hosts = False
584-
for itask in itasks:
585-
# If there are any hosts left for this platform which we
586-
# have not previously failed to contact with a 255 error.
587-
out_of_hosts |= self.bad_hosts.issuperset(itask.platform['hosts'])
588-
if not out_of_hosts:
589-
return itask.platform
590-
591-
# If there are no hosts left for this platform.
592-
# See if you can get another platform from the group or
593-
# else set task to submit failed.
594-
platform: Optional[dict] = None
595-
rtconf = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig(
596-
itask
597-
)
598-
with suppress(PlatformLookupError):
599-
platform = get_platform(rtconf, bad_hosts=self.bad_hosts)
600-
601-
# If were able to select a new platform;
602-
if platform and platform != itask.platform:
603-
# store the previous platform's hosts so that when
604-
# we record a submit fail we can clear all hosts
605-
# from all platforms from bad_hosts.
606-
self.bad_hosts_to_clear.update(itask.platform['hosts'])
607-
itask.platform = platform
608-
return platform
609-
610-
itask.waiting_on_job_prep = False
611-
itask.local_job_file_path = None
612-
self._prep_submit_task_job_error(itask, '(remote init)', '')
613-
# Now that all hosts on all platforms in platform
614-
# group selected in task config are exhausted we
615-
# clear bad_hosts for all the hosts we have
616-
# tried for this platform or group.
617-
self.bad_hosts -= set(itask.platform['hosts'])
618-
self.bad_hosts -= self.bad_hosts_to_clear
619-
self.bad_hosts_to_clear.clear()
620-
LOG.critical(
621-
PlatformError(
622-
f"{PlatformError.MSG_INIT} (no hosts were reachable)",
623-
itask.platform['name'],
624-
)
582+
rtconf = self.task_events_mgr.broadcast_mgr.get_updated_rtconfig(
583+
itask
584+
)
585+
try:
586+
new_platform = get_platform(rtconf, bad_hosts=self.bad_hosts)
587+
except PlatformLookupError:
588+
return False
589+
# If we were able to select a new platform:
590+
if new_platform and new_platform != itask.platform:
591+
# store the previous platform's hosts so that when
592+
# we record a submit fail we can clear all hosts
593+
# from all platforms from bad_hosts.
594+
self.bad_hosts_to_clear.update(itask.platform['hosts'])
595+
itask.platform = new_platform
596+
self._prep_submit_task_job_impl(itask, rtconf)
597+
return True
598+
return False
599+
600+
def _platform_submit_failure(
601+
self, itask: 'TaskProxy', done_tasks: 'List[TaskProxy]'
602+
) -> None:
603+
"""If there are no good platforms for a task then we set it to
604+
submit-failed."""
605+
itask.waiting_on_job_prep = False
606+
itask.local_job_file_path = None
607+
self._prep_submit_task_job_error(itask, '(remote init)', '')
608+
# Now that all hosts on all platforms in platform
609+
# group selected in task config are exhausted we
610+
# clear bad_hosts for all the hosts we have
611+
# tried for this platform or group.
612+
self.bad_hosts -= set(itask.platform['hosts'])
613+
self.bad_hosts -= self.bad_hosts_to_clear
614+
self.bad_hosts_to_clear.clear()
615+
LOG.critical(
616+
PlatformError(
617+
f"{PlatformError.MSG_INIT} (no hosts were reachable)",
618+
itask.platform['name'],
625619
)
626-
done_tasks.append(itask)
627-
628-
return None
620+
)
621+
done_tasks.append(itask)
629622

630623
def _create_job_log_path(self, itask):
631624
"""Create job log directory for a task job, etc.

0 commit comments

Comments
 (0)